Java并发编程(七):AQS框架
1 概述
AQS即AbstractQueuedSynchronizer(抽象队列同步器),是一个用于构建锁和同步器的框架,在本系列上一篇文章中提到过的Semaphore,CountDownLacth甚至包括显式锁ReentrantLock的底层都是基于AQS框架构建的。AQS解决了很多同步相关的细节问题,例如阻塞线程和唤醒线程的操作、等待线程采用FIFO队列操作顺序等都由AQS内部完成,依赖该框架的类可以非常简单、灵活的构造出线程安全的锁或者同步工具类。
大多数程序员应该都不会直接使用AQS框架,毕竟JDK里的几个同步工具类就已经能满足绝大多数需求了,确实没有必要再“重复造轮子”。但理解AQS有助于理解这些内置的同步工具类,能准确的判断出各种同步工具类的适用场景以及优缺点,从而做出合理的决策。
下面我将尽量详细的介绍AQS的几个重要部分,包括但不限于:
- AQS的源码分析
- 模板方法模式在AQS框架的应用
- 利用AQS自定义同步工具类,即“重复造轮子”
需要说明一点,软件开发并不推荐“重复造轮子”,我这里提到的要“重复造轮子”并没有要违背这一原则的意思。在实际开发产品的过程中,“重复造轮子”肯定是不值得提倡的,不仅要耗费大量的时间、精力,结果还有极大的可能会不尽人意。但在学习的过程中,“重复造轮子”有益于理解一个个抽象、复杂的知识。
2 AQS源码分析
和之前一样,不要一股脑的上来就从头开始阅读源码,应该要找到一个入口点,然后沿着这个入口慢慢深入。AQS的基本操作就是获取以及释放操作。
2.1 获取操作
获取操作有如下几个方法:
- acquire(int),以独占模式获取资源。
- acquireInterruptibly(),也是独占模式,该操作可以响应中断。
- acquireShared(),以共享模式获取资源。
- acquireSharedInterruptibly(),也是共享模式,该操作可以响应中断。
2.1.1 acquire
下面来分别看看其源码:
public final void acquire(int arg) {
//如果成功获取资源,就直接返回。否则就进入等待队列,知道获取成功为止。
//同时不响应中断
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//将线程加入到等待队列的末尾
private Node addWaiter(Node mode) {
//构造一个Node节点,该节点与线程实例绑定
//mode即模式,有Node类中有两种模式,分别是EXCLUSIVE和SHARED,SHARED是一个Node实例对象,EXCLUSIVE的值是null
Node node = new Node(Thread.currentThread(), mode);
/**下面就是常规的将节点加入到队列的操作了**/
//tail是尾节点,这里先保存尾节点
Node pred = tail;
if (pred != null) {
//把node的前驱节点设置成当前的尾节点
node.prev = pred;
//将node节点设置成尾节点,然后返回该node节点。这里采用CAS操作来完成。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果pred为null,说明当前队列中没有节点,那么就应该初始化队列并直接将node节点插入到队列中
//该方法并不仅仅是初始化,内部逻辑还包含了普通的插入逻辑。
//作者也许是为了能复用方法吧。
enq(node);
return node;
}
//处于队列中的线程尝试去获取资源,直到有线程获取成功为止,期间如果线程被中断过,就会返回true,否则返回false
final boolean acquireQueued(final Node node, int arg) {
//失败标志
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
//无限循环,即自旋,直到有线程能成功获取资源为止
for (;;) {
//拿到该节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点就是头结点并且tryAcquire()返回true(即尝试获取资源成功)
//那么就将头节点设置为当前节点,并把之前的节点删除
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//能走到这,说明上面没有获取成功,或者没有资格获取资源。
//那么就判断是否能进入阻塞等待状态,如果能就进入阻塞等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果过程中有被中断过,那么就设置该标志为true
interrupted = true;
}
} finally {
//如果请求失败,那么就取消本次获取操作
if (failed)
cancelAcquire(node);
}
}
在看shouldParkAfterFailedAcquire和parkAndCheckInterrupt之前,写来简单介绍一些Node类,该类表示队列的节点,包含了前驱节点以及后继节点的信息,而且和线程实例绑定。所以也可以简单的把Node类当做线程来看待。除此之外,Node还包含了几个状态,分别是:
- CANCELLED。即取消状态,值为1,在同步队列中等待的节点超时或者被中断,此时就需要从队列中取消该节点,方法就是将节点状态设置成CANCELLED。
- SIGNAL。其实就是出于唤醒的状态,值为-1,当前驱节点释放资源的时候,如果当前节点处于该状态,就会得到执行。
- CONDITION。值为-2,和Condition有关与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGAT。值为-3,和共享模式有关,在共享模式下,该状态表示可运行。
- 0。即初始状态。
现在来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//先获取前驱节点的状态
int ws = pred.waitStatus;
//如果前驱节点处于SIGNAL状态,那么就说明至少前驱节点没有被取消
//那当前节点就可以放心等待了。
//主要是为了避免线程在一个已经处于取消状态的线程傻傻的等着。(甚至后续的节点都傻傻的等着)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//ws大于0只有一个状态,即CANCELLED,说明当前节点的前驱节点是一个“死节点”,
//那么就应该跳过该前驱节点,然后一直往前检查,直到不是CANCELLED状态为止。找到之后,将节点插入到新找到的节点后面,作为其新的后继节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前驱节点的状态既不是CANCELLED也不是SIGNAL,那么就设置成SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//调用 LockSupport.park(this)进入阻塞状态
LockSupport.park(this);
//被唤醒之后,判断是否被中断过(即检查中断标志位)
return Thread.interrupted();
}
看了那么多源码,可能还是会有点云里雾里的,下面简单总结一下整个流程:
- 线程先调用tryAcquire()来尝试获取资源,如果获取成功,直接返回。
- 如果获取失败,那么就调用addWaiter()把线程加入到等待队列中去。
- 然后调用acquireQueued()尝试从队列中拿出线程来获取资源,如果成功获取,那么就重置队列头节点,然后返回即可(如果期间遇到过中断,那么就设置一下中断标志位)。
- 如果3中获取失败,那么就判断当前节点是否能进入等待状态,如果能,那么就直接进入等待状态,否则就继续往前检查节点,找到之后将当前节点插入到新找到的节点后面作为其后继节点,然后再次回到第3步,再次判断是否能进入等待状态。
- 当节点会唤醒时,会再次尝试获取资源,如果还是获取失败,那么就重复第4个过程。如果获取成功,就重置头结点,然后返回即可。
上述节点获取失败的情况不一定就是资源被其他节点占着,也有可能是当前节点没有资格获取资源,只有队列的头结点的后继几点(即老二)才有资格获取资源。节点是否能进入等待状态,也需要判断,因为有可能当前节点的前驱节点处于被取消的状态,这就就永远无法轮到当前节点去获取资源了(当然后面的节点也没有机会),所以需要有一定的“插队”,即直接插入到前面若干个处于取消状态的的节点前面。有点类似下图这种情况:
![](https://img.haomeiwen.com/i4880496/b17736cbadb92f27.png)
在节点完成了这一些列操作之后,才能进入等待状态(安心休息,等待唤醒)。
2.1.2 acquireInterruptibly
源码如下所示:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
如果请求获取资源的线程检查到被中断,那么直接抛出InterruptedException异常。doAcquireInterruptibly()的逻辑和acquireQueued的逻辑几乎一样,再次不再赘述,各位可以自行查看。
2.1.3 acquireShared
源码如下所示:
public final void acquireShared(int arg) {
//如果tryAcquireShared返回值大于等于0,进表示获取成功,直接返回即可
//否则进入doAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//调用addWaiter(),将节点加入到队列中,注意此时的Node模式是SHARED(共享)
final Node node = addWaiter(Node.SHARED);
//是否失败标志
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
//自旋
for (;;) {
//拿到当前节点的前驱节点
final Node p = node.predecessor();
//判断前驱节点是否是头节点
if (p == head) {
//再次调用tryAcquireShared()请求资源
int r = tryAcquireShared(arg);
//如果返回值大于等于0,说明获取资源成功
if (r >= 0) {
//设置node节点为头节点,并且根据资源剩余数来判断是否需要唤醒其他线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//和之前一样
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//设置头结点
setHead(node);
//如果资源还有剩余,就唤醒其他线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//doReleaseShared()里有唤醒线程的逻辑
doReleaseShared();
}
}
acquireShared是以共享的模式获取资源,也比较有用。例如CountDownLatch,它底层就是采用的共享模式,线程获取的资源个数受到内部的count值影响,AQS底层会根据tryAcquireShared()的返回值来判断是否还允许线程获取资源。(后面我会介绍到CountDownLatch的具体逻辑,这里先不详细说)
2.1.4 acquireSharedInterruptibly
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
和acquireInterruptibly()差不多,doAcquireInterruptibly()的逻辑也和doAcquireShared()非常相似,就不再赘述了。
2.2 释放操作
主要有两个API:
- release()
- releaseShared()
2.2.1 release
其源码如下所示:
public final boolean release(int arg) {
//尝试释放资源,成功就继续,失败直接返回false
if (tryRelease(arg)) {
//保存头节点
Node h = head;
//如果头结点不为null且头节点的状态不是初始状态
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//拿到队列头结点的节点状态
int ws = node.waitStatus;
//如果状态值小于0,即不处于取消状态
if (ws < 0)
//那么就用CAS操作将其设置为初始状态
compareAndSetWaitStatus(node, ws, 0);
//拿到其后继节点
Node s = node.next;
//如果后继节点为null或者后继节点的状态为取消状态
if (s == null || s.waitStatus > 0) {
//值s为空
s = null;
//从队列尾部开始,一直遍历到当前队列的第一个不处于取消状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果能在队列中找到非取消状态的后续节点,那么就唤醒该节点所绑定的线程
if (s != null)
LockSupport.unpark(s.thread);
}
释放操作会获取操作简单多了,tryRelease()是非常重要的,觉得了是否要唤醒等待中的线程。unparkSuccessor的逻辑看似复杂,其实非常简单,就是如果直接后继节点存在且处于非取消状态,那么就直接唤醒该节点绑定的线程,否则就找到从头节点出发,位于最前面的非取消状态节点,找到之后唤醒该节点绑定的线程(虽然是从后往前找,仔细看看逻辑会发现,即使找到了也不会立即停止,所以结果就是找到队列最前的符合条件的节点)。
2.2.2 releaseShared
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
//执行共享模式的释放操作
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
//如果头结点不为null且h节点不是唯一的节点(h != tail)
if (h != null && h != tail) {
//获取头结点的状态
int ws = h.waitStatus;
//如果状态是SIGNAL,说明可以唤醒后继节点了
if (ws == Node.SIGNAL) {
//这里的CAS是为了保证头结点在这段时间内没有被其他线程修改
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//检查成功后,调用unparkSuccessor()唤醒后继节点即可
unparkSuccessor(h);
}
//如果状态等于0(初始状态),并且CAS操作失败,就继续循环
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点发生改变,即已经唤醒了后继节点,并将后继节点设置为头结点了
//就直接跳出循环
if (h == head) // loop if head changed
break;
}
}
如果理解了之前的源码分析,理解这应该就没什么难度了,解释已经在注释中写出,不再赘述。
依照惯例,说一下release的大致流程:
- 先调用tryRelease(),如果返回值是true,即释放资源成功,那么就执行后续的唤醒队列中线程的操作,否则返回false,表示本次释放失败。
- 唤醒线程的时候,会从头节点开始,寻找处于非取消状态的后继节点,然后将头节点设置才该后继节点,并唤醒和该节点绑定的线程。唤醒的顺序遵循FIFO规则。
- 对于共享模式的唤醒,还涉及到头结点的状态问题。这点在源码中已经有注释解释了。
- 最后成功唤醒节点,节点会从之前阻塞的地方继续执行。
3 模板方法模式在AQS的应用
在上面的源码分析中,我们总是看到tryRelease()和tryAcquire(),但一直没有分析其源码。这其实是有原因的,下面先来看看他俩在AQS里的实现:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
首先,他们是由protected修饰的,即子类能访问该方法,外部不能访问该方法,然后他们的实现都非常简单粗暴,直接抛出UnsupportedOperationException异常,根据经验(如果有的话)大概可以知道,这玩意儿应该就是专门让子类实现的,父类不提供默认的实现。
再来看看acquire()方法的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这一个if很长,按照调用顺序可以拆解成三步:
- 调用tryAcquire()方法。
- 调用addWaiter()方法。
- 调用acquireQueued()方法。
在acquire()方法中,这三步的顺序是固定的,换句话说:父类AQS帮所有的子类已经规划好了基本路线,子类只需要填充每个步骤的具体实现即可,在acquire()里,子类只需要也应该仅仅填充tryAcquire()即可。(其他的方法最好不要覆盖)这不就是我们在设计模式中学习到的“模板方法模式”吗?下面来看一下Semaphore的实现验证我们的猜想:
//Sync继承了AbstractQueuedSynchronizer
//发现还是抽象类,不用着急,等会会看到具体的实现类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
//setState()是AQS抽象类中的方法,用于设置状态,状态值的int类型的值
setState(permits);
}
final int getPermits() {
return getState();
}
//以非公平、共享模式获取资源
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取状态值(在这里就是许可证数)
int available = getState();
//计算剩余的许可证数
int remaining = available - acquires;
//如果剩余的许可证已经小于0或者CAS将状态设置成功,都会返回剩余数量
//如果CAS操作失败,就会再次循环尝试(自旋)
//如果返回值小于0,acquire()就会失败。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//重写了父类的tryReleaseShared(),该方法会根据具体的实现特点来决定返回值
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//先获取当前的许可证数
int current = getState();
//计算新的许可证数
int next = current + releases;
//如果增加之后的许可证数比原先的还少,那么就说明发生整形溢出了。
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS操作设置新的状态(许可证数)
if (compareAndSetState(current, next))
return true;
}
}
//减少许可证数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//清理所有的许可证数
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
Sync仍然是抽象类,之所以要这么设计,应该是因为左下想把非公平和公平模式的实现分离吧。下面来看看两个具体的实现类:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//非公平模式下的tryAcquireShared()
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//公平模式下的tryAcquireShared()实现
protected int tryAcquireShared(int acquires) {
for (;;) {
//先判断是否有前驱节点,如果存在的话,那就说明还没轮到当前线程去获取资源
if (hasQueuedPredecessors())
return -1;
//和之前一样,不说了
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
下面来总结一下,Semaphore调用acquire()的流程:
- 调用acquire()会直接调用sync.acquireSharedInterruptibly(1); 其中sync是FairSync或者NonfairSync类的实例,这取决于Semaphore构造时候的参数选择。
- 在acquireSharedInterruptibly()方法里会调用tryAcquireShared()方法,因为子类实现了该方法,所以不会抛出异常,acquireSharedInterruptibly()的内部逻辑会根据tryAcquireShared()的返回值决定是否去获取资源。
- 之后的逻辑就和之前分析AQS源码时说的一样了,不再赘述。
经过这一番分析,验证了本节开始的猜想,AQS框架确实采用了模板方法模式来构建框架的基本骨架,并且工作的很好,代码结果也非常清晰,前提是对Java的继承体系有足够的了解,否则总会在某个地方感觉到迷惑。(基础非常重要!!!)
4 使用AQS框架自定义同步工具类
这应该是最令人激动的部分了,之前的源码分析什么的确实是非常枯燥。下面我将模仿CountDownLacth写一个BinanryLatch类,和CountDownLacth的不同仅仅在于CountDownLacth的状态可以有多个,而该类只有两个状态,即0和1,话不多说,直接上代码:
public class BinaryLatch {
//Sync实例
private final Sync sync;
//因为是二元的,不需要在内部保存计数器值
public BinaryLatch() {
sync = new Sync();
}
public void signal() {
//即释放资源,参数是1
sync.releaseShared(1);
}
public void await() throws InterruptedException {
//尝试获取资源,参数是0
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedLongSynchronizer {
@Override
protected boolean tryReleaseShared(long arg) {
//被调用的时候,arg的值会是1,然后将状态设置为1
setState(arg);
return true;
}
@Override
protected long tryAcquireShared(long arg) {
//获取状态,如果状态等于1,说明已经是否资源了,返回一个大于0的数即可,否则返回一个小于等于0的值
return getState() == 1 ? 1 : -1;
}
}
}
该类在调用await()的时候会把状态设置成0,在调用signal()将状态设置成1之前,tryAcquireShared()的返回值一直就是-1(小于0),所以所有调用该await()方法的线程都会被阻塞,在调用signal()之后,会将状态设置成1,然后唤醒所有之前被阻塞的线程,这些线程再次调用tryAcquireShared()方法(回想一下之前的源码分析),这次调用会返回1(大于0),所以能成功获取资源,最后,线程获取到资源之后继续执行其逻辑。
下面是一个使用BinaryLatch的示例:
public class Main {
public static void main(String[] args) throws InterruptedException {
BinaryLatch latch = new BinaryLatch();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread() + "\t到达!");
latch.await();
System.out.println(Thread.currentThread() + "\t出发!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(new Random().nextInt(100));
}
Thread.sleep(100);
latch.signal();
}
}
输出类似如下所示:
Thread[Thread-0,5,main] 到达!
Thread[Thread-1,5,main] 到达!
Thread[Thread-2,5,main] 到达!
Thread[Thread-3,5,main] 到达!
Thread[Thread-4,5,main] 到达!
Thread[Thread-5,5,main] 到达!
Thread[Thread-6,5,main] 到达!
Thread[Thread-7,5,main] 到达!
Thread[Thread-8,5,main] 到达!
Thread[Thread-9,5,main] 到达!
Thread[Thread-0,5,main] 出发!
Thread[Thread-4,5,main] 出发!
Thread[Thread-6,5,main] 出发!
Thread[Thread-3,5,main] 出发!
Thread[Thread-9,5,main] 出发!
Thread[Thread-1,5,main] 出发!
Thread[Thread-2,5,main] 出发!
Thread[Thread-8,5,main] 出发!
Thread[Thread-7,5,main] 出发!
Thread[Thread-5,5,main] 出发!
发现,确实达到了我们预想的效果,在主线程调用signal()之前,所有线程会被阻塞,在调用signal()之后,所有线程被唤醒,然后成功获取资源,继续执行逻辑。
5 小结
本文比较详细的分析了AQS框架的源码,还简单了说了一下“模板方法模式”在AQS框架里的应用,希望能加深大家对设计模式的理解,最后仅仅使用了不到50行代码(还算上了空行和注释)就实现了一个能用的同步工具了BinaryLatch,这一切都归功于AQS强大的能力。最后,希望大家能自己多阅读几遍AQS的源码(我也是阅读了很多遍,才能勉强理解),这对源码阅读能力有很大的裨益。
6 参考资料
《Java并发编程实战》