JUC原理之AQS

2020-03-16  本文已影响0人  AlienPaul

AQS

AQS即AbstractQueuedSynchronizer(抽象队列同步器)。AQS是ReentrantLockReentrantReadWriteLockCountDownLatchSemaphore等多线程同步工具的基础。

AQS内部维护了一个双向链表,用于保存因为等待获取锁而阻塞的线程,所有获取同步状态失败的线程会依次追加到链表末尾。

AQS内部双向队列示意图

锁的类型

AQS可用来构造各种各样的锁。这些锁主要分为几种类型呢?

CAS操作

CAS即Compare And Set,字面意思是比较并设置。CAS操作是原子性的。

CAS操作用于更新值。首先对比变量原值,如果相同,再设置变量为新的值,中间过程不会被其他线程打断。

CAS操作底层由Unsafe实现。Unsafe为sun公司提供的一系列内存操作native方法的工具包,由于使用了类似C语言指针操作内存的方式,性能很高,但是具有很大的风险。通常来说我们无法在自己的应用中直接使用这个类。

Unsafe提供了如下几个方法:

这三个方法针对Object,int和long三种类型提供了CAS功能。

接下来有个问题,这些方法中的offset是如何获取的?

比如我们有一个class:

public class Demo {
    private volatile int first;
    private volatile int second;
}

获取second这个int变量offset的方法为:

long offset = unsafe.objectFieldOffset(Demo.class.getDeclaredField("second"));

使用这个offset,我们就可以通过Unsafe等帮助来使用CAS设置second这个变量的内容了。

compareAndSwapInt(demo, offset, 0, 1);

compareAndSwap系列方法是有返回值的。如果返回true说明CAS操作成功,如果为false说明操作失败,通常原因为目标变量的值和期待的值不相等(有其他线程同时操作这个变量)。

AQS的几个成员变量

下文要提到的3个AQS的成员变量如下:

Node

Node是AQS中最关键的内部类,用来代表双向链表的节点。Node中有如下成员变量:

Node的状态有5种:

AQS几个重要的方法

AQS作为一个同步框架,可以衍生出不同的线程同步工具。AQS是一个抽象类,是否能够获取同步状态,唤醒等待线程等操作需要在AQS各个行为不同的子类中分别定义。这些方法最重要的是如下4个:

除此之外,AQS还提供了几个常用的工具方法供实现类调用。这些方法如下:

这些方法的代码在后面用到的时候分析。

独占模式执行过程

获取同步状态

独占模式使用acquire方法获取同步状态。

方法代码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法的逻辑如下所示:

  1. 执行tryAcquire方法。这个方法返回是否能够获取到同步状态。需要在用户实现类中编写相关逻辑。
  2. tryAcquire返回true,acquire方法直接返回,代表获取到同步状态,线程可以继续执行。
  3. 如果tryAcquire方法返回false。当前thread会被封装为Node,加入到等待队列。
  4. 入队列后执行acquireQueued方法,尝试获取同步状态,如果这个节点前置节点的状态为SIGNAL,会进入阻塞状态。如果前置节点不是SIGNAL也不是CANCELLED状态,CAS设置它为SIGNAL状态,然后自己进入阻塞状态。

入队等待操作

addWaiter方法

addWaiter将线程包装为一个node,放入AQS队列的tail。

注意:这个地方用到了Node类的构造函数Node(Thread thread, Node mode)。其中mode决定了同步模式,可选值为Node.EXCLUSIVE(独占,同时只有一个线程处于同步状态)和Node.SHARED(可以有多个线程处于同步状态)。

// 对于tryAcquire方法,由于它是独占模式,这里参数传入的是Node.EXCLUSIVE
private Node addWaiter(Node mode) {
    // 将当前线程包装为一个Node,使用独占模式: {node.prev=null, node.next=null}
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 下面这一段是快速入队尾的逻辑,如果失败需要自旋入队尾
    Node pred = tail;
    // 如果tail不为空
    if (pred != null) {
        // 设置node前置节点为tail节点
        node.prev = pred; // {node.prev=tail, node.next=null}
        // 使用CAS设置新创建的节点为tail
        if (compareAndSetTail(pred, node)) {
            // 如果设置成功,设置原来tail节点的下一个节点为这个新添加的node
            // 然后返回
            pred.next = node;
            return node;
        }
    }
    // 如果tail为空,或者是CAS设置tail失败,执行这段逻辑
    enq(node);
    // 返回新创建的node
    return node;
}

enq(node)方法,该方法负责初始化队列的head和tail,同时将参数node设置为tail。相当于慢速版的设置tail方法。代码如下:

private Node enq(final Node node) {
    // 自旋反复尝试
    for (;;) {
        Node t = tail;
        // 如果tail为null,说明AQS的等待队列第一次使用,没有任何node,尚未初始化,执行初始化逻辑
        if (t == null) {
            // 创建一个空的node,并将其设置为tail和head
            // 然后再次执行该循环,进入else分支
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 设置node的前置节点为tail
            node.prev = t;
            // CAS尝试入队尾
            // 如果失败,自旋反复尝试直到成功
            if (compareAndSetTail(t, node)) {
                // 如果成功,设置原来tail的后置节点为node,并退出循环返回node的前置节点
                t.next = node;
                return t;
            }
        }
    }
}

用一个例子说明:
如果thread1第一个入队,首先初始化队列:

head:new Node()
tail:new Node()

然后CAS设置thread1所在的node为tail:

head:{id=0, prev=null, next=1, thread=null}
tail: {id=1, prev=0, next=null, thread=thread1}

注意:这里id是为了指代方便故意添加的,实际node没有这个属性。

同理,如果再入队thread2,队列节点依次为:

head:{id=0, prev=null, next=1, thread=null}
{id=1, prev=0, next=2, thread=thread1}
tail: {id=2, prev=1, next=null, thread=thread2}

acquireQueued方法

acquireQueued方法负责:

代码如下:

// 返回当前线程是否被中断
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 获取node的前置节点
            final Node p = node.predecessor();
            // 如果node的前置节点为head,尝试获取同步状态。head是一个虚节点,真正排队等待的节点位于head之后
            if (p == head && tryAcquire(arg)) {
                // tryAcquire如果成功,将node设置为head
                setHead(node);
                // 设置原来head节点的next为null,方便GC
                // 原来head节点被移除队列
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果node的前置节点不为head或者tryAcquire失败
            // 执行shouldParkAfterFailedAcquire判断是否需要阻塞
            // parkAndCheckInterrupt会阻塞当前线程,如果线程被中断,方法返回true,然后interrupted变量被设置为true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果中间发生异常,取消获取同步状态
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire方法。

该方法判断node前置节点的waitStatus。有如下3种情况:

代码如下所示:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果前置节点的状态为SIGNAL,返回true,可以阻塞当前线程
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // 如果前置节点的状态为cancelled,则从后向前遍历,找到最近的一个状态不为cancelled的节点
        // 并将该节点的下一个节点设计为当前节点。(中间一连串状态为cancelled的节点被移除队列)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // CAS设置前置节点的状态为SIGNAL
        // 可能会问这里为何不用确保CAS成功
        // 因为执行到这里,方法会返回false,线程不会阻塞
        // 外层方法acquireQueued会保持自旋,再次调用此方法
        // CAS会再次执行,直到成功
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

下面举例说明下整个步骤。

队列初始状态为:

head: {thread: null, ws: 0}
tail: {thread: thread1, ws: 0}

第一次执行完该方法后,由于前置节点的状态为0,将前置节点的状态设置为SIGNAL,返回false。
现在队列中的节点为:

head: {thread: null, ws: SIGNAL}
tail: {thread: thread1, ws: 0}

由于shouldParkAfterFailedAcquire返回false,acquireQueued方法会重复执行。会再次tryAcquire,若成功,则获取到同步状态,方法返回,线程继续执行。若失败,执行shouldParkAfterFailedAcquire。这时候当前节点的前置节点状态为SIGNAL,会执行parkAndCheckInterrupt,线程会被阻塞。

parkAndCheckInterrupt方法负责阻塞线程。如果线程被唤醒,返回线程是否被中断。代码如下:

private final boolean parkAndCheckInterrupt() {
    // 当前线程阻塞,直到被unpark,或者被中断时候返回
    // 返回线程的中断状态
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport.park()方法用于手工阻塞线程的运行。

最后分析下cancelAcquire的逻辑。

cancelAcquire方法用于取消节点的排队过程。代码和分析如下:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // node为null,返回
    if (node == null)
        return;

    // 清除node的thread变量
    node.thread = null;

    // Skip cancelled predecessors
    // 剔除队列中在该node之前,连续的状态为CANCELLED的node
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 获取队列中从后向前最近一个状态不为CANCELLED节点的下一个节点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 设置node状态为CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        // 如果node在队尾,CAS设置对尾为pred(队列中从后向前最近一个状态不为CANCELLED节点)
        // 如果CAS成功,再CAS设置tail节点的next节点为null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 如果pred不是head
        // 确保pred节点的状态为SIGNAL
        // 同时pred的thread不能为null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 队列跳过这个CANCELLED的node
                // 设置pred节点的next为node的next节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 否则(pred为head或者CAS设置pred状态为SIGNAL失败),需要唤醒后面排队的线程
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

释放同步状态

release方法

释放同步状态为release方法。代码和解释如下:

public final boolean release(int arg) {
    // 如果tryRelease返回true,说明需要唤醒队列中排队的线程
    if (tryRelease(arg)) {
        Node h = head;
        // head不为空并且状态不为0(此时为SIGNAL)之时,唤醒后续节点
        // (状态为SIGNAL之后的节点才会被park)
        if (h != null && h.waitStatus != 0)
            // 唤醒head节点的后置节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unparkSuccessor方法

unparkSuccessor方法设置node节点的状态为0,然后向后找到距离node最近的状态不为cancelled的节点,唤醒它的线程。

代码如下:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // 如果是除cancelled外的其他状态,设置head状态为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // s为head的后置节点
    Node s = node.next;
    // 如果s节点为null或者状态为cancelled
    // 从后向前遍历,找到距离head最近状态不为cancelled的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒这个节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享模式执行过程

获取同步状态

获取同步状态的入口为acquireShared方法。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

其中tryAcquireShared的返回值含义为:

如果tryAcquireShared返回值小于0,说明说去同步状态失败,执行doAcquireShared方法。

doAcquireShared对应独占模式的acquireQueued方法。逻辑基本类似。

代码如下:

private void doAcquireShared(int arg) {
    // 节点加入队尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 获取tryAcquireShared返回值
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 和独占模式的不同之处,这里需要向后置节点传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate方法用于设置node为head,并且向后续节点传递release事件。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 设置node为新的head节点
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // 如果propagate > 0 
    // 或者原先head节点为null,或者状态不为cancelled之时,都会执行
    // 或者目前head节点为null,或者状态不为cancelled之时,也会执行(head可能会在运行行代码之前被修改)
    // 这段逻辑为传播过程,唤醒后续的节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next; 
        // 如果s节点为null(node位于队列尾部),或者后置节点类型为共享节点的话,唤醒后面的节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

这里可能有疑问,为什么要传递release事件。以ReentrantReadWriteLock为例,这个lock的read锁是共享的,write锁是独占的。如果write锁释放之后,某个线程获得了read锁,这时候通过传递release事件,后续等待read锁的线程才能够唤醒并继续执行。

doReleaseShared方法:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // 自旋
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果head节点状态为SIGNAL
            if (ws == Node.SIGNAL) {
                // 使用CAS操作,设置head状态为0,如果失败再次尝试
                // (1)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后置节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // 设置h节点状态为PROPAGATE,表示下一次acquireShared执行时此节点无条件propagate
                // setHeadAndPropagate方法中判断时该node的waitStatus小于0,可以向后propagate
                // 共享模式中的doReleaseShared可能会被多个线程同时执行
                // 如果一个线程刚执行完(1),另一线程就会遇到ws为0的情况
                continue;                // loop on failed CAS
        }
        
        // 这里用来传播release操作
        // setHeadAndPropagate会改变head,如果有线程tryAcquire成功,执行完setHeadAndPropagate方法的setHead方法,head会发生变化,说明此时有线程被唤醒,再次循环doReleaseShared,尽管被唤醒的线程也有可能同时执行doReleaseShared,但这样不会影响release的传播
        // 如果head没有改变,说明没有线程被唤醒(后续线程tryAcquireShared返回值不大于0),此时传播行为终止,退出循环
        // 也有可能是被唤醒的线程获取到同步状态后尚未执行到setHeadAndPropagate的setHead方法,此时head没有改变。尽管这时候退出循环,但是被唤醒的线程仍有机会执行doReleaseShared。不影响release的传播
        if (h == head)                   // loop if head changed
            break;
    }
}

释放同步状态

释放同步状态的入口方法为releaseShared。代码如下:

public final boolean releaseShared(int arg) {
    // 如果tryReleaseShared返回true,说明需要唤醒队列中等待的线程
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

其中doReleaseShared前面已经分析过,此处不再赘述。

带有时间限制的获取锁的方法

AQS还支持在允许时间内获取同步状态的方法。如果立刻获取成功,或者在允许时间内获取成功返回true,否则在等待最大允许时间之后,返回false。

下面以有限时间内获取独占锁的方法说明下工作原理。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算超时的时刻
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 计算还剩下多少时间
            nanosTimeout = deadline - System.nanoTime();
            // 如果超时,返回false
            if (nanosTimeout <= 0L)
                return false;
            // spinForTimeoutThreshold为1000纳秒
            // 如果剩下的时间多于spinForTimeoutThreshold,再阻塞当前线程
            // 否则不必阻塞,保持自旋即可
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

该方法中使用了spinForTimeoutThreshold变量来优化。线程的阻塞和唤醒过程是比较耗时的。如果线程只需要“阻塞”等待很小的一段时间,为了保正线程从阻塞状态恢复的速度,不要使用Locksupport.park。可以让线程自旋,虽然会消耗一定的CPU资源,但是线程一直保持执行状态,跳出自旋状态的耗时要比中阻塞状态中恢复小得多。

带有时间限制的获取共享锁的方法和独占锁的类似,不再赘述。

上一篇下一篇

猜你喜欢

热点阅读