AbstractQueuedSynchronizer框架浅析
AbstractQueuedSynchronizer框架浅析
1.概述
AbstractQueuedSynchronizer(AQS)抽象类提供一个实现阻塞锁和依赖于FIFO等待队列的同步器的框架。
AQS被设计用来作为众多同步器的基类,例如ReentrantLock、Semaphore、CountDownLatch、FutureTask以及ReentrantReadWriteLock。AQS依赖于一个整数值,用来代表状态。AQS的子类可以通过覆写protect方法来改变状态,并且定义状态代表具体什么含义。不同的同步器对状态的含义解释不同:
- 在ReentrantLock中,AQS的同步状态用于保存锁获取操作的次数;
- 在FutureTask中,AQS同步状态被用来保存任务的状态;
- 在Semaphore和CountDownLatch中,AQS的同步状态用于保存当前可用许可的数量;
针对AQS中的状态,只可以通过AQS的getState()、setState()和compareAndSetState()方法来改变状态。
同步器类应该使用私有的AQS子类来实现功能,而不应该直接继承AQS类。AQS类没有实现任何同步接口,它只是定义了一些可以被具体同步器和锁调用的方法,例如acquireInterruptibly方法。AQS子类需要自己实现以下方法:
- tryAcquire()
- tryRelease()
- tryAcquireShared()
- tryReleaseShared()
- isHeldExclusively()
这些方法默认会抛出UnsupportedOperationException异常。在这些方法实现中,可以使用getState()、setState()和compareAndSetState()来获取或修改AQS的状态。这些方法的实现必须是线程安全的,并且应该实现简单且没有阻塞。只有这些方法可以被子类覆写,其他的AQS的公开方法都是final方法。
AQS同时支持独占操作模式(例如ReentrantLock)和非独占操作模式(例如Semaphore和CountDownLatch)。当以独占模式获取锁时,只有一个线程能访问成功,其他线程都访问失败;而以非独占模式获取锁时,多个线程可以同时访问成功。不同操作模式的线程都在同一个FIFO队列中等待。通常,AQS的子类只支持一种操作模式(独占或非独占),但也有同时支持两种操作模式的同步器,例如ReadWriteLock的子类,它的读取锁是非独占操作模式,而写入锁是独占操作模式。
由于在将线程放入FIFO等待队列之前,需要尝试一次acquire,因此有可能新的acquire线程可以获取成功,尽管等待队列中还有其他线程阻塞等待,这是一种非公平策略。然而,可以在tryAcquire()或者tryAcquireShared()方法中禁止线程抢占,具体是通过hasQueuedPredecessors()方法判断等待队列中是否有线程在阻塞等待,如果有线程阻塞等待,则让tryAcquire()或者tryAcquireShared()方法返回false,这样的话,acquire线程会被放入等待队列的尾部,然后唤醒阻塞等待的线程,这是一种公平的策略。
AQS类为同步器的状态、参数的获取和释放,以及内部FIFO等待队列,提供了一个高效的和可扩展的基础。当这些不能满足你的要求时,你可以自定义java.util.concurrent.atomic原子类,自定义java.util.Queue类,以及LockSupport类来提供阻塞支持。
AQS框架的一个类图如下所示:
AQS框架类图2.源码分析
2.1 AbstractQueuedSynchronizer类的继承关系
AQS类继承了AbstractOwnableSynchronizer类,实现了Serializable接口。AbstractOwnableSynchronizer类主要是用来保存同步器被哪个线程独占使用。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
......
}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
//The current owner of exclusive mode synchronization.
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
2.2 Node内部类
在AQS类中,定义了一个FIFO等待队列节点的内部类Node。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/*
* 非负值意味着该节点不需要signal。
* 该值默认初始化为0,表示普通的同步节点,主要是通过CAS方法来修改该变量值。
*/
volatile int waitStatus;
/*
* 指向前继节点,当前节点需要依赖于前继节点来检查waitStatus。
* 在入队列的时候赋值,在出队列的时候为null。
*/
volatile Node prev;
/*
* 指向后继节点,当前节点依赖后继节点来唤醒释放
* 在入队列的时候赋值,在出队列的时候为null。
*/
volatile Node next;
/*
* 将该节点入队列的线程,在构造节点的时候初始化,使用完之后变为null
*/
volatile Thread thread;
/*
* 指向下一个在条件等待,或者是特定的SHARED值的节点。
* 条件队列只有在获取独占锁时才可以被访问,我们需要一个单链表队列来保存节点,当他们在条件上等待时。
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待队列是CLH锁队列的变种,CLH锁通常被用来实现自旋锁。在节点中的waitStatus域保存了线程是否需要被阻塞的信息。当一个节点的前继节点被释放了,节点将会收到通知。在等待队列中的每一个节点,都充当着特定通知形式的监控器,并且持有一个等待线程。节点中的waitStatus域并不控制线程是否能获取到锁。如果一个线程是队列中的第一个线程,并不能保证它能竞争成功获取到锁,而只是给予了它参与竞争的权利。
为了将一个Node节点放入到LCH锁队列中,只需要将该Node节点拼接到队列尾部就行;如果为了出队列,则只需设置队列的头指针位置head。
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
插入节点到LCH队列中,只需一个在tail域上的原子操作。相似的,出队列仅仅需要更新head域,但是出队列还需要做一些额外的工作,来决定新的head节点的后继节点是哪个,其中需要考虑的因素有线程是否被取消了,操作是否超时了以及线程是否被中断了。
在Node节点中的prev域主要用来处理节点被取消的情况。如果一个Node节点被取消了,那边它的后继节点需要重新连接到一个未被取消的前继节点。
在Node节点中的next域主要被用来实现阻塞机制。在每个节点中都保存有线程ID,因此当需要唤醒下一个节点时,只需要通过遍历next域,找到后继节点,通过节点获取到线程ID,从而知道该唤醒哪个线程。
等待状态只能取以下几个值:
- SIGNAL:该节点的后继节点当前是阻塞的,因此当前节点在释放和取消之后,必须唤醒它的后继节点。为了避免竞争,acquire方法必须首先进入SIGNAL等着状态,然后再尝试原子获取,这个获取过程可能会失败或者阻塞。
- CANCELLED:该节点由于超时或者中断了被取消了。节点进入了CANCELLED状态之后,就不会再发生状态的变化了。特别地,处于CANCELLED状态节点的线程不会再被阻塞了。
- CONDITION:该节点当前处于一个条件队列中。经过转移之后,该节点将会被作为同步队列的节点使用,此时节点的状态会被设置为0。
- PROPAGATE:releaseShared方法应该传递给其他Node节点。在doReleaseShared方法中,确保传递会继续。
- 0:非以上任何状态;
等待状态值使用数值来简化使用,非负值意味着节点不需要被通知唤醒,因此大多数代码只需检查数值的正负就可以知道是否需要唤醒了。
2.3 AQS类的重要成员变量
/*
* 等待队列的头节点,延迟初始化
* 除了初始化可以修改head值,还可以通过setHead方法设置
* 只要head节点存在,那么该节点的waitStatus状态将不会是CANCELLED
*/
private transient volatile Node head;
/*
* 等待队列的尾节点,延迟初始化
* 只能通过enq方法来添加一个新的等待节点到队列中,从而来修改tail值
*/
private transient volatile Node tail;
/*
* 同步状态
*/
private volatile int state;
可以看到,在AQS类中,有三个比较重要的成员变量,其中两个是表示等待队列的头指针和尾指针。另外一个表示同步的状态。
与state相关的方法有三个:
/*
* 获取当前同步状态
*/
protected final int getState() {
return state;
}
/*
* 设置新的同步状态
*/
protected final void setState(int newState) {
state = newState;
}
/*
* 采用CAS方法来更新同步状态
* expect 期望的值
* update 更新为新的值
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//采用了sun.misc.Unsafe类来实现CAS操作
}
在AQS中使用到了sun.misc.Unsafe类来实现CAS操作,Unsafe类的compareAndSwapInt()和compareAndSwapLong()等方法包装了CAS操作,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令。
与head和tail相关的方法有两个:
/*
* 将队列的头结点设置为node结点
* 该方法只被acquire系列方法调用
*/
private void setHead(Node node) {
head = node;//让head指针指向node结点
node.thread = null;
node.prev = null;
}
/*
* 插入node节点到队列中,必要时初始化
* 返回node节点的前继节点,即原来的尾节点
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空,则先进行初始化操作,创建一个新的node节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//head节点被成功初始化后,将tail节点指向head节点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//将tail节点更新为node节点
t.next = node;
return t;//返回node的前继节点
}
}
}
}
可以看到,head和tail节点的初始化操作是在setHead()和enq()方法中进行的,同时更新操作也是在这两个方法中进行的。
2.4 AQS类暴露给子类实现的方法
/*
* 尝试以独占模式获取
* 该方法应该查询对象的状态是否允许以独占模式获取,如果允许,则获取。
* 该方法总是被执行acquire方法的线程执行,如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号。
* 如果获取成功了,则返回true。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/*
* 尝试修改状态来反映以独占模式释放
* 该方法总是被执行释放的线程触发
* 如果该对象处于完全释放的状态则返回true
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/*
* 尝试以非独占模式来获取
* 该方法应该查询是否对象允许以非独占模式来获取,如果允许,则获取。
* 该方法总是被执行acquire方法的线程执行。如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号
* 返回值为负值,表示失败;
* 返回0,表示以非独占模式获取成功,但后续的以非独占模式获取将失败
* 返回正值,表示以非独占模式获取成功,后续的以非独占模式获取也会成功
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/*
* 尝试修改状态来反映以非独占模式释放
* 该方法总是被执行释放的线程触发
* 返回true,表示以非独占模式释放成功
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/*
* 如果同步器是被当前线程以独占模式访问,则返回true。
* 该方法在每次调用非等待ConditionObject方法时被触发。
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
2.5 AQS类定义的acquire系列方法
2.5.1 acquire()方法
/*
* 以独占模式获取,屏蔽中断
* 至少触发一次tryAcquire()方法,如果获取成功,直接返回
* 如果获取失败,则线程入队列,然后通过tryAcquire()不断尝试,直至成功
* 该方法可以被用来实现Lock.lock()方法
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()方法是空实现,需要子类覆写该方法,实现具体的获取操作。addWaiter()方法主要是为当前线程创建一个新的Node节点,并把该Node节点以指定的模式存放入队列中。
/*
* 为当前线程创建一个Node节点,并且设置为指定mode,最后将该node节点放入等待队列中
* @param mode mode有两种取值:Node.EXCLUSIVE和Node.SHARED,分别代表以独占模式和非独占模式
* @return 返回新的Node节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//给当前线程创建一个新的Node节点,节点模式为mode
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;//将node节点的前继指针指向尾节点
if (compareAndSetTail(pred, node)) {//更新tail指针指向node节点
pred.next = node;//将原来tail节点的后继指针指向node节点
return node;//这样node节点成功链接到tail节点后面,并更新tail节点指向node节点了
}
}
// 如果前面将node节点入队列失败,则再通过enq()方法入队列,其实现思想和上面的过程一致
enq(node);
return node;
}
addWaiter()方法首先根据mode模式给当前线程创建一个node节点,然后将该node节点放入队列的尾部。acquireQueued()方法以独占不可中断方式获取。
/*
* 为队列中的线程,以独占不可中断模式获取
* 该方法被条件等待和acquire方法使用
* @param node 节点
* @param arg 获取的参数
* @return 在等待的过程中被中断了,则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//无限循环
final Node p = node.predecessor();//获取当前节点的前继节点
if (p == head && tryAcquire(arg)) {//如果前继节点等于head节点,并且尝试获取成功
setHead(node);//更新head节点为node节点
p.next = null; // help GC
failed = false;
return interrupted;//返回是否中断了
}
// 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)//发生了异常,则取消获取操作
cancelAcquire(node);//取消获取操作
}
}
在acquireQueued()方法中,从尾节点开始循环前向遍历,如果当前节点的前继节点是头节点,并且tryAcquire()方法返回true了,则更新头结点,并返回。如果在向前遍历的过程中,遇到了节点获取失败需要挂起时,则会通过LockSupport的park()将当前线程挂起。
/*
* 检查和更新获取失败节点的状态
* 如果线程应该被阻塞,则返回true
* 该方法需要满足pred == node.prev
* @param pred node节点的前继节点,保存状态
* @param node node节点
* @return 如果线程应该被阻塞,则返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获取前继节点的等待状态
if (ws == Node.SIGNAL)// SIGNAL状态
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//CANCELLED状态
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//向前遍历搜索,找出前继节点
} while (pred.waitStatus > 0);
pred.next = node;
} else {// waitStatus为0或者为PROPAGATE
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置waitStatus为SIGNAL
}
return false;
}
在shouldParkAfterFailedAcquire()方法中,根据pred节点的等待状态做出相应的处理:
- SIGNAL状态:说明该节点已经更新了状态,请求释放,因此可以返回true
- CANCELLED状态:说明节点已经被取消了,忽略前继节点状态为CANCELLED的节点,同时更新node和pred节点值
- PROPAGATE状态或0:说明节点需要更新状态为SIGNAL。
可以看到,shouldParkAfterFailedAcquire()方法只有节点等待状态为SIGNAL时,才会返回true。后续才会执行parkAndCheckInterrupt()方法。
/*
* 停止当前线程参与系统调度,即挂起当前线程,并返回当前线程是否被中断了
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//挂断当前线程
return Thread.interrupted();//返回线程是否被中断了
}
LockSupport.park()方法:
/*
* 挂起线程
* 当发生了以下几种情况,可以唤醒线程:
* 1.其他线程触发了LockSupport.unlock()方法,唤醒线程
* 2.其他线程触发了Thread.interrupt()方法,打断当前线程
* 3.该调用无条件返回了
* 该方法不会记录什么原因导致该方法返回了,因此方法调用者,需要自己重新检查导致线程挂起的条件
* @param blocker 导致线程挂起的同步器
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
从前面的一些方法调用可以看到,acquire方法主要分为两种情况来处理:
- 第一次通过tryAcquire()方法获取成功了,则直接返回;
- 当第一次tryAcquire()失败时,接下来为当前线程创建一个以独占操作模式的Node节点,并把Node节点放入等待队列的尾部。然后在acquireQueued()方法中,从尾节点开始向前,循环从等待队列中取出节点,判断是否允许获取操作。如果不允许获取,即需要阻塞,则通过LockSupport.lock()方法挂起当前线程。当其他线程解除了当前线程的阻塞,或者是发生了中断,则返回继续下一个循环。最终只有遍历到头结点head,并且tryAcquire()方法返回true时,才会从acquireQueued()方法中退出,代表获取完成了。
2.5.2 acquireInterruptibly()方法
/*
* 以独占模式获取,如果发生了中断,则停止获取
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法
* 如果第一次tryAcquire()方法失败,则将线程放入等待队列中,然后循环调用tryAcquire()方法,直至返回成功或者线程被中断了
* 该方法可以被用来实现Lock.lockInterruptibly()
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//如果线程被中断了,则直接返回异常
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
当tryAcquire()失败时,则调用doAcquireInterruptibly()方法重复的获取。
/*
* 以独占可中断模式获取
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);//给当前线程,创建一个独占模式的节点,并把节点放入到等待队列尾部
boolean failed = true;
try {
for (;;) {//从尾节点开始循环处理
final Node p = node.predecessor();//当前节点的前继节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();// 如果线程被中断过,则抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到doAcquireInterruptibly()方法与acquireQueued()实现大致相似,唯一的不同之处是,doAcquireInterruptibly()检测到线程被中断之后,会抛出一个中断异常。
2.5.3 acquireShared()方法
/*
* 以非独占模式获取,屏蔽中断
* 至少会触发调用一次tryAcquireShared()
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
在tryAcquireShared()方法获取失败,会调用doAcquireShared()继续重复的获取。
/*
* 以非独占不可中断模式获取
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把给Node节点放入到等待队列的尾部
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//从尾节点开始循环,不断向前遍历节点
final Node p = node.predecessor();
if (p == head) {//遍历到头节点
int r = tryAcquireShared(arg);
if (r >= 0) {//获取成功了,直接返回
setHeadAndPropagate(node, r);//更新头结点和同步状态
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared()方法与acquireQueued()方法实现类似,唯一不同之处是获取成功之后,会调用setHeadAndPropagate()方法来更head节点以及同步状态。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
在以下几种情况下,会对Node节点的后继节点进行判断是否需要释放:
- propagate大于0;
- 之前的头结点head为空;
- 之前的头结点head的waitStatus状态小于0;
- 当前的头结点head为空;
- 当前的头结点head的waitStatus状态小于0;
在满足以上几种情况后,如果后继节点为空,或者后继节点是非独占模式的,则执行释放操作。
/*
* 非独占模式的释放
* 通知后继节点并且确保释放的传递
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {// 从头结点开始循环
Node h = head;
// 如果头结点不为空,并且头结点不等于尾节点
if (h != null && h != tail) {
int ws = h.waitStatus;//等待状态
if (ws == Node.SIGNAL) {//等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//1.将Node节点的状态更新为0
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒h节点的后继节点
}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//2.将Node节点的状态更新为PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可以看到,在doReleaseShared()方法中,主要的工作有:
-
先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点;
-
将头节点的状态从0更新为PROPAGATE;
-
如果头结点在更新状态的时候没有发生改变,则退出循环;
/*
-
唤醒Node的后继节点
/
private void unparkSuccessor(Node node) {
/- If status is negative (i.e., possibly needing signal) try
- to clear in anticipation of signalling. It is OK if this
- fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
- Thread to unpark is held in successor, which is normally
- just the next node. But if cancelled or apparently null,
- traverse backwards from tail to find the actual
- non-cancelled successor.
*/
Node s = node.next;//Node的后继节点
if (s == null || s.waitStatus > 0) {// 后继节点为空,或者后继节点的等待状态大于0,则尝试从尾部节点开始到Node节点为止,寻找最靠近Node节点的等待状态小于等于0的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 存在后继节点,其等待状态小于等于0
if (s != null)
LockSupport.unpark(s.thread);//唤醒该节点关联的线程
}
-
在unparkSuccessor()方法中,主要是唤醒Node的后继节点中等待状态小于等于0的节点。
2.5.4 acquireSharedInterruptibly()方法
/*
* 以非独占模式获取,如果发生了中断,则停止获取
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//线程被中断了
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在tryAcquireShared()返回失败后,会调用doAcquireSharedInterruptibly()方法重复尝试获取。
/*
* 以非独占可中断模式获取
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//获取前继节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();//抛出中断异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到doAcquireSharedInterruptibly()方法与doAcquireShared()方法实现类似,唯一的不同之处是,在线程等待的过程中,如果被中断了,则会抛出中断异常。
2.5.5 tryAcquireNanos()
/*
* 尝试以独占模式获取,如果发生了中断则停止,如果超时了,则返回失败
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法
* 如果调用tryAcquire()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquire()方法,直到返回成功,或者被中断,或者超时了。
* 该方法可以被用来实现Lock.tryLock(long,TimeUnit)方法
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())//检查线程是否被中断了
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
当tryAcquire()方法返回失败时,会去调用doAcquireNanos()方法,重复尝试获取。
/*
* 以独占超时模式获取
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回
return false;
final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
final Node node = addWaiter(Node.EXCLUSIVE);//为当前线程创建一个独占模式的Node节点,并把Node放入到等待队列中
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//前继节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();//计算剩余时间
if (nanosTimeout <= 0L)//超时了,直接返回
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)//在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//如果线程被中断了,则抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireNanos()方法与acquireQueued()方法实现很类似,不同之处在于,doAcquireNanos()加了一个超时判断,如果超时了,则直接返回。另外,doAcquireNanos()使用带超时时间的LockSupport.parkNanos()方法来暂停线程。
2.5.6 tryAcquireSharedNanos()方法
/*
* 尝试以非独占模式获取,如果发生了中断,则停止,如果超时了,则返回失败
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功,或者被中断,或者超时了。
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())//线程被中断了
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
当tryAcquireShared()方法返回失败时,会去调用doAcquireSharedNanos()不断重复尝试获取。
/*
* 以非独占超时模式获取
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回
return false;
final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把Node放入到等待队列中
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//前继节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();//计算剩余时间
if (nanosTimeout <= 0L)//超时了,直接返回
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) //在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedNanos()方法与doAcquireNanos()方法实现类似,唯一的区别是doAcquireSharedNanos()方法中是以非独占模式去获取状态,调用的是tryAcquireShared()方法去获取状态。
至此,已经介绍了所有公开的与acquire相关的final方法。下面看看所有公开的与release相关的final方法。
2.6 AQS类里面定义的release方法
2.6.1 release()方法
/*
* 以独占模式释放
* 如果tryRelease()方法返回true,则至少有一个线程非阻塞
* 该方法可以用来实现Lock.unlock()方法
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//tryRelease()返回true,则返回true
Node h = head;//头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒头结点的后继节点
return true;
}
return false;
}
可以看到,在release()方法中,首先会调用tryRelease()方法尝试释放,如果释放成功,则返回true;如果释放失败,则直接返回false。在tryRelease()方法返回成功后,还会根据头结点的等待状态来判断是否需要唤醒头结点的后继节点。
2.6.2 releaseShared()方法
/*
* 以非独占模式释放
* 如果tryReleaseShared()方法返回true,则至少有一个线程非阻塞
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//tryReleaseShared()返回true,则返回true
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法在5.3中的acquireShared()方法中有介绍到,其主要是先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点,然后将头节点的状态从0更新为PROPAGATE。
3.AQS的使用
我们通常不是直接继承AQS类,而是将相应的功能委托为私有的AQS子类来实现。下面是AQS类源码中介绍的两个使用范例:
- 使用范例1
下面是一个不可重入互斥锁Mutex,使用0代表非锁定状态,使用1代表锁定状态。虽然不可重入锁不需要严格记录持有锁的当前线程,但是在Mutex类中,实现了记录当前持有锁的线程,这样更容易监控。另外,Mutex类也支持Condition条件,并且暴露了一些方法给外部使用。
public class Mutex implements Lock ,Serializable {
private static class Sync extends AbstractQueuedSynchronizer{
// Reports whether in locked state
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
@Override
protected boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition(){
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked(){
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads(){
return sync.hasQueuedThreads();
}
}
- 使用范例2
下面是一个实现类似闭锁CountDownLatch功能的类,它是以非独占模式获取和释放。
public class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer{
boolean isSignalled(){
return getState() != 0;
}
@Override
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled(){
return sync.isSignalled();
}
public void signal(){
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}