AQS 之 Condition-的源码分析
2019-03-06 本文已影响0人
断风雨_2669
在使用 Lock 锁的过程中,我们往往会使用到另外一个对象 Condition ,用于等待/通知模式的处理。
Condition 的创建
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
使用 Condition 的前提是获取锁
final ConditionObject newCondition() {
return new ConditionObject();
}
从 newCondition 方法看出 Condition 对象实际上是 AQS 的内部类 ConditionObject ()。
成员变量
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
从内部定义的变量 firstWaiter, lastWaiter 看出, ConditionObject 对象内部维护了一个同样以 Node 为节点的等待队列。
await()
await 操作会使当前线程释放锁并进入等待模式。
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 当前线程中断 抛出中断异常
throw new InterruptedException();
// 将当前线程构造节点插入等待队列尾部
Node node = addConditionWaiter();
// 当前线程释放锁,唤醒同步队列 head 的后置节点
int savedState = fullyRelease(node);
int interruptMode = 0;
// 节点添加到同步队列后 退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 应该是在其他线程释放锁后被唤醒
// 检查当前线程是否中断,若未中断则返回 0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// node 进入自旋过程尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 移除等待队列中状态非 CONDITION 的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程构造节点并设置状态为 CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 等待队列为空的时候将 firstWaiter 指向 node
firstWaiter = node;
else
// 等待队列非空时将 lastWaiter 尾节点的 nextWaiter 指向 node
t.nextWaiter = node;
// 移动尾节点
lastWaiter = node;
return node;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 当前线程释放锁,并唤醒同步队列中 head 的后置节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 判断节点是否在同步队列上
final boolean isOnSyncQueue(Node node) {
// 节点状态为 CONDITION 或 节点的前置为空 说明节点还在等待队列上
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果节点存在后置节点 next 则说明节点在同步队列上
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 从 tail 尾节点开始遍历同步队列查找 node 节点;若存在返回 true,反之返回 false
return findNodeFromTail(node);
}
await 操作流程如下 :
- 将当前线程构造一个新的 node 节点,状态为 CONDITION 添加到等待队列尾部
- 释放锁,唤醒同步队列 head 的后置节点
- 判断当前 node 节点是否在同步队列中,若不在同步队列上则挂起当前线程,等待其他线程释放锁时被唤醒
- 节点 node 被唤醒后若在同步队列上,则进入自旋过程再次尝试获取锁
signal()
signal 操作激活等待队列中节点
public final void signal() {
// 判断当前线程是否为锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 判断 first 的后置节点是否为空,为空说明等待队列为空
if ( (firstWaiter = first.nextWaiter) == null)
// 等待队列的尾节点置为空
lastWaiter = null;
// 将 first 的后置节点置为空,也即是将 first 节点从等待队列中移除
first.nextWaiter = null;
// 执行信号转移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将节点从等待队列 (condition queue) 转移到 同步队列 (sync queue)
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 将节点状态设置为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将节点添加到同步队列(sync queue)尾部, 此时 p 应该是 node 的前置节点 ws 为 0
Node p = enq(node);
//
int ws = p.waitStatus;
// 将 node 的前置节点状态改为 SIGNAL; 便于节点 p 释放锁的时候唤醒 node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal 操作的流程如下:
- 将等待队列中的节点从队列中移除
- 将等待队列中的节点状态由 CONDITION 改为 0
- 将等待队列中的节点添加到 AQS 的同步队列尾部
signal 的作用 只是将节点从等待队列转移到同步队列中,只有当前线程释放锁后,转移到同步队列的节点才会有机会获取到锁。
如下图所示为 Condition 操作节点的转移过程:
image小结
从 Condition 的 await()、signal() 操作可以看出,其作用等效于 Object 对象的 await(), notify() 方法;