Java

超详细!AQS(AbstractQueuedSynchroniz

2019-10-14  本文已影响0人  洞庭湖上的麻雀

Doug Lea 在 java.util.concurrent(JUC)中提供一套基础工具用于帮助开发者更加方便的开发并发程序,包括 LockSemaphoreCountDownLatchCyclicBarrier等等,而实现这些类的实现都借助了一个能够控制多个线程的并发访问的工具,那就是 AbstractQueuedSynchronizer(AQS)。

AQS 的数据结构形式如下图所示,其维护了一个 FIFO 的双向队列,尝试获取锁的线程都以节点的形式存在于队列中


aqs.png

源码分析

在对源码分析之前,首先需要了解一些基础的内容。

首先,锁分为两种,独占锁和共享锁,顾名思义,独占锁是指最多同时只能有一个线程获取到锁,而共享锁则允许最多 n 个线程同时获取到锁。根据在获取锁的过程中是否响应中断请求,可分为响应中断和不响应中断的请求。

其次,每个节点都有其对应的状态,初始状态为0。

// 等待超时或被中断,取消获取锁
static final int CANCELLED =  1;
// 说明该节点的后续被挂起了,当释放锁或取消时,需要唤醒后继节点
static final int SIGNAL    = -1;
// 表示节点处于Condition队列中
static final int CONDITION = -2;
// 用于共享式锁,表示下一次尝试获取共享锁时,需要无条件传播下去
static final int PROPAGATE = -3;

为了更好的理解源码,我会通过在源码的基础上增加注释的方式对源码进行解释(英文注释为源码本来的注释)。对于有方法调用的地方,可以直接跳到对应方法的讲解,按流程一步步理解,也可以通过注释了解整个方法的步骤,再细看之前调用的每个方法。

独占锁

不响应中断的独占锁获取

/**
* 不响应中断的独占锁获取入口
* 其中 tryAcuquire() 方法为获取锁的抽象方法,返回 true 表示获取锁成功,需要实现类根据获取锁的方式自己定义
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 如果 tryAcquire() 获取锁失败,则通过 addWaiter() 加入到同步队列中,再通过 acquireQueued() 不断尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //由于不响应中断,如果检测到中断,acquireQueued() 会返回 true,进入方法体
        // 由于检测时使用了 Thread.interrupted(),中断标志被重置,需要恢复中断标志
        selfInterrupt();
}

/**
* 将线程信息包装成一个 Node 加入到同步队列的队尾中
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 尝试通过一次 CAS 将节点加入到队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 走到这里说明要么有竞争 CAS 失败,要么同步器队列还没初始化即 pred == null
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 无限循环 CAS 直到将节点加入到队尾中
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
* 因获取锁失败而加入同步队列中的线程在这里不断尝试获取锁
* 返回中断状态交由上层函数处理
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 由于这是一个先进先出的队列,只有当自己的前驱是头结点(头结点表示已经获取到锁)时,才能轮到自己来争夺锁
            if (p == head && tryAcquire(arg)) {
                // tryAcquire() 返回 true 说明获取锁成功
                // 将 nod e节点设置为 head,此外 setHead() 是不需要 CAS 的,因为不会有竞争
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取失败后,查看是否需要被挂起,如果需要挂起,检查是否有中断信息
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 复习一下,SIGNAL 说明该节点的后续被挂起了,当释放锁或取消时,需要唤醒后继节点
    // 如果前驱节点已经是 SIGNAL 状态了 说明当前线程可以安心被挂起了,等待前驱来唤醒自己
    if (ws == Node.SIGNAL)
        return true;
    // ws > 0 说明前驱节点被取消了(CANCELLED == 1),需要跳过被取消的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点通过CAS改为 SIGNAL 状态,但最后还是会返回 false 
        // 如果在下一次循环中如果还是没拿到锁,则会进入该方法第一个判断,返回true,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // 挂起线程
    LockSupport.park(this);
    return Thread.interrupted();
}

响应中断的独占锁获取

/**
* 方法入口
*/
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
* 和不响应中断的获取方法唯一不同的是,在检测到中断后是抛出中断异常而不是返回true,其他没有区别
*/
private void doAcquireInterruptibly(int arg)
    //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
    // ...
}

带超时的响应中断的独占锁获取

/**
* 方法入口
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

/**
 * 基本上和之前的差不多,如果超时了就直接返回 false,挂起线程时也使用了带计时的 parkNanos
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果超时了 返回false
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 注意这里 nanosTimeout > spinForTimeoutThreshold(默认1000纳秒)时才挂起,小于这个阈值时直接自旋,不再挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

独占锁释放

/**
 * 和加锁一样,这里的 tryRelease() 也是抽象方法,需要子类自己实现
 * 实际工作就是唤醒后继节点而已,出队的操作也是在获取锁的时候由后继结点完成的
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果 h.waitStatus == 0 ,说明不是 SIGNAL 状态,没有需要唤醒的节点,直接返回
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
        
    // 如果后继节点已经取消了,那么重新调整后继直到没有取消的为止
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果有未取消的后继,唤醒他
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享锁

不响应中断的共享锁获取

在实现上,共享锁和独占锁在实现上的核心区别在于:

队列中的线程节点尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点。

/**
* 方法入口,tryAcquireShared为抽象方法:
* 返回小于0表示获取失败
* 等于0表示当前线程获取到锁,但后续线程获取不到,即不需要传播后续节点
* 大于0表示后续线程也能获取到,需要传播后续节点
*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // >=0表示获取锁成功
                if (r >= 0) {
                    // 这里和独占模型不同,除了设置头结点后还需要向后传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    // 如果propagate > 0 或者 h.waitStatus < 0(PROPAGATE) 需要唤醒后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果后继结点是独占结点,就不唤醒了
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 队列里至少有2个节点,否则没有传播必要
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 和共享锁不同的是,这个方法可以在setHeadAndPropagate和releaseShared两个方法中被调用
                // 存在一个线程正获取完锁向后传播,另一个线程释放锁的情况,所以需要CAS控制
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // ws == 0 表明是队列的最后一个节点,那么CAS为PROPAGATE,表明下一次tryShared时,需要传播
            // 如果失败说明有新后继节点将其改为了SIGNAL后挂起了,那么继续循环传播
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果head改变了,说明有新的排队的线程获取到了锁,再次检查
        if (h == head)                   // loop if head changed
            break;
    }
}

共享锁释放

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放成功后,往后传播
        doReleaseShared();
        return true;
    }
    return false;
}

Condition

Condition 提供了线程之间的通信机制,和 synchronize 中的 wait() 和 notify() 的作用是一样的,并且同一个锁可以有多个 condition。

Condition是一个接口,实际上 lock.newCondition() 返回的是 AQS 的内部类 ConditionObject。其核心的两个方法就是 await() 和 signal()。

当调用await()时,线程加入到等待队列中等待,和同步队列相似,也是一个FIFO的队列,但虽然用的数据结构相同,等待队列只用了单向的功能。其维护的数据结构图如下所示:

condition.png
/**
* 将当前线程信息包装加入等待队列中并挂起线程等待唤醒
* 由于能调用 await() 的线程一定是获取到锁的,所以下面的操作都不需要额外的CAS操作来处理线程竞争
*/
public final void await() throws InterruptedException {
     if (Thread.interrupted())
        throw new InterruptedException();
     // 加入到等待队列中
     Node node = addConditionWaiter();
    // 释放锁,fullyRelease() 调用的是独占锁的释放方法realse(state),即一次释放所有的重入锁, state记录了重入的次数
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 只要还没有被 signal() 给加入到同步队列,就挂起,除非被中断
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 如果被中断了 跳出循环,返回 0 或 THROW_IE 或 REINTERRUPT
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 到这里为止,不管怎么出的循环,都已经被加入同步队列了(要么被 signal() 加入,要么在中断检测方法中加入)
    // ----------------------------------------------------------
    // 别忘了 acquireQueued() 返回的获取锁的过程中是否被中断了
    // 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,那么也视为在 signal() 之后被中断,设为REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 只有在 signal() 前中断的线程还会在等待队列中留有节点,才会满足这个条件
    if (node.nextWaiter != null)
        // 将状态不是CONDITION的节点从队列中删除
        unlinkCancelledWaiters(); 
    if (interruptMode != 0)
        // 抛出异常 或 重置中断标识位
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后一个等待队列被取消了,清除出去
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 这个方法就是从头到尾遍历一遍链表将状态不为 CONDITION 的节点删除等待队列,代码略
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
* 如果没有被中断,返回 0
* 如果在被signal之前中断了,返回 THROW_IE,表示需要抛出异常
* 如果在signal之后中断了,返回 REINTERRUPT,表示不抛出,只恢复中断位
*/
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 如果CAS成功了,说明还没有被signal加入同步队列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 由于没有signal,这里需要加入同步队列,才能之后争夺锁
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
     // 说明已经被signal了,防止还没被加入到同步队列的情况
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
* 从等待队列中找到第一个线程唤醒
*/
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 出队
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
* 清空等待队列, 将等待的节点按顺序加入到同步队列中
*/
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}


final boolean transferForSignal(Node node) {
    // 如果CAS失败,说明被cancell了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 将节点加入到同步队列中,注意enq()会返回node的前驱节点p
    Node p = enq(node);
    int ws = p.waitStatus;
    // 因为此时节点还是挂起的,按照同步队列的结构,需要将前驱结点的状态改为SIGNAL
    // 如果前驱被取消了,或者CAS前驱状态为SIGNAL失败了,那么就唤醒线程,让其自己走去获取锁的步骤,虽然线程可能会被再次挂起,但这是无害的操作
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

文章中的图片来源于《Java并发编程的艺术》

上一篇 下一篇

猜你喜欢

热点阅读