Java小白系列(十二):抽象队列同步器(AbstractQue
一、前言
抽象队列同步器,是一个抽象类,它采用了模板方法的设计模式《Java设计模式二:模板模式》。同时,它也内置了 CLH 队列《小白十:CLH》,让加入的线程自旋,当该线程(节点)的前驱节点拿到锁时,该线程就进入阻塞,等待前驱节点的唤醒,而线程的唤醒方式则是通过线程中断的方式来实现的,关于线程中断,大家可以详看:《小白十一:线程中断》。
既然 AQS 是抽象类,所以,本文只分析核心流程,遇到需要子类实现的方法时,我们先跳过,到时候再结合具体的子类我们进行整体分析。
二、独占与共享
我们在学习 CLH 的时候,就讲到了一个节点可能是独占模式,也可能是共享模式,但对于 AQS 来说,但凡没有获取到锁的线程,都要被加入到 AQS 的 CLH 队列中,因此,AQS 并不是特别关心获取锁的线程是哪种模式,我们在使用模板设计模式时,也需要有这种思想。
虽然 AQS 不关心,但它给出了一组可选的方法需要子类来重载:
- 如果子类是独占模式,则只需要重载并实现独占模式相关的方法(比如:写锁);
- 如果子类 是共享模式,则只需要重载并实现共享模式相关的方法(比如:读锁);
- 那如果都实现了呢?这种锁就是上面两种的结合(读写锁);
这段注释说明中,我们可以了解到几点:
- 获取状态: getState;
- 修改状态:setState / compareAndSetState;
- 另外5个方法需要我们根据实际情况下选择性重载;
- 可选性的方法实现时,需要保证内部线程安全,且快速以及不被阻塞;
- AQS 中的其它方法不允许被重载,因为是 final 的;
因此,AQS 提供了几个可选的方法需要实现:
private static final class LockSync extends AbstractQueuedSynchronizer {
protected LockSync() {
super();
}
@Override
protected boolean isHeldExclusively() {
return super.isHeldExclusively();
}
@Override
protected boolean tryAcquire(int arg) {
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
return super.tryRelease(arg);
}
@Override
protected int tryAcquireShared(int arg) {
return super.tryAcquireShared(arg);
}
@Override
protected boolean tryReleaseShared(int arg) {
return super.tryReleaseShared(arg);
}
}
我们来分析一下这几个方法的含义:
- isHeldExclusively:该线程是否正在独占资源。只有用到Condition才需要去实现它;
- tryAcquire:独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False;
- tryRelease:独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False;
- tryAcquireShared:共享方式。arg为获取锁的次数,尝试获取资源;
- 负数表示失败;
- 0表示成功,但没有剩余可用资源;
- 正数表示成功,且有剩余资源;
- tryReleaseShared:共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False;
根据上面这些,我们之后会分析到的(提前剧透):
- ReentrantLock:是独占锁,所以实现了 tryAcquire + tryRelease + isHeldExclusively;
- ReentrantReadWriteLock:是共享锁,所以都实现了;
三、AQS 核心流程
我们的 AQS 内部采用 CLH 队列来管理所有需要获取锁而获取失败,需要加入到队列中,本地自旋来等待锁的释放。
因此,流程的起点是:
- 阻塞的线程从『尝试获取锁』开始;
- 占用中的线程在使用完后,从『尝试释放锁』开始;
这也是符合『锁』的流程:加锁 or 解锁!
我们来看个独占锁的请求锁和释放锁的流程:
// 获取锁流程
Acquire:
while (!tryAcquire(arg)) {
// enqueue thread if it is not already queued;
// possibly block current thread;
}
// 释放锁流程
Release:
if (tryRelease(arg)) {
// unblock the first queued thread;
}
(共享模式与上面的类似,只是调用的方法不同;且因为是共享模式,当一个锁释放时,会引起一堆线程的竞争)
3.1、再谈 CLH 的公平与非公平性
不知道大家是否还记得我在《小白十》中分析 CLH 时,通过文字的描述方式和大家说过了,CLH 本身是公平的(严格的FIFO),但是,之所以存在不公平的竞争,原因在于:新来的线程是先入队再请求锁,还是先请求锁且失败后再入队列。JDK 源码中注释也说了这一点:
公平与非公平.png原注释也说了:
因为在入队之前可以先尝试一下获取锁,因此,新来的线程就有机会与队列中被阻塞的首元素竞争。当然,我们也可以在 tryAcquire 或 tryAcquireShared 方法中来禁止这种不公平的竞争行为。一般大多数公平同步器会根据 AQS 中的 hasQueuedPredecessors 这个方法的返回来返回结果。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
该方法会在之后详细展开分析,现简单分析返回结果;返回 true 表明队列中有正在排队或正在竞争的线程;返回 false 表明队列可能为空(或者当前竞争的线程就是自己)。
所以,tryAcquire 实现公平性,仅依赖 hasQueuedPredecessors 的返回结果即可,简单来说,代码可如下实现:
protected boolean tryAcquire(int arg) {
return !hasQueuedPredecessors();
}
即:新来的线程想要获取锁时,先获取 hasQueuedPredecessors 的结果,如果 hasQueuedPredecessors 返回 true,表明队列有排队的元素,那么 tryAcquire 就返回 false ,表明获取锁失败!
3.2、获取锁流程
3.2.1、AQS.acquire
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
该方法是独占模式调用的:
- tryAcquire:先尝试获取锁;(由子类来决定是否公平)
- addWaiter:如果不成功,则封装成 CLH 的一个节点,然后加入队列;
- acquireQueued:等待线程原地自旋,自旋一段时间后进入阻塞;
- selfInterrupt:线程中断来唤醒线程(park后被中断是没作用的,因此,unpark后再中断一下);
3.2.2、AQS.addWaiter(重点:入队)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Creates and enqueues node for current thread and given mode.
*/
private Node addWaiter(Node mode) {
// 封装为一个独占模式的 Node
Node node = new Node(Thread.currentThread(), mode);
// 获取 tail 尾节点
Node pred = tail;
// tail != null,pred 就是 tail 节点
if (pred != null) {
// 新节点.prev = pred (tail节点 )
node.prev = pred;
// CAS,直接设置 tail = node
if (compareAndSetTail(pred, node)) {
// pred (tail节点).next = node
pred.next = node;
return node;
}
}
// tail = null,表明队列还未初始化
// 需要先初始化 head & tail
enq(node);
// 返回新的节点
return node;
}
}
- 先封装为一个独占模式的 Node;
- 快速获取 tail 尾节点;
- 如果尾节点为 null,则直接调用 enq 入队;
- 如果尾节点不为 null:
- 则将新的节点 prev 指向 tail 节点;
- 然后 CAS 直接设置新的节点为 tail 节点;
- 原 tail 节点的 next 指向新 tail 节点;
3.2.3、AQS.enq
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}
该方法主要是用来初始化 CLH 队列头结点,然后再插入新的节点。
我们来分析下:
- 该方法是一个无限循环;
- 获取 tail 尾节点;
- 如果尾节点为 null,则 head = tail = null;
- 构造 head 结点,且 CAS 设置 head = tail = new Node();
- tail 不为空,则执行和 addWaiter 中加入节点一样的过程;
- 加入成功则返回新节点的前驱节点;
3.2.4、AQS.acquireQueued(重点:自旋 -> park阻塞 -> 拿到锁返回)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 重点:自旋
for (;;) {
// 获取 node 的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点,且获取到锁,才会退出该循环
// 因此,这里我们也要注意一点:
// 即便前驱节点是头节点,此时该线程也只是尝试竞争获取锁,而
// 在非公平模式下,还是有可能失败使其继续自旋!
if (p == head && tryAcquire(arg)) {
// 将该节点设为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 该线程也不能一直自旋消耗CPU是吧?所以,自旋到了一定时候,就要进入阻塞状态,
// 等待被唤醒,如果下面的两个条件成立,则表示线程进入阻塞状态,待之后前驱节点
// 释放锁后,通过中断来唤醒该线程
if (shouldParkAfterFailedAcquire(p, node) && // 判断前驱节点的状态是否为 Node.SIGNAL
// 若 shouldParkAfterFailedAcquire = true 才会走到该方法
// 该方法就是 park 线程进入阻塞状态,一旦park成功,就被挂起,
// 挂起中的线程是无法响应线程中断的,因此在 unpark 后检查一下
// 线程是否曾经被中断过,返回线程中断的状态
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
// 那么取消结点在队列中的等待
if (failed)
cancelAcquire(node);
}
}
}
parkAndCheckInterrupt 线程挂起后就不会再继续执行后续代码!直到被前驱节点释放锁时 unpark 才行!并且,在挂起过程中,线程中断也无法响应!
3.2.5、AQS.shouldParkAfterFailedAcquire
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点已经被设置为该状态:即该节点线程释放锁时会通知下一个节点
*/
return true;
if (ws > 0) {
/*
* 该线程取消了
*/
do {
// 不断向前查找节点,直到节点的状态 <= 0
// 这行代码分解为两行:
// pred = pred.prev; // 获取前驱节点;
// node.prev = pred; // 新节点的prev指向新的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 新的前驱节点的next = 新的节点
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
重点是 compareAndSetWaitStatus,一旦前驱节点的状态被设置为 SIGNAL,那么 node 的下一次自旋就会进入阻塞状态,因为,下一次循环进入该方法时,前驱节点的 waitStatus 状态为 SIGNAL。
3.2.6、AQS.parkAndCheckInterrupt(重点:挂起)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private final boolean parkAndCheckInterrupt() {
// 调用 park 使线程进入 waiting 状态
LockSupport.park(this); // -----------------挂起!unpark后才会继续执行下一行代码
// 返回线程是否被中断过,并清除中断状态!
return Thread.interrupted();
}
}
该方法很简单,直接让线程进入阻塞状态,然后,返回是否曾经被中断过。
至此,整个获取锁的流程就分析完了,我们来小结下:
- tryAcquire尝试获取锁;
- 获取失败,addWaiter入队;
- acquireQueued 自旋、检查or设置前驱节点 waitStatus = SIGNAL;
- 然后 park 挂起,挂起中不响应中断;
- 等待前驱节点释放时来 unpark (下面要分析的);
- unpark 后的下一次循环开始进入锁的竞争(因为 unpark 后,其前驱是 head 节点);
- 如果失败,再次挂起,重复 5,6步骤;
- 如果成功,则把自己设置为 head 节点,返回是否曾经被中断过;
我们可以看到 acquire 是整个获取锁的入口,一旦没有成功,该方法也会被阻塞,直接该调用者(即线程)拿到锁,或者超时、取消等其它原因才会继续执行完;整个过程其实非常的简单,只是有些细节,大家需要注意,凡是我标注的重点,大家都应该仔细看看其中的注释,细细品味各种可能性。
3.3、释放锁流程
我们分析完了获取锁的流程,并且也提到了一点点 unpark ,所以,我们的释放锁的流程会非常的容易理解,流程也非常的短小。获取锁的入口是『acquire』,而释放锁的入口则是『release』;对于共享模式,释放锁的模式则是『releaseShared』。
3.3.1、AQS.release
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
该方法很简单,调用子类『tryRelease』,具体由子类实现,我们直接跳过,只研究成功的情况。当 tryRelease 成功释放锁,则拿到当前 head 节点,如果 head.waitStatus != 0 ,则去 unpark;那 unpark 谁呢?当然是 unpark 下一个节点嘛。
3.3.2、unparkSuccessor
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private void unparkSuccessor(Node node) {
// 先尝试将自己的 waitStatus 置为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取下一个节点,并尝试去唤醒它!
Node s = node.next;
// 如果下一个节点不存在,或者已被 cancelled
if (s == null || s.waitStatus > 0) {
s = null; // 置该节点为null
// 从后向前遍历,直到 t = null or t == node 时结束
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// unpark!取消线程的挂起
LockSupport.unpark(s.thread);
}
}
整个解锁流程就分析完了,对,就这么简单!
四、小结
AQS 咋一看,方法很多,但我们只关注主流程,其实就几个函数,也非常简单;希望大家通过学习并掌握主流程之后,再去了解其它方法,就不会有任何压力了。AQS 分析完了,接下来,我将会带领大家开始分析我们常用的一些子类,如:ReentrantLock、ReadWriteLock等。