AQS原理(一):基于ReentrantLock

2018-11-26  本文已影响7人  放开那个BUG

AQS介绍

在使用者的角度上,AQS的功能分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,就算是 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的

独占锁

解读AQS可以从使用了它独占控制功能的子类 ReentrantLock 说起,分析 ReentrantLock 的同时看一看 AQS 的实现,再推理出 AQS 独特的设计思路和实现方式。最后,再看其共享控制功能的实现。

一般用ReentrantLock,都是这么使用:

        reentrantLock.lock()
        //do something
        reentrantLock.unlock()

ReentrantLock保证 do something 在同一个时间只有一个线程执行这段代码,其他线程被挂起,直到获取锁。所以,ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock 使用的就是 AQS 的独占 API 实现的。

我们从 ReentrantLock 的实现开始一起看看重入锁是怎么实现的。

竞争锁

找到ReentrantLock类的lock()方法:

    public void lock() {
        sync.lock();
    }

ReentrantLock 内部有代理类完成具体操作,ReentrantLock 只是封装了统一的一套 API 而已。并且,ReentrantLock 又分为公平锁和非公平锁,所以,ReentrantLock 内部只有两个 sync 的实现:


公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。

非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关,类似于堵车时,加塞的那些 XXXX。

ReentrantLock或者AQS的实现简述如下:有一个被volatile修饰的标志位被叫做key,有没有线程拿走了锁,还需要一个线程安全的队列,维护一堆被挂起的线程。当锁被归还时,能通知这些被挂起的线程,可以来竞争锁。

而公平锁和非公平锁,唯一的区别是获取锁的时候是先进入队列排队再获取锁,还是先获取锁,不成功再进入队列。

公平锁
首先我们看一下ReentrantLock中的公平锁的实现:

        final void lock() {
            acquire(1);
        }

acquire()方法调用的是AQS的acquire()方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

此方法先尝试获取锁,获取不到则创建一个waiter(当前线程)后放入到队列中。

点击tryAcquire()方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

留空了,Doug Lea 是想留给子类去实现(既然要给子类实现,应该用抽象方法,但是 Doug Lea 没有这么做,原因是 AQS 有两种功能,面向两种使用场景,需要给子类定义的方法都是抽象方法了,会导致子类无论如何都需要实现另外一种场景的抽象方法,显然,这对子类来说是不友好的。)

既然是子类的tryAcquire()方法,我们查看FairSync的tryAcquire()方法(FairSync几次Sync,而Sync继承AQS):

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

首先调用了AQS的getState()方法,因为在 AQS 里面有个叫 state 的标志位 :

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

那个state就是我们前面所说的key。

继续FairSync的tryAcquire()方法:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();// 获取当前线程 
            int c = getState();// 获取父类 AQS 中的标志位 
            if (c == 0) {
                 // 如果队列中没有其他线程  说明没有线程正在占有锁!
                if (!hasQueuedPredecessors() &&
                    // 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来 
                    //的,从上面的图中可以知道,这个值是写死的 1
                    compareAndSetState(0, acquires)) {  
                   // 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因            
                  //此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重        
           //入锁,是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判 
            //断一次 获取锁的线程是不是当前请求锁的线程。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;  // 如果是的,累加在 state 字段上就可以了。
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

到此,如果如果获取锁,tryAcquire 返回 true,反之,返回 false,回到 AQS 的 acquire 方法。如果没有获取到锁,按照我们的描述,应该将当前线程放到队列中去,只不过,在放之前,需要做些包装。

先看 addWaiter 方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

用当前线程去构造一个Node对象,node是一个表示 Node 类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。

创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。

将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。

在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。

而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):


黄色结点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的结点(tail 后面)后面。这个从 enq ()方法可以看出来,上文中有提到 enq ()方法为将节点插入队列的方法:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

接下来看AQS的acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
             // 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
                    setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && 
                // 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
                    parkAndCheckInterrupt()) 
               // 如果需要,借助 JUC 包下的 LockSopport 类的静态方法 Park 挂起当前线程。知道被唤醒。
                    interrupted = true;
            }
        } finally {
            if (failed) // 如果有异常 
                cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
        }
    }

上面的代码有几处需要说明:
1.Node结点中,除了存储当前线程,结点类型,队列中前后元素的变量,还有一个叫waitStatus的变量,用于描述结点的状态。
原因是:AQS的队列中,在有并发时,会存取一定数量的结点,每个结点中都有表示线程状态的量,如果有些线程等不及获取锁,需要放弃竞争,退出队列;有些线程等待一些条件满足后才能执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

分别表示:

  • 1.结点取消
  • 2.结点等待触发
  • 3.结点等待条件
  • 4.结点状态需要向后传播
    只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。
  1. 对线程的挂起及唤醒操作是通过使用 UNSAFE 类调用 JNI 方法实现的。当然,还提供了挂起指定时间后唤醒的 API,在后面我们会讲到。

到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到 AQS 队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略。

释放锁

释放锁的流程简述如下:因为获取锁的线程的结点在AQS的头节点位置,所有先应该释放锁,再将头节点移除,然后让后一个做头节点,并通知它来竞争锁。

我们可以先从ReentrantLock的FairSync来说明:

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unlock()方法调用了AQS中的release()方法,同样传入了参数1。

同样,release()方法为空方法,子类自己实现逻辑

protected final boolean tryRelease(int releases) {
            int c = getState() - releases; 
            if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {// 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
 }

释放锁,成功后,继续执行unparkSuccessor()方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。

到此,ReentrantLock 的 lock 和 unlock 方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync 没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

非公平锁的 lock 方法的处理方式是: 在 lock 的时候先直接 cas 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

          .......
      }

而对于公平锁:则是老老实实的开始就走 AQS 的流程排队获取锁。如果前面有人调用过其 lock 方法,则排在队列中前面,也就更有机会更早的获取锁,从而达到“公平”的目的。

参考资料

https://www.infoq.cn/article/jdk1.8-abstractqueuedsynchronizer

上一篇下一篇

猜你喜欢

热点阅读