AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
,队列同步器
,简称AQS
,它是java并发用来构建锁
或者其他同步组件
的基础框架
AQS
本身是没有实现任何同步接口
的,它仅仅只是定义了同步状态
的获取和释放
的方法来供自定义的同步组件的使用
AQS主要是怎么使用的呢?
在java的同步组件中,AQS的子类
一般是同步组件的静态内部类
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
... ...
AQS的实现依赖内部的同步队列(FIFO双向队列)
来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node
,并将其加入同步队列
,同时阻塞当前线程
。当同步状态释放
时,唤醒队列的首节点
- Node
Node主要包含以下成员变量
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
static final Node EXCLUSIVE = null;
当前线程被取消;
static final int CANCELLED = 1;
当前节点的后继节点需要运行;
static final int SIGNAL = -1;
当前节点在等待condition
static final int CONDITION = -2;
当前场景下后续的acquireShared可以执行
static final int PROPAGATE = -3;
prev:前驱节点;
next:后继节点;
thread:进入队列的当前线程;
nextWaiter:存储condition队列中的后继节点。
Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
对于锁
的获取,请求形成节点将其挂在队列尾部
,至于资源的转移,是从头到尾
进行,队列的基本结构就出来了:
同步队列插入/删除节点
- 节点插入
AQS提供基于CAS的设置尾节点的方法:
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
需要传递当前线程认为的尾节点
和当前节点
,设置成功后,当前节点
与尾节点
建立关联。
- 节点删除
同步队列遵循FIFO
,首节点
是获取同步状态成功
的节点,首节点
的线程在释放同步状态之后
将会唤醒后继节点
,后继节点
将会在获取同步状态
成功的时候将自己设置为首节点
image.png
注:设置首节点
是由获取同步状态成功的线程
来完成,因为每次只会有一个线程
能够成功的获取到同步状态
,所以,设置首节点
并不需要CAS
来保证。
AQS源码解析
AQS提供以下接口以供实现自定义同步器:
- protected boolean tryAcquire(int arg)
独占式
获取同步状态
,该方法的实现需要先查询当前的同步状态是否可以获取,如果可以获取再进行获取 - protected boolean tryRelease(int arg)
释放状态
- protected int tryAcquireShared(int arg)
共享式获取同步状态
- protected boolean tryReleaseShared(int arg)
共享式释放状态
- protected boolean isHeldExclusively()
独占模式下,判断同步状态是否已经被占用
AQS提供两种方式来操作同步状态,独占式与共享式,
独占式同步状态获取 - acquire实现
- 具体执行流程如下:
1.调用tryAcquire
方法尝试获取同步状态
;
2.如果获取不到同步状态
,将当前线程
构造成节点Node
并加入同步队列
;
3.再次尝试获取
,如果还是没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。
下面我们具体来看一下节点的构造以及加入同步队列部分的代码实现
addWaiter实现
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
使用当前thread构造Node;
尝试在队尾插入节点,如果尾节点已经存在,就做以下操作
- 分配引用T指向尾节点;
- 将
待插入
节点的prev指针
指向尾节点
; - 如果
尾节点
还为T
,将当前尾节点
设置为带待插入节点
; - T的next指针指向待插入节点
快速在队尾
插入节点
,失败
则进入enq(Node node)
方法
- enq实现
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:
初始化同步队列:如果尾节点为空,分配一个头结点,并将尾节点指向头结点;
节点入队,通过CAS将节点设置为尾节点,以此在队尾做节点插入。
可以看出,整个enq方法通过“死循环”来保证节点的正确插入。
进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued
。对于同步队列中的线程,在同一时刻只能由队列首节点
获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。
- AbstractQueuedSynchronizer.acquireQueued实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取当前节点
的前驱节点
;
如果当前节点
的前驱节点
是头节点
,并且可以获取同步状态
,设置当前节点
为头结点
,该节点占有锁
;
不满足条件的线程进入等待状态。
在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态
从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的
前驱
是不是头结点
,这样设计使得节点的释放符合FIFO,同时也避免了过早通知。
-
acquire实现总结
-
同步状态维护:
对同步状态的操作是原子、非阻塞的,通过AQS提供的对状态访问的方法来对同步状态进行操作,并且利用CAS来确保原子操作; -
状态获取:
一旦线程成功的修改了同步状态,那么该线程会被设置为同步队列的头节点; -
同步队列维护:
image.png
不符合获取同步状态的线程会进入等待状态,直到符合条件被唤醒再开始执行。
整个执行流程如下:
当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,让后续节点可以获取到同步状态
,调用方法release(int arg)
方法可以释放同步状态
-
独占式同步状态释放 - release实现
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
尝试释放状态,tryRelease
保证将状态重置回去,同样采用CAS
来保证操作的原子性;
释放成功后,调用unparkSuccessor
唤醒当前节点
的后继节点线程
unparkSuccessor实现
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
取出当前节点的next节点
,将该节点线程唤醒
,被唤醒的线程获取同步状态。这里主要通过LockSupport
的unpark
方法唤醒线程
共享式同步状态获取
共享式
获取与独占式
获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示
共享式访问资源时,其他共享式访问都是被允许的;
独占式访问资源时,在同一时刻只能有一个访问,其他的访问都被阻塞
AQS提供
acquireShared
方法来支持共享式
获取同步状态
- acquireShared实现
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
调用tryAcquireShared(int arg)方法尝试获取同步状态:
tryAcquireShared方法返回值 > 0时,表示能够获取到同步状态;
获取失败调用doAcquireShared(int arg)方法进入同步队列。
以CountDownLatch的tryAcquireShared实现来看,state为0时,返回1,获取不到同步状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- doAcquireShared实现
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取当前节点的前驱节点;
如果当前节点的前驱节点是头结点,并且获取到的共享同步状态 > 0,设置当前节点的为头结点,获取同步状态成功;
不满足条件的线程自旋等待。
与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态。
共享式同步状态释放 - releaseShared实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
调用tryReleaseShared
方法释放状态
;
调用doReleaseShared
方法唤醒后继节点
;
https://www.jianshu.com/p/df0d7d6571de
https://segmentfault.com/a/1190000008471362#item-2