并发包同步器的核心AQS-深入-条件队列
2021-04-06 本文已影响0人
于情于你
ConditionObject主要是为并发编程中的同步提供了等待通知的实现方式,可以在不满足某个条件的时候挂起线程等待(await)。直到满足某个条件的时候在唤醒线程(signal/signalAll)。
结构
API层
将当前线程的锁释放,并加入条件等待队列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 用当前线程创建一个新节点,并加入等待队列
Node node = addConditionWaiter();
// 释放锁,并唤醒后面节点的线程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当前节点不在同步队列的时候挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 线程被唤醒,是否被中断了,如果没中断了退出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 尝试获取锁,获取锁成功interruptMode改为REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理状态是取消的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根据interruptMode响应
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
REINTERRUPT,退出等待时再次中断。THROW_IE,退出等待时抛出InterruptedException
将等待时间最长的线程,从条件等待队列,加入等待锁的同步队列
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
内部方法调用层
往等待队列中添加节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果最后一个节点是取消状态,则清空等待队列中所有的被取消的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建一个节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 队列是空,则把当前节点当作firstWaiter,否则加到最后一个节点后面
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
节点加入等待队列
等待队列转移到同步队列
private void doSignal(Node first) {
// 从指定节点开始把节点转移到同步队列并删除节点,直到有一个节点转移成功或者等待队列空了
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 把节点的waitStatus更新成0,如果更新失败则退出
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将节点加入到同步队列,如果节点被取消操作了,或者更新节点状态失败,那么就唤醒当前节点的线程
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
image.png