Java并发之Condition的实现分析
一、Condition的概念
介绍
回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法可以实现等待/通知模式。
对于 Lock,通过 Condition 也可以实现等待/通知模式。
Condition 是一个接口。
Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。
Lock 接口中有个 newCondition() 方法,通过这个方法可以获得 Condition 对象(其实就是 ConditionObject)。
因此,通过 Lock 对象可以获得 Condition 对象。
Lock lock = newReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
二、Condition的实现分析
实现
ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。
publicclassConditionObject implementsCondition, java.io.Serializable {
privatetransientNode firstWaiter;
privatetransientNode lastWaiter;
...
可以看到,等待队列和同步队列一样,使用的都是同步器 AQS 中的节点类 Node。
同样拥有首节点和尾节点,
每个 Condition 对象都包含着一个 FIFO 队列。
结构图:
等待
调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。
publicfinalvoidawait() throwsInterruptedException {
if(Thread.interrupted())
thrownewInterruptedException();
Node node = addConditionWaiter();
//释放同步状态(锁)
intsavedState = fullyRelease(node);
intinterruptMode = 0;
//判断节点是否放入同步对列
while(!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//如果已经中断了,则退出
if((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if(node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
分析上述方法的大概过程:
将当前线程创建为节点,加入等待队列;
释放锁,唤醒同步队列中的后继节点;
while循环判断节点是否放入同步队列:
没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)
放入,则退出 while 循环,执行后面的判断
退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。
竞争到锁后从 await() 方法返回,即退出该方法。
addConditionWaiter() 方法:
privateNode addConditionWaiter() {
Node t = lastWaiter;
if(t != null&& t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//将该线程创建节点,放入等待队列
Node node = newNode(Thread.currentThread(), Node.CONDITION);
if(t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
returnnode;
}
过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。
通知
调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。
publicfinalvoidsignal() {
//判断是否获取了锁
if(!isHeldExclusively())
thrownewIllegalMonitorStateException();
Node first = firstWaiter;
if(first != null)
doSignal(first);
}
过程:
先判断当前线程是否获取了锁;
然后对首节点调用 doSignal() 方法。
privatevoiddoSignal(Node first) {
do{
if( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while(!transferForSignal(first) &&
(first = firstWaiter) != null);
}
过程:
修改首节点;
调用 transferForSignal() 方法将节点移动到同步队列。
finalbooleantransferForSignal(Node node) {
//将节点状态变为0
if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))
returnfalse;
//将该节点加入同步队列
Node p = enq(node);
intws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if(ws > 0|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
returntrue;
}
调用同步器的 enq 方法,将节点移动到同步队列,
满足条件后使用 LockSupport 唤醒该线程。
当 Condition 调用 signalAll() 方法:
publicfinalvoidsignalAll() {
if(!isHeldExclusively())
thrownewIllegalMonitorStateException();
Node first = firstWaiter;
if(first != null)
doSignalAll(first);
}
privatevoiddoSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do{
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while(first != null);
}
可以看到 doSignalAll() 方法使用了 do-while 循环来唤醒每一个等待队列中的节点,直到 first 为 null 时,停止循环。
一句话总结 signalAll() 的作用:将等待队列中的全部节点移动到同步队列中,并唤醒每个节点的线程。
总结
整个过程可以分为三步:
第一步:一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。然后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,否则一直阻塞。
第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。
第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法,继续执行。
推荐一个交流学习群:697579751 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多