• 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html
  • 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html

ReentrantLock 实现原理

技术杂谈 勤劳的小蚂蚁 3个月前 (01-26) 64次浏览 已收录 0个评论 扫描二维码

使用 synchronize 来做同步处理时,锁的获取和释放都是隐式的,实现的原理是通过编译后加上不同的机器指令来实现。
ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。
是一个重入锁:一个线程获得了锁之后仍然可以反复的加锁,不会出现自己阻塞自己的情况。
AQSJava 并发包里实现锁、同步的一个重要的基础框架。

锁类型

ReentrantLock 分为公平锁非公平锁,可以通过构造方法来指定具体类型:
  1.    //默认非公平锁
  2.    publicReentrantLock(){
  3.        sync =newNonfairSync();
  4.    }
  5.    //公平锁
  6.    publicReentrantLock(boolean fair){
  7.        sync = fair ?newFairSync():newNonfairSync();
  8.    }
默认一般使用非公平锁,它的效率和吞吐量都比公平锁高的多(后面会分析具体原因)。

获取锁

通常的使用方式如下:
  1.    privateReentrantLock lock =newReentrantLock();
  2.    publicvoid run(){
  3.        lock.lock();
  4.        try{
  5.            //do bussiness
  6.        }catch(InterruptedException e){
  7.            e.printStackTrace();
  8.        }finally{
  9.            lock.unlock();
  10.        }
  11.    }

公平锁获取锁

首先看下获取锁的过程:
  1.    publicvoid lock(){
  2.        sync.lock();
  3.    }
可以看到是使用 sync的方法,而这个方法是一个抽象方法,具体是由其子类( FairSync)来实现的,以下是公平锁的实现:
  1.        finalvoid lock(){
  2.            acquire(1);
  3.        }
  4.        //AbstractQueuedSynchronizer 中的 acquire()
  5.        publicfinalvoid acquire(int arg){
  6.        if(!tryAcquire(arg)&&
  7.            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  8.            selfInterrupt();
  9.        }
第一步是尝试获取锁( tryAcquire(arg)),这个也是由其子类实现:
  1.        protectedfinalboolean tryAcquire(int acquires){
  2.            finalThread current =Thread.currentThread();
  3.            int c = getState();
  4.            if(c ==0){
  5.                if(!hasQueuedPredecessors()&&
  6.                    compareAndSetState(0, acquires)){
  7.                    setExclusiveOwnerThread(current);
  8.                    returntrue;
  9.                }
  10.            }
  11.            elseif(current == getExclusiveOwnerThread()){
  12.                int nextc = c + acquires;
  13.                if(nextc <0)
  14.                    thrownewError("Maximum lock count exceeded");
  15.                setState(nextc);
  16.                returntrue;
  17.            }
  18.            returnfalse;
  19.        }
  20.    }
首先会判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。
注意:尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中中是否有其他线程,如果有则不会尝试获取锁(这是公平锁特有的情况)。
如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程( setExclusiveOwnerThread(current))。
如果 state 大于 0 时,说明锁已经被获取了,则需要判断获取锁的线程是否为当前线程( ReentrantLock 支持重入),是则需要将 state+1,并将值更新。

写入队列

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程写入队列中。
写入之前需要将当前线程包装为一个 Node 对象( addWaiter(Node.EXCLUSIVE))。
AQS 中的队列是由 Node 节点组成的双向链表实现的。
包装代码:
  1.    privateNode addWaiter(Node mode){
  2.        Node node =newNode(Thread.currentThread(), mode);
  3.        // Try the fast path of enq; backup to full enq on failure
  4.        Node pred = tail;
  5.        if(pred !=null){
  6.            node.prev = pred;
  7.            if(compareAndSetTail(pred, node)){
  8.                pred.next = node;
  9.                return node;
  10.            }
  11.        }
  12.        enq(node);
  13.        return node;
  14.    }
首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。
  1.    privateNode enq(finalNode node){
  2.        for(;;){
  3.            Node t = tail;
  4.            if(t ==null){// Must initialize
  5.                if(compareAndSetHead(newNode()))
  6.                    tail = head;
  7.            }else{
  8.                node.prev = t;
  9.                if(compareAndSetTail(t, node)){
  10.                    t.next = node;
  11.                    return t;
  12.                }
  13.            }
  14.        }
  15.    }
这个处理逻辑就相当于 自旋加上 CAS 保证一定能写入队列。

挂起等待线程

写入队列之后需要将当前线程挂起(利用 acquireQueued(addWaiter(Node.EXCLUSIVE),arg)):
  1.    finalboolean acquireQueued(finalNode node,int arg){
  2.        boolean failed =true;
  3.        try{
  4.            boolean interrupted =false;
  5.            for(;;){
  6.                finalNode p = node.predecessor();
  7.                if(p == head && tryAcquire(arg)){
  8.                    setHead(node);
  9.                    p.next =null;// help GC
  10.                    failed =false;
  11.                    return interrupted;
  12.                }
  13.                if(shouldParkAfterFailedAcquire(p, node)&&
  14.                    parkAndCheckInterrupt())
  15.                    interrupted =true;
  16.            }
  17.        }finally{
  18.            if(failed)
  19.                cancelAcquire(node);
  20.        }
  21.    }
首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。
如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理( shouldParkAfterFailedAcquire(p,node))。
waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。
shouldParkAfterFailedAcquire(p,node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt()
  1.    privatefinalboolean parkAndCheckInterrupt(){
  2.        LockSupport.park(this);
  3.        returnThread.interrupted();
  4.    }
他是利用 LockSupportpart 方法来挂起当前线程的,直到被唤醒。

非公平锁获取锁

公平锁与非公平锁的差异主要在获取锁:
公平锁就相当于买票,后来的人需要排到队尾依次买票,不能插队
而非公平锁则没有这些规则,是抢占模式,每来一个人不会去管队列如何,直接尝试获取锁。
非公平锁:
  1.        finalvoid lock(){
  2.            //直接尝试获取锁
  3.            if(compareAndSetState(0,1))
  4.                setExclusiveOwnerThread(Thread.currentThread());
  5.            else
  6.                acquire(1);
  7.        }
公平锁:
  1.        finalvoid lock(){
  2.            acquire(1);
  3.        }
还要一个重要的区别是在尝试获取锁时 tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:
  1.        finalboolean nonfairTryAcquire(int acquires){
  2.            finalThread current =Thread.currentThread();
  3.            int c = getState();
  4.            if(c ==0){
  5.                //没有 !hasQueuedPredecessors() 判断
  6.                if(compareAndSetState(0, acquires)){
  7.                    setExclusiveOwnerThread(current);
  8.                    returntrue;
  9.                }
  10.            }
  11.            elseif(current == getExclusiveOwnerThread()){
  12.                int nextc = c + acquires;
  13.                if(nextc <0)// overflow
  14.                    thrownewError("Maximum lock count exceeded");
  15.                setState(nextc);
  16.                returntrue;
  17.            }
  18.            returnfalse;
  19.        }

释放锁

公平锁和非公平锁的释放流程都是一样的:
  1.    publicvoid unlock(){
  2.        sync.release(1);
  3.    }
  4.    publicfinalboolean release(int arg){
  5.        if(tryRelease(arg)){
  6.            Node h = head;
  7.            if(h !=null&& h.waitStatus !=0)
  8.                   //唤醒被挂起的线程
  9.                unparkSuccessor(h);
  10.            returntrue;
  11.        }
  12.        returnfalse;
  13.    }
  14.    //尝试释放锁
  15.    protectedfinalboolean tryRelease(int releases){
  16.        int c = getState()- releases;
  17.        if(Thread.currentThread()!= getExclusiveOwnerThread())
  18.            thrownewIllegalMonitorStateException();
  19.        boolean free =false;
  20.        if(c ==0){
  21.            free =true;
  22.            setExclusiveOwnerThread(null);
  23.        }
  24.        setState(c);
  25.        return free;
  26.    }        
首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。
释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。

总结

由于公平锁需要关心队列的情况,得按照队列里的先后顺序来获取锁(会造成大量的线程上下文切换),而非公平锁则没有这个限制。
所以也就能解释非公平锁的效率会被公平锁更高。


丨极客文库, 版权所有丨如未注明 , 均为原创丨
本网站采用知识共享署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议进行授权
转载请注明原文链接:ReentrantLock 实现原理
喜欢 (0)
[247507792@qq.com]
分享 (0)
勤劳的小蚂蚁
关于作者:
温馨提示:本文来源于网络,转载文章皆标明了出处,如果您发现侵权文章,请及时向站长反馈删除。

您必须 登录 才能发表评论!

  • 精品技术教程
  • 编程资源分享
  • 问答交流社区
  • 极客文库知识库

客服QQ


QQ:2248886839


工作时间:09:00-23:00