AQS(AbstractQueuedSynchronizer)详
什么是AQS?
-
AQS(AbstractQueuedSynchronizer): 是并发容器J.U.C(java.util.concurrent)下locks包内的一个类. 它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列. 底层实现的数据结构是一个双向链表.
-
AQS的核心思想是, 如果被请求的共享资源是空闲的, 则将当前请求资源的线程设置为有效线程, 并且将共享的资源设置为锁定的状态. 如果被请求的共享资源被占用, 那么就需要一套线程阻塞/等待以及唤醒进行锁分配的机制, 这个机制AQS是用CLH(参考:
备注1
)队列锁实现的, 就是将暂时获取不到锁的线程加入到队列中进行等待. -
AQS定义两种资源共享方式: Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)
-
AQS同步器是用一个int变量state来表示状态. 同步功能使用的方法都是子类继承AbstractQueuedSynchronizer类实现的. 子类通过继承同步器实现自身需要的方法来管理state状态, 管理的方式就是通过
accquire()/accquireShared()/release()/releaseShared()
等方法来操作状态. 在多线程环境下状态的操作必须保证其原子性, 所以子类在状态的管理中需要使用AQS同步器提供的三个方法操作state:getState()/setState(int)/compareAndSetState(int, int)
. -
子类推荐被定义为自定义同步装置的内部类
(大佬都是这么实现的, 跟着没毛病)
.
备注1:
CLH锁即Craig, Landin, and Hagersten (CLH) locks. CLH锁是一个自旋锁。能确保无饥饿性. 提供先来先服务的公平性.
CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁, 申请线程仅仅在本地变量上自旋, 它不断轮询前驱的状态, 假设发现前驱释放了锁就结束自旋
AQS中的数据结构
节点和同步队列
Node节点属性
- 节点的状态waitStatus.
CANCELLED(1): 表示当前节点被取消, 进入改状态的节点将不会在发生变化.
SIGNAL(-1): 表示后继结点在等待当前结点唤醒. 后继结点入队时, 会将前继结点的状态更新为SIGNAL.
CONDITION:(-2) : 表示当前节点在condition队列中进行等待. 当其他线程调用了Condition的signal()方法后, CONDITION状态的结点将从等待队列转移到同步队列中, 等待获取同步锁.
PROPAGATE(-3): 共享模式下, 前继结点不仅会唤醒其后继结点, 同时也可能会唤醒后继的后继结点.
值为0, 表示当前节点在sync队列中,等待着获取同步锁.
- Node prev: 前驱节点.
- Node next: 后继节点.
- Node nextWaiter: 存储condition队列中的后继节点.
- Thread thread: 当前线程.
同步队列数据结构
核心方法分析
public final void acquire(int arg)
该方法是独占模式下线程获取共享资源的入口, 如果获取到资源后, 线程直接返回.否则将进入等待队列, 直到获取到资源为止(整个过程忽略中断的影响
). 这就是Lock.lock()的语义, 你也可以自定义Lock顶层接口, 参考 Doug Lea对Lock的定义.
public final void acquire(int var1) {
if (!tryAcquire(var1) && acquireQueued(addWaiter(Node.EXCLUSIVE), var1)) {
selfInterrupt();
}
}
函数流程如下:
- tryAcquire(): 尝试直接获取资源, 如果成功直接返回
(调用tryAcquire更改状态,需要保证原子性. 这里体现了非公平锁, 每个线程获取锁时会尝试直接抢占加塞一次, 而CLH队列中可能还有别的线程在等待)
. - addWaiter(): 如果获取不到, 将当前线程构造成节点Node并加入sync队列的尾部, 并且标记为独占模式.
- acquireQueued(): 使线程阻塞在等待队列中获取资源, 一直获取到资源后才返回. 如果在整个等待过程中被中断过, 则返回true, 否则返回false.
- 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断selfInterrupt(), 将中断补上
(响应前面说的, 整个等待过程忽略中断的影响)
.
1. tryAcquire()方法
此方法尝试去获取独占资源. 如果获取成功, 则直接返回true, 否则直接返回false. 这也正是tryLock()的语义, 还是那句话. 当然不仅仅只限于tryLock().
如下是tryAcquire()的源码
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里throw异常是留给我们进行实现的. AQS只是一个框架, 具体资源的获取和释放逻辑由我们自定义同步器去实现(就像ReentrantLock类)
. 需要自定义实现的方法都没有定义成abstract, 由我们根据同步器独占/共享自有选择.
2. addWaiter(Node)方法
private Node addWaiter(Node mode) {
// 以给定模式构造结点. mode有两种: EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试直接将节点放到sync队列尾部,
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果放入尾部失败, 调用enq()入队
enq(node);
return node;
}
3. enq(Node)方法
private Node enq(final Node node) {
//CAS"自旋", 直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空, 创建一个空的结点作为head结点, 并将tail也指向它, 这是一个初始化的动作
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程, 放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
CAS自旋volatile变量, 保证了可见性, 操作上又是原子方法. 这是一种很经典的用法
4. acquireQueued(Node, int)方法
当节点进入同步队列后, 接下来就是要等待获取锁(访问控制), 同一时刻只有一个线程在运行, 其他都要进入等待状态. 每个线程节点都是独立的, 他们进行自旋判断, 当发现前驱节点是头结点并且获取了状态(tryAcquire()自己实现原子性操作)
, 那这个线程就可以运行了.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否可以成功拿到状态
try {
boolean interrupted = false;//处理过程中是否被中断过
for (;;) {//自旋
final Node p = node.predecessor();//获取当前节点的前驱节点
//如果前驱节点是head, 当前节点就是排第二. 这个时候可以尝试去获取资源了(头结点可能释放完唤醒自己了)
if (p == head && tryAcquire(arg)) {
setHead(node);//设置头节点为当前节点
p.next = null; // help GC setHead()中node.prev已置为null, 此处再将head.next置为null. 方便gc回收head节点.
failed = false;//标记成功获取资源
return interrupted;
}
//不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;//如果线程被终端, 标记interrupted为true, 等待线程获取到资源后在中断
}
} finally {
if (failed)//如果等待过程中没有成功获取资源(不可控异常), 取消线程在队列的等待
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()方法如果发现前驱节点状态不是SIGNAL, 会标记前驱节点状态为SIGNAL(-1). 如果发现前驱节点放弃等待了就一直往前找节点, 直到找到正常等待的节点排队到它后面.
parkAndCheckInterrupt()使线程进入waiting状态, 如果发现被唤醒, 检查是不是被中断了并且清除状态.
acquire()方法总结
- 尝试直接插队获取资源, 如果不成功进入同步队列排队.
- 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己.
interrupt()唤醒拿不到资源继续进入waiting状态
. - 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码
(如果等待过程被中断过此时会调用selfInterrupt()将中断补上)
.
public final boolean release(int arg)
该方法是独占模式下线程释放共享资源的入口.
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放资源, 自定义函数实现
Node h = head;
if (h != null && h.waitStatus != 0)//拿到头结点
unparkSuccessor(h);//唤醒等待队列中的下一个线程
return true;
}
return false;
}
1. tryRelease(arg)方法
需要我们实现的独占资源释放函数.
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2. unparkSuccessor(node) 方法
唤醒等待队列中的下一个线程
private void unparkSuccessor(Node node) {
//当前线程节点的状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//设置当前线程的节点状态为0, 因为已经释放资源
Node s = node.next; //找到下一个需要唤醒的节点
if (s == null || s.waitStatus > 0) {//下一个节点为空或者已经放弃等待就取消唤醒操作
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从后往前找有效的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒有效节点
}
下一个有效的线程被唤醒后处在acquireQueued()的自旋流程中, 然后进入资源判断获取(if (p == head && tryAcquire(arg))).
public final void acquireShared(int arg)
此方法是共享模式下线程获取共享资源的顶层入口. 它会获取指定量的资源(state), 获取成功后直接返回, 获取失败进入等待队列, 直到获取到资源(整个过程忽略中断的影响
).参考ReentrantReadWriteLock设计.
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//改方法需要自定义同步器实现. 返回语义负数表示失败, 0或者大于零表示获取成功.
doAcquireShared(arg);//小于零进入等待队列, 获取资源后返回
}
1. doAcquireShared(arg)方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入队列的尾部, 模式为共享. addWaiter()方法参考上面介绍
boolean failed = true;//成功失败标识
try {
boolean interrupted = false;//是否中断标识
for (;;) {//CAS自旋
final Node p = node.predecessor();//获取前驱节点
if (p == head) {//前驱节点为头结点, 尝试获取资源(此处有可能是前驱节点唤醒了自己)
int r = tryAcquireShared(arg);//获取资源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//将head指向自己, 此时r>0, 还有剩余资源唤醒后续排队线程
p.next = null; // help GC
if (interrupted)// 中断标识
selfInterrupt();//补上中断
failed = false;
return;
}
}
//不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2. setHeadAndPropagate(Node, int)方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);/head指向自己
//如果还有剩余量, 继续唤醒下一个排队的线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
acquireShared()方法总结
- tryAcquireShared()方法尝试获取资源, 成功直接返回, 如果不成功进入同步队列排队.
- 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己.
- 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码.
其实同acquir()方法一样, 只不过该方法在自己拿到资源后回去唤醒后继线程
public final boolean releaseShared(int arg)
该方法是共享模式下线程释放共享资源的入口. 跟独占模式下的资源释放方法release()很相似, 不同的是独占模式一般是完全释放资源(state=0)后才允许去唤醒其他线程, 而共享模式往往不会这么控制, 具体实现要看自定义同步器的逻辑.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放资源, 该方法需要自定义共享同步器实现.
doReleaseShared();//唤醒后继节点
return true;
}
return false;
}
1. tryReleaseShared()方法
需要我们自己实现的共享资源释放方法.
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
2. doReleaseShared()方法
该方法是用来唤醒后继节点的.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//唤醒后继节点
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) //head节点如果发生变化即退出自旋
break;
}
}
releaseShared()方法总结
- tryReleaseShared()方法进行共享资源的释放.
- doReleaseShared()方法用来唤醒后继节点.
以上是几个AQS常用的资源获取和释放的基本方法, 其实还有一些方法和上面分析的方法略有不同, 如下:
- 独占式获取资源
1. acquireInterruptibly(int arg): 类似于acquire()方法, 不同的地方是该方法响应外界对线程的中断信号, 并且抛出InterruptedException()异常.
2. tryAcquireNanos(int arg, long nanosTimeout) : 类似于acquire()方法, 同样响应中断抛出InterruptedException()异常, 并且该方法有获取超时时间.
- 共享式获取资源
1. acquireSharedInterruptibly(int arg): 类似acquireInterruptibly()方法的共享实现, 同样响应中断抛出InterruptedException()异常.
2. tryAcquireSharedNanos(int arg, long nanosTimeout)(): 类似tryAcquireNanos()方法的共享实现, 同样响应中断抛出InterruptedException()异常, 并且该方法有获取超时时间.
测试案例
1. ExclusiveLock(自定义独占锁)
ExclusiveLock是互斥的不可重入锁实现, 对锁资源State的操作只有0和1两个状态, 0代表未锁定,1代表锁定. 按照上面的分析, 我们需要实现AQS的tryAcquire()和tryRelease()方法.
public class ExclusiveLock implements Lock {
//自定义内部类同步器
private static class ExclusiveSync extends AbstractQueuedSynchronizer {
//判断是否是锁定状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//尝试获取资源, 如果成功直接返回. 获取成功返回true, 否则返回false.
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)){//状态变更必须为CAS原子操作, 保证原子性
setExclusiveOwnerThread(Thread.currentThread());//同样也是原子操作
return true;
}
return false;
}
//尝试释放资源
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//创建自定义同步器的实现
private final ExclusiveSync sync = new ExclusiveSync();
//获取资源, 同acquire()语义一样, 获取不到进入同步队列等待成功返回
@Override
public void lock() {
sync.acquire(1);
}
//判断锁是否被占有
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//获取资源, 立刻返回结果
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
//
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
//释放资源
@Override
public void unlock() {
sync.release(1);
}
}
1. ShareLock(自定义共享锁)
ShareLock为一个共享同步器的实现, 设计同一时刻可以有两个线程获取到资源, 超过两个进行同步队列阻塞. 按照上面的分析, 我们实现AQS的tryAcquireShared()和tryReleaseShared()方法.
public class ShareLock implements Lock {
public static class ShareSync extends AbstractQueuedSynchronizer{
//定义同步器的初始状态为2
ShareSync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current + reduceCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
private final ShareSync sync = new ShareSync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
}