6-AbstractQueuedSynchronizer(一)—
类介绍
AbstractQueuedSynchronizer
继承关系
直接贴图:

AbstractQueuedSynchronizer
作用及简介
AbstractQueuedSynchronizer
通过一个双向队列对锁做了基本实现。
AbstractQueuedSynchronizer
对锁的实现提供了支持,在使用时最好通过私有内部类继承来及进行AQS
的继承和定制。
AQS
没有实现任何和锁相关的接口,他对锁可能用到的接口做了实现。他支持共享模式和互斥模式的锁,当然,他并不care这两种模式是什么含义,它只是定义了这两种模式并给出了一些实现。
AQS
中
- 通过内部类
Node
和一些方法完成对FIFO
队列通用操作的实现- 通过内部类
ConditionObject
和一些方法完成对阻塞队列的实现。
AQS
还提供了一些其他的方法,例如:
- 监视器方法,用来显示当前队列的状态
- 一些常用的锁相关方法,子类可自行使用以方便实现
AQS
和之前阅读的集合类源码一样使用了适配器模式,在上面提供的方法中依赖的核心方法都是空实现,子类可根据实际情况和使用场景进行自行实现,包括:
tryAcquire()
tryRelease()
tryAcquireShared()
tryReleaseShared()
isHeldExclusively()
新来的线程在竞争锁时会和队头的线程一起竞争,也就有可能直接获得锁而不需要入队,这种贪心模式在吞吐量等方面有很多优势。如果你要防止饥饿状态发生要追求公平的话,可以通过重定义tryAcquire()/tryAcquireShared()
使只有队中的线程能获得锁。
博客思路介绍
concurrent
框架的思路在AQS
中有不少体现。所以我们打算着重记录一下,记录的思路如下:
- 引入及介绍
AQS
队列通用的方法 - 介绍
AQS
预提供的各种和锁获得、释放相关的方法;及暴露出来的用来重写的方法。 - 介绍队列相关的监控方法
- 介绍
Condition
相关的方法 - 扩展、总结及展望
本文章主要对AQS
进行大概的定位和使用的介绍,并对AQS
中的内部类Node
定义的双向队列以及相关基本操作进行介绍。
Node
内部类介绍
源码介绍
内部类
内部类主要定义了四个部分:
- 获取模式定义:
- 共享模式
- 互斥模式
- 结点状态定义
- 已取消(1)【节点对应的线程已经取消】
- 正常排队(0)【节点对应的线程正常排队】
- 需唤醒后继节点(-1)【节点的后继节点线程被阻塞,需要进行唤醒操作】
- (互斥)等待获得锁(-2)【节点在互斥队列中等待】
- (共享)需传播(-3)【节点以共享模式排队,并应该把共享、可用的消息继续传递给下个节点】
- 正常排队的前驱、后继节点
- 互斥锁的队列的后驱节点
static final class Node {
/**
* 节点的模式【共享/互斥】(默认应该是互斥)
*/
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
类数据结构
在类中还提供了和队列相关的数据结构及操作:
/**
* 队列头
*/
private transient volatile Node head;
/**
* 队尾
*/
private transient volatile Node tail;
/**
* 异步状态。。。。。emmmm,后面再说
*/
private volatile int state;
/**
* 定义的一个临界值,如果距离超时时间比此值大,说明时间还很长就阻塞等待;如果比此值小,就
* 死循环尝试了
*/
static final long spinForTimeoutThreshold = 1000L;
双向队列基本操作介绍
进队列【入参为已建立的节点】
源码
/**
* 将入参 node插入到队列中
* @param node 要插入的数据节点
* @return 插入节点的前缀
*/
private Node enq(final Node node) {
// 由于队列采用懒加载,故采用循环,直至初始化成功并
// 入队成功后再返回
for (;;) {
Node t = tail;
if (t == null) { // 队列未初始化,先初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;// 先把node的pre挂上【方便在查询时避免有新节点入队引起的竞争】
if (compareAndSetTail(t, node)) {// 设置tail的指向
t.next = node;// 把node前置节点的next挂上
return t;
}
}
}
}
思路
这个方法之所以复杂,是因为在入队的时候兼顾了两个地方:
-
由于队列是有空的头部节点的,以方便使用。而且队列的初始化采用懒加载,所以用一个循环,如果没初始化就先初始化;然后再执行一遍,初始化完成后开始添加。
-
由于要考虑多线程引起的竞争问题,我们在入队列时
- 先把
node.prev
连上 - 再移动
tail
以入队列 - 最后把
pre
的next
搭上
这样可以防止队列入到一半时有人遍历队列引起的类似不可重复读的问题。不过,这样在后面的读取操作时也要注意了:
读到
next == null
不代表读到队尾,要从队尾通过pre
从新读一下以确保是队尾。next
的地位就作为获得改节点后继的便捷通道而存在 - 先把
队列情况如下:

进队列【入参为节点类型】
源码
/**
* 根据入参的模式创建一个当前线程的节点,并进队列
*
* @param mode Node.EXCLUSIVE 用来创建互斥锁节点, Node.SHARED 用来创建共享锁的节点
* @return 新创建的节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
思路
没啥新的,和第一个进队列函数一样,要注意的是mode
本身就是一个Node
实例,具体可参见上面的内部类源码。
疑惑:这样会不会更简单
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); enq(node); return node; }
设置队头
源码
/**
* 用来设置类中定义的`head`,也就是双向队列的头部,从而达到出队列的目的
* 只有acquire相关的方法调用这个,用来在获得锁后唤醒此节点的线程,然后出队。
* 此方法也会把head指向节点的thread,prev置空以方便垃圾回收。
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
思路
要注意的点:
注意了,在enq()
方法的介绍中也介绍过,这个双向队列是有假头的,头部指向的是上一次唤醒的节点,也就是用过的废节点,所以它方便GC置空的是head
指向的节点的field
。
唤醒后继
源码
/**
* 如果此节点有后继节点,则唤醒。
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* 因为此方法已经是要唤醒此节点的后继节点了,所以如果它原来的状态是负值
*(SIGNAL,CONDITION,PROPAGATE),那就置成0,表示不会再传播唤醒操作
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 获得node节点的有效的后继节点,思路如下:
* 1. 如果他的next存在且后继节点对应的状态不大于0(等于1,即CANCELLED)
* 则此步操作完成,得到了有效的后继节点
* 2. 如果1不满足,问题就出来了,可能的情况如下:
* a. 这个节点就是tail指向的节点,即队尾
* b. 这个节点是tail之前指向的节点,刚刚有个新节点入队了,tail后移,新尾
* 节点的pre设置了,但是没来得及设置此节点的next
* c. 此节点的next指向的节点因为一些原因被取消了,需要再重新找后继节点
* 综合a,b,c三种状态,直接从尾部用pre向前遍历即可搞定,找到非node的第一个
* 可用节点即可。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
/**
* 如果找到了有效的后继节点,就直接唤醒后继节点对应的线程
*/
if (s != null)
LockSupport.unpark(s.thread);
}
思路
如果队列情况如下:

在唤醒A的后继节点时会出现E比D,C更早释放的情况,即代码注释中的c情况,这个我们保留吧。反正我觉得这个和FIFO的队列是由冲突的。
唤醒后继【共享模式】
源码
/**
* 共享模式下的唤醒后继节点
*/
private void doReleaseShared() {
/*
* 为了确保我们的唤醒信号可以传播下去。我们分两种情况:
* 1. 如果头节点状态是 SIGNAL,那么就直接唤醒下个节点对应的线程
* 2. 如果头节点状态是默认状态0,即是普通的一个节点,那就把头节点状态置为
* PROPAGATE 以方便下次有线程唤醒时 PROPAGATE 状态可以继续传播下去
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
思路
这里为什么用循环?
防止在操作时有节点出队列,如果队头有变化,之前做的唤醒successor或者设置队头的 PROPAGATE 都没用了。
为什么 SIGNAL 要置成0 (即普通节点)并唤醒下一个节点,而普通节点要置成 PROPAGATE 并退出?
什么是共享模式?是内部类
Node
中的字段nextWaiter
是SHARED
还是EXCLUSIVE
。和waitStatus
没有必然关系。在方法的名称中已经指明是共享模式,接下来要关系的只是共享模式的锁释放信号是否要继续传递下去,即是否继续唤醒下一个线程,所以循环中的代码就好理解了:
- 如果头节点的状态是 SIGNAL 就按要求继续唤醒
- 如果头节点状态是 0 ,现在还不能唤醒,就标记一下,等下个操作
为什么共享模式 SIGNAL 只唤醒了下一个节点?
在
acquire
后,即线程被唤醒并获得锁后,会继续唤醒他的下个节点,我们这里只唤醒一个即可实现链式唤醒。
设置头部【共享模式】
源码
/**
* 设置队头并唤醒后继【设置队头其实就是做出队操作】
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 尝试唤醒下一个节点的线程如果以下条件满足:
* 1. 入参或者head提示我们需要继续唤醒【abc满足一个即可】
* a. 入参propagate为正
* b. head节点为空【所以我们无法判断是否需要继续唤醒,宁错杀不放过!唤醒】
* c. head节点表示需要唤醒其后继节点
* 2. 新的头节点的后继节点正以共享模式等待
* a. 新节点的后继节点为空【所以我们无法判断是否需要继续唤醒,
* 宁错杀不放过!唤醒】
* b. 新节点的后继节点正以共享模式等待
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
思路
这个的处理思路我大概理解如下:
-
设置队头,该怎么设就怎么设【对应方法名中的
setHead
】 -
唤醒后继【对应方法名中的
Propagate
】原因:反正在阻塞的过程中是存在”虚假唤醒“的情况【参考3.1 虚假唤醒】,在使用阻塞时都用的循环做条件判断,所以多唤醒几次也无所谓了。
参考文献
jdk源码