Java 并发编程

Java并发编程(七):AQS框架

2018-12-09  本文已影响0人  yeonon

1 概述

AQS即AbstractQueuedSynchronizer(抽象队列同步器),是一个用于构建锁和同步器的框架,在本系列上一篇文章中提到过的Semaphore,CountDownLacth甚至包括显式锁ReentrantLock的底层都是基于AQS框架构建的。AQS解决了很多同步相关的细节问题,例如阻塞线程和唤醒线程的操作、等待线程采用FIFO队列操作顺序等都由AQS内部完成,依赖该框架的类可以非常简单、灵活的构造出线程安全的锁或者同步工具类。

大多数程序员应该都不会直接使用AQS框架,毕竟JDK里的几个同步工具类就已经能满足绝大多数需求了,确实没有必要再“重复造轮子”。但理解AQS有助于理解这些内置的同步工具类,能准确的判断出各种同步工具类的适用场景以及优缺点,从而做出合理的决策。

下面我将尽量详细的介绍AQS的几个重要部分,包括但不限于:

需要说明一点,软件开发并不推荐“重复造轮子”,我这里提到的要“重复造轮子”并没有要违背这一原则的意思。在实际开发产品的过程中,“重复造轮子”肯定是不值得提倡的,不仅要耗费大量的时间、精力,结果还有极大的可能会不尽人意。但在学习的过程中,“重复造轮子”有益于理解一个个抽象、复杂的知识。

2 AQS源码分析

和之前一样,不要一股脑的上来就从头开始阅读源码,应该要找到一个入口点,然后沿着这个入口慢慢深入。AQS的基本操作就是获取以及释放操作。

2.1 获取操作

获取操作有如下几个方法:

2.1.1 acquire

下面来分别看看其源码:

public final void acquire(int arg) {
    //如果成功获取资源,就直接返回。否则就进入等待队列,知道获取成功为止。
    //同时不响应中断
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//将线程加入到等待队列的末尾
private Node addWaiter(Node mode) {
    //构造一个Node节点,该节点与线程实例绑定
    //mode即模式,有Node类中有两种模式,分别是EXCLUSIVE和SHARED,SHARED是一个Node实例对象,EXCLUSIVE的值是null
    Node node = new Node(Thread.currentThread(), mode);
    
    /**下面就是常规的将节点加入到队列的操作了**/
    
    //tail是尾节点,这里先保存尾节点
    Node pred = tail;
    if (pred != null) {
        //把node的前驱节点设置成当前的尾节点
        node.prev = pred;
        //将node节点设置成尾节点,然后返回该node节点。这里采用CAS操作来完成。
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果pred为null,说明当前队列中没有节点,那么就应该初始化队列并直接将node节点插入到队列中
    //该方法并不仅仅是初始化,内部逻辑还包含了普通的插入逻辑。
    //作者也许是为了能复用方法吧。
    enq(node);
    return node;
}

//处于队列中的线程尝试去获取资源,直到有线程获取成功为止,期间如果线程被中断过,就会返回true,否则返回false
final boolean acquireQueued(final Node node, int arg) {
    //失败标志
    boolean failed = true;
    try {
        //中断标志
        boolean interrupted = false;
        //无限循环,即自旋,直到有线程能成功获取资源为止
        for (;;) {
            //拿到该节点的前驱节点
            final Node p = node.predecessor();
            //如果前驱节点就是头结点并且tryAcquire()返回true(即尝试获取资源成功)
            //那么就将头节点设置为当前节点,并把之前的节点删除
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //能走到这,说明上面没有获取成功,或者没有资格获取资源。
            //那么就判断是否能进入阻塞等待状态,如果能就进入阻塞等待状态
            
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果过程中有被中断过,那么就设置该标志为true
                interrupted = true;
        }
    } finally {
        //如果请求失败,那么就取消本次获取操作
        if (failed)
            cancelAcquire(node);
    }
}

在看shouldParkAfterFailedAcquire和parkAndCheckInterrupt之前,写来简单介绍一些Node类,该类表示队列的节点,包含了前驱节点以及后继节点的信息,而且和线程实例绑定。所以也可以简单的把Node类当做线程来看待。除此之外,Node还包含了几个状态,分别是:

现在来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //先获取前驱节点的状态
    int ws = pred.waitStatus;
    //如果前驱节点处于SIGNAL状态,那么就说明至少前驱节点没有被取消
    //那当前节点就可以放心等待了。
    //主要是为了避免线程在一个已经处于取消状态的线程傻傻的等着。(甚至后续的节点都傻傻的等着)
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    //ws大于0只有一个状态,即CANCELLED,说明当前节点的前驱节点是一个“死节点”,
    //那么就应该跳过该前驱节点,然后一直往前检查,直到不是CANCELLED状态为止。找到之后,将节点插入到新找到的节点后面,作为其新的后继节点
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //如果前驱节点的状态既不是CANCELLED也不是SIGNAL,那么就设置成SIGNAL。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    //调用 LockSupport.park(this)进入阻塞状态
    LockSupport.park(this);
    //被唤醒之后,判断是否被中断过(即检查中断标志位)
    return Thread.interrupted();
}

看了那么多源码,可能还是会有点云里雾里的,下面简单总结一下整个流程:

  1. 线程先调用tryAcquire()来尝试获取资源,如果获取成功,直接返回。
  2. 如果获取失败,那么就调用addWaiter()把线程加入到等待队列中去。
  3. 然后调用acquireQueued()尝试从队列中拿出线程来获取资源,如果成功获取,那么就重置队列头节点,然后返回即可(如果期间遇到过中断,那么就设置一下中断标志位)。
  4. 如果3中获取失败,那么就判断当前节点是否能进入等待状态,如果能,那么就直接进入等待状态,否则就继续往前检查节点,找到之后将当前节点插入到新找到的节点后面作为其后继节点,然后再次回到第3步,再次判断是否能进入等待状态。
  5. 当节点会唤醒时,会再次尝试获取资源,如果还是获取失败,那么就重复第4个过程。如果获取成功,就重置头结点,然后返回即可。

上述节点获取失败的情况不一定就是资源被其他节点占着,也有可能是当前节点没有资格获取资源,只有队列的头结点的后继几点(即老二)才有资格获取资源。节点是否能进入等待状态,也需要判断,因为有可能当前节点的前驱节点处于被取消的状态,这就就永远无法轮到当前节点去获取资源了(当然后面的节点也没有机会),所以需要有一定的“插队”,即直接插入到前面若干个处于取消状态的的节点前面。有点类似下图这种情况:

ibFT4e.png

在节点完成了这一些列操作之后,才能进入等待状态(安心休息,等待唤醒)。

2.1.2 acquireInterruptibly

源码如下所示:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

如果请求获取资源的线程检查到被中断,那么直接抛出InterruptedException异常。doAcquireInterruptibly()的逻辑和acquireQueued的逻辑几乎一样,再次不再赘述,各位可以自行查看。

2.1.3 acquireShared

源码如下所示:

public final void acquireShared(int arg) {
    //如果tryAcquireShared返回值大于等于0,进表示获取成功,直接返回即可
    //否则进入doAcquireShared方法
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    //调用addWaiter(),将节点加入到队列中,注意此时的Node模式是SHARED(共享)
    final Node node = addWaiter(Node.SHARED);
    //是否失败标志
    boolean failed = true;
    try {
        //中断标志
        boolean interrupted = false;
        //自旋
        for (;;) {
            //拿到当前节点的前驱节点
            final Node p = node.predecessor();
            //判断前驱节点是否是头节点
            if (p == head) {
                //再次调用tryAcquireShared()请求资源
                int r = tryAcquireShared(arg);
                //如果返回值大于等于0,说明获取资源成功
                if (r >= 0) {
                    //设置node节点为头节点,并且根据资源剩余数来判断是否需要唤醒其他线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //和之前一样
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    //设置头结点
    setHead(node);
    //如果资源还有剩余,就唤醒其他线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            //doReleaseShared()里有唤醒线程的逻辑
            doReleaseShared();
    }
}

acquireShared是以共享的模式获取资源,也比较有用。例如CountDownLatch,它底层就是采用的共享模式,线程获取的资源个数受到内部的count值影响,AQS底层会根据tryAcquireShared()的返回值来判断是否还允许线程获取资源。(后面我会介绍到CountDownLatch的具体逻辑,这里先不详细说)

2.1.4 acquireSharedInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

和acquireInterruptibly()差不多,doAcquireInterruptibly()的逻辑也和doAcquireShared()非常相似,就不再赘述了。

2.2 释放操作

主要有两个API:

2.2.1 release

其源码如下所示:

public final boolean release(int arg) {
    //尝试释放资源,成功就继续,失败直接返回false
    if (tryRelease(arg)) {
        //保存头节点
        Node h = head;
        //如果头结点不为null且头节点的状态不是初始状态
        if (h != null && h.waitStatus != 0)
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}   

private void unparkSuccessor(Node node) {
    //拿到队列头结点的节点状态
    int ws = node.waitStatus;
    //如果状态值小于0,即不处于取消状态
    if (ws < 0)
        //那么就用CAS操作将其设置为初始状态
        compareAndSetWaitStatus(node, ws, 0);
    //拿到其后继节点
    Node s = node.next;
    //如果后继节点为null或者后继节点的状态为取消状态
    if (s == null || s.waitStatus > 0) {
        //值s为空
        s = null;
        //从队列尾部开始,一直遍历到当前队列的第一个不处于取消状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果能在队列中找到非取消状态的后续节点,那么就唤醒该节点所绑定的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放操作会获取操作简单多了,tryRelease()是非常重要的,觉得了是否要唤醒等待中的线程。unparkSuccessor的逻辑看似复杂,其实非常简单,就是如果直接后继节点存在且处于非取消状态,那么就直接唤醒该节点绑定的线程,否则就找到从头节点出发,位于最前面的非取消状态节点,找到之后唤醒该节点绑定的线程(虽然是从后往前找,仔细看看逻辑会发现,即使找到了也不会立即停止,所以结果就是找到队列最前的符合条件的节点)。

2.2.2 releaseShared

public final boolean releaseShared(int arg) {
    //尝试释放资源
    if (tryReleaseShared(arg)) {
        //执行共享模式的释放操作
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        //如果头结点不为null且h节点不是唯一的节点(h != tail)
        if (h != null && h != tail) {
            //获取头结点的状态
            int ws = h.waitStatus;
            //如果状态是SIGNAL,说明可以唤醒后继节点了
            if (ws == Node.SIGNAL) {
                //这里的CAS是为了保证头结点在这段时间内没有被其他线程修改
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //检查成功后,调用unparkSuccessor()唤醒后继节点即可
                unparkSuccessor(h);
            }
            //如果状态等于0(初始状态),并且CAS操作失败,就继续循环
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //如果头结点发生改变,即已经唤醒了后继节点,并将后继节点设置为头结点了
        //就直接跳出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

如果理解了之前的源码分析,理解这应该就没什么难度了,解释已经在注释中写出,不再赘述。

依照惯例,说一下release的大致流程:

  1. 先调用tryRelease(),如果返回值是true,即释放资源成功,那么就执行后续的唤醒队列中线程的操作,否则返回false,表示本次释放失败。
  2. 唤醒线程的时候,会从头节点开始,寻找处于非取消状态的后继节点,然后将头节点设置才该后继节点,并唤醒和该节点绑定的线程。唤醒的顺序遵循FIFO规则。
  3. 对于共享模式的唤醒,还涉及到头结点的状态问题。这点在源码中已经有注释解释了。
  4. 最后成功唤醒节点,节点会从之前阻塞的地方继续执行。

3 模板方法模式在AQS的应用

在上面的源码分析中,我们总是看到tryRelease()和tryAcquire(),但一直没有分析其源码。这其实是有原因的,下面先来看看他俩在AQS里的实现:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

首先,他们是由protected修饰的,即子类能访问该方法,外部不能访问该方法,然后他们的实现都非常简单粗暴,直接抛出UnsupportedOperationException异常,根据经验(如果有的话)大概可以知道,这玩意儿应该就是专门让子类实现的,父类不提供默认的实现。

再来看看acquire()方法的源码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这一个if很长,按照调用顺序可以拆解成三步:

  1. 调用tryAcquire()方法。
  2. 调用addWaiter()方法。
  3. 调用acquireQueued()方法。

在acquire()方法中,这三步的顺序是固定的,换句话说:父类AQS帮所有的子类已经规划好了基本路线,子类只需要填充每个步骤的具体实现即可,在acquire()里,子类只需要也应该仅仅填充tryAcquire()即可。(其他的方法最好不要覆盖)这不就是我们在设计模式中学习到的“模板方法模式”吗?下面来看一下Semaphore的实现验证我们的猜想:

//Sync继承了AbstractQueuedSynchronizer
//发现还是抽象类,不用着急,等会会看到具体的实现类
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    
    Sync(int permits) {
        //setState()是AQS抽象类中的方法,用于设置状态,状态值的int类型的值
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    //以非公平、共享模式获取资源
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            //获取状态值(在这里就是许可证数)
            int available = getState();
            //计算剩余的许可证数
            int remaining = available - acquires;
            //如果剩余的许可证已经小于0或者CAS将状态设置成功,都会返回剩余数量
            //如果CAS操作失败,就会再次循环尝试(自旋)
           //如果返回值小于0,acquire()就会失败。
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    //重写了父类的tryReleaseShared(),该方法会根据具体的实现特点来决定返回值
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            //先获取当前的许可证数
            int current = getState();
            //计算新的许可证数
            int next = current + releases;
            //如果增加之后的许可证数比原先的还少,那么就说明发生整形溢出了。
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //CAS操作设置新的状态(许可证数)
            if (compareAndSetState(current, next))
                return true;
        }
    }

    //减少许可证数
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    
    //清理所有的许可证数
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

Sync仍然是抽象类,之所以要这么设计,应该是因为左下想把非公平和公平模式的实现分离吧。下面来看看两个具体的实现类:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    //非公平模式下的tryAcquireShared()
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    //公平模式下的tryAcquireShared()实现
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            //先判断是否有前驱节点,如果存在的话,那就说明还没轮到当前线程去获取资源
            if (hasQueuedPredecessors())
                return -1;
            //和之前一样,不说了
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

下面来总结一下,Semaphore调用acquire()的流程:

  1. 调用acquire()会直接调用sync.acquireSharedInterruptibly(1); 其中sync是FairSync或者NonfairSync类的实例,这取决于Semaphore构造时候的参数选择。
  2. 在acquireSharedInterruptibly()方法里会调用tryAcquireShared()方法,因为子类实现了该方法,所以不会抛出异常,acquireSharedInterruptibly()的内部逻辑会根据tryAcquireShared()的返回值决定是否去获取资源。
  3. 之后的逻辑就和之前分析AQS源码时说的一样了,不再赘述。

经过这一番分析,验证了本节开始的猜想,AQS框架确实采用了模板方法模式来构建框架的基本骨架,并且工作的很好,代码结果也非常清晰,前提是对Java的继承体系有足够的了解,否则总会在某个地方感觉到迷惑。(基础非常重要!!!)

4 使用AQS框架自定义同步工具类

这应该是最令人激动的部分了,之前的源码分析什么的确实是非常枯燥。下面我将模仿CountDownLacth写一个BinanryLatch类,和CountDownLacth的不同仅仅在于CountDownLacth的状态可以有多个,而该类只有两个状态,即0和1,话不多说,直接上代码:

public class BinaryLatch {

    //Sync实例
    private final Sync sync;

    //因为是二元的,不需要在内部保存计数器值
    public BinaryLatch() {
        sync = new Sync();
    }

    public void signal() {
        //即释放资源,参数是1
        sync.releaseShared(1);
    }
    
    public void await() throws InterruptedException {
        //尝试获取资源,参数是0
        sync.acquireSharedInterruptibly(0);
    }


    private class Sync extends AbstractQueuedLongSynchronizer {

        @Override
        protected boolean tryReleaseShared(long arg) {
            //被调用的时候,arg的值会是1,然后将状态设置为1
            setState(arg);
            return true;
        }

        @Override
        protected long tryAcquireShared(long arg) {
            //获取状态,如果状态等于1,说明已经是否资源了,返回一个大于0的数即可,否则返回一个小于等于0的值
            return getState() == 1 ? 1 : -1;
        }
    }
}


该类在调用await()的时候会把状态设置成0,在调用signal()将状态设置成1之前,tryAcquireShared()的返回值一直就是-1(小于0),所以所有调用该await()方法的线程都会被阻塞,在调用signal()之后,会将状态设置成1,然后唤醒所有之前被阻塞的线程,这些线程再次调用tryAcquireShared()方法(回想一下之前的源码分析),这次调用会返回1(大于0),所以能成功获取资源,最后,线程获取到资源之后继续执行其逻辑。

下面是一个使用BinaryLatch的示例:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        BinaryLatch latch = new BinaryLatch();

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread() + "\t到达!");
                    latch.await();
                    System.out.println(Thread.currentThread() + "\t出发!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            Thread.sleep(new Random().nextInt(100));
        }

        Thread.sleep(100);
        latch.signal();
    }
}

输出类似如下所示:

Thread[Thread-0,5,main] 到达!
Thread[Thread-1,5,main] 到达!
Thread[Thread-2,5,main] 到达!
Thread[Thread-3,5,main] 到达!
Thread[Thread-4,5,main] 到达!
Thread[Thread-5,5,main] 到达!
Thread[Thread-6,5,main] 到达!
Thread[Thread-7,5,main] 到达!
Thread[Thread-8,5,main] 到达!
Thread[Thread-9,5,main] 到达!
Thread[Thread-0,5,main] 出发!
Thread[Thread-4,5,main] 出发!
Thread[Thread-6,5,main] 出发!
Thread[Thread-3,5,main] 出发!
Thread[Thread-9,5,main] 出发!
Thread[Thread-1,5,main] 出发!
Thread[Thread-2,5,main] 出发!
Thread[Thread-8,5,main] 出发!
Thread[Thread-7,5,main] 出发!
Thread[Thread-5,5,main] 出发!

发现,确实达到了我们预想的效果,在主线程调用signal()之前,所有线程会被阻塞,在调用signal()之后,所有线程被唤醒,然后成功获取资源,继续执行逻辑。

5 小结

本文比较详细的分析了AQS框架的源码,还简单了说了一下“模板方法模式”在AQS框架里的应用,希望能加深大家对设计模式的理解,最后仅仅使用了不到50行代码(还算上了空行和注释)就实现了一个能用的同步工具了BinaryLatch,这一切都归功于AQS强大的能力。最后,希望大家能自己多阅读几遍AQS的源码(我也是阅读了很多遍,才能勉强理解),这对源码阅读能力有很大的裨益。

6 参考资料

《Java并发编程实战》

Java并发之AQS详解

上一篇 下一篇

猜你喜欢

热点阅读