多线程篇七(Condition等待/唤醒机制及死磕源码)
前言
前面我们一直在研究Lock的使用以及Lock相关的源码,但是我们有没有发现一个问题呢?线程进入synchronized关键字修饰的同步方法或者同步代码块,我们可以通过锁对象来调用Object的wait和notify或者notifyAll方法实现等待/通知机制,因为当前线程获取到了锁对象的monitor监听器对象。
那么如果我们使用Lock锁,又如何的实现等待通知机制呢?通过这个问题我们来看下一个更加强大的通过Lock和Condition实现的等待/通知机制。
正文
一、Condition简介
1、Condition接口已知实现类
在jdk中Condition接口有两个实现类
(1)、AbstractQueuedLongSynchronizer.ConditionOnject
(2)、AbstractQueuedSynchronizer.ConditionObject
AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer都是抽象类,ConditionObject是他们的一个内部类,由于这些抽象类没有提供获取ConditionObject对象的方法,要想使用就需要在子类中的成员属性或成员方法中去new一个ConditionObject实例。
在JUC源码中是通过在ReentrantLock或者ReentrantReadWriteLock中的同步器类Sync(即即AbstractQueuedSynchronizer的子类)中提供了一个newCondition方法来获取Condtion实例的。然后在持有Sync对象的类中通过sync.newCondition()来获取Condition实例。
public Condition newCondition() {
return sync.newCondition();
}
2、Condition接口中方法的介绍
Condition接口中提供了一组以await开头的方法功能类似于object.wait(),以及一组以signal开头的方法功能了类似于object.notify/notifyAll
// 使当前线程处于等待状态,直到被指定的Condition对象唤醒或线程中断
void await() throws InterruptedException;
// 使当前线程处于等待状态,直到被指定的Condition对象唤醒不响应中断
void awaitUninterruptibly();
// 使当前线程处于等待状态,直到被指定的Condition对象唤醒或线程中断又或者等待时间到
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 使当前线程处于等待状态,直到被指定的Condition对象唤醒或线程中断又或者超时
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 使当前线程处于等待状态,直到被指定的Condition对象唤醒或线程中断又或者等待的时间到达指定的时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒在指定Condition对象上处于等待的一个线程
void signal();
// 唤醒在指定Condition对象上处于等待的所有线程
void signalAll();
二、Condition的基本使用
1、Condtion的常用方法及使用范式
public class ConditionTemplate {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void waitC() throws InterruptedException {
// 获取锁
lock.lock();
try {
// 使当前线程处于等待
condition.await();
}finally {
// 释放锁
lock.unlock();
}
}
public void notifyC() {
// 获取锁
lock.lock();
try {
// 唤醒在指定Condition对象上等待的一个线程
condition.signal();
//condition.signalAll(); 尽量少用
}finally {
// 释放锁
lock.unlock();
}
}
}
在这边啰嗦几句吧,使用Lock和Condition的时候需要注意以下几点
(1)、调用await方法时当前线程必须获取锁。否则在通过该方法释放锁的时候会抛出异常
(2)、调用signal方法时当前线程必须获取锁,否则也会抛出异常
(3)、建议使用signal而少用signalAll
这几个注意点在下面的源码分析中都会讲到,有兴趣的小伙伴可以继续往下观看
2、案例演示-基于Lock和Condition实现一个安全的有界阻塞队列
这里我就不拿官方文档上面的案例来演示了,而是使用自定义一个安全的有界阻塞队列来引出源码的解读同时也是为了方便后面我们更好的阅读并发容器类
说到有界阻塞队列,我们首先想到的就是生产者消费者模式,一类线程往队列里面放入元素,一类线程从队列里面取出元素,如果队列满了则往队列中放入元素的线程就需要等待了直到有线程从队列中取出元素方可被唤醒。队列为空时那么从队列中取出元素的线程就需要等待了直到有线程往队列里面放入元素线程方可被唤醒。
接下来我们来看一看代码实现
/**
* 基于Lock和Condition实现有界阻塞队列
*
* @Author YUBIN
*/
public class MyBlockQueue<T> {
// 定义一个队列容器
private List<T> queue;
// 容器的最大容量
private int maxSize;
// 定义一个可重入锁
private Lock lock = new ReentrantLock();
// 当队列满时的条件等待
private Condition fullCondition = lock.newCondition();
// 当队列为空时的条件等待
private Condition emptyCondition = lock.newCondition();
public MyBlockQueue(int maxSize) {
this.maxSize = maxSize;
queue = new LinkedList<>();
}
/**
* 往队列中放入元素
* @param element
*/
public void put(T element) throws InterruptedException {
// 获取锁
lock.lock();
try {
while (queue.size() == maxSize) {
System.out.println(Thread.currentThread().getName() + "队列已满需等待!");
fullCondition.await();
System.out.println(Thread.currentThread().getName() + "被唤醒了!");
}
System.out.println(Thread.currentThread().getName()+" push " + element);
queue.add(element);
emptyCondition.signal();
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 从队列中获取元素
* @return
*/
public T take() throws InterruptedException {
// 获取锁
lock.lock();
try {
while (queue.size() == 0) {
System.out.println(Thread.currentThread().getName() + "队列是空的需等待!");
emptyCondition.await();
System.out.println(Thread.currentThread().getName() + "被唤醒了!");
}
// 将第一个元素从队列中移除
T element = queue.remove(0);
fullCondition.signal();
System.out.println(Thread.currentThread().getName() + " Pop " + element);
return element;
} finally {
// 释放锁
lock.unlock();
}
}
}
/**
* 自定义有界阻塞式队列测试类
*
* @Author YUBIN
*/
public class MyBlockQueueTest {
public static void main(String[] args) {
MyBlockQueue<Integer> queue = new MyBlockQueue<>(10);
Thread pushThread = new PushThread(queue);
pushThread.setName("Push_Thread");
Thread popThread = new PopThread(queue);
popThread.setName("Pop_Thread");
pushThread.start();
popThread.start();
}
/**
* 往队列中存入数据的线程
*/
private static class PushThread extends Thread {
private MyBlockQueue queue;
public PushThread(MyBlockQueue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
try {
queue.put(i);
SleepUtils.second(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 从队列中去除数据的线程
*/
private static class PopThread extends Thread {
private MyBlockQueue<Integer> queue;
public PopThread(MyBlockQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Integer value = queue.take();
SleepUtils.second(80);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
上面这段代码还是比较简单的根据注释相信大家都是可以理解的,接下来我们来看一看调用await方法和signal方法我们的Doug Lee大师都做了什么呢?
三、Condition的等待/通知源码解读-ReentrantLock
1、await()这个方法主要实现的功能是将当前线程挂到指定Condition对象的等待队列中去使其处于等待状态,并释放当前线程所持有的锁,当检测到线程中断或者收到signal通知的时候被唤醒
public final void await() throws InterruptedException {
// 判断当前线程是否已被中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装成Node节点,并将其挂在等待队列的尾部
Node node = addConditionWaiter();
// 释放当前线程所占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在同步队列中
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如上面源码所示执行步骤:
(1)、判断当前线程是否被中断,已中断则抛出中断异常,反之执行步骤(2)
(2)、调用addConditionWaiter方法将当前线程封装成Node对象并将其挂在Condition等待队列的尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter方法流程:
如果是第一次挂的话,Condition还没有被初始化firstWaiter和lastWaiter都是null,如下图①

因此代码会直接从Node node = ......这行开始真正执行
这种形式执行完Condition等待就会变成图②这种形式

当加入第二个第三个或者更多的代码的时候他就会执行第一个if语句了,假如我在加到第四个等待节点的时候发现第三个节点的waitStatus!=-2时,此时就会执行unlinkCancelledWaiters方法来将第三个节点踢出等待队列,并且将lastWaiter设置为第二个节点,如下图所示

private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
(3)、释放当前线程所持有的锁fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
如果释放锁失败获取在调用await方法的时候没有占有锁在调用tryRelease方法就会抛出异常,此时Node的waitStatus就会被设置为1
(4)、判断当前节点是否存在于同步队列,如果不存在则阻塞当前线程,直到有线程将其唤醒,唤醒之后就会执行await方法后面的逻辑。
await源码执行逻辑图如下:

2、signal()方法的功能是将制定的Condition等待队列中的第一个节点以自旋CAS的形式挂到同步队列的尾部
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal()方法执行的步骤
(1)、判断当前调用唤醒方法的线程是否获取锁,没有则抛出异常,反之执行步骤(2)。
(2)、判断当前Condition对象的等待队列中的firstWaiter第一个等待节点A是否为空,为空则结束当前方法,反之调用步骤(3)doSignal(Node first)方法。
(3)、doSignal方法主要做了:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
(3.1)、将当前Conditiion对象对应的等待队列中的首节点设置为节点A的nextWaiter,如果节点A的nextWaiter为空则将等待队列的lastWaiter也设为null
(3.2)、将原firstWaiter节点A的nextWaiter设为null,帮助GC。
(3.3)、判断将等待节点A挂到同步队列的尾部,成功返回true,失败返回false
这个判断主要分为下面这四步:
(3.3.1)、以CAS的形式将节点A的waitStatus的值从-2改为0失败的话则返回false,反之执行步骤2
(3.3.2)、以循环CAS的形式将节点A挂到同步队列的尾部,同时返回同步队列中节点A的prev节点B
(3.3.3)、判断如果节点B的waitStatus>0获取以CAS的形式将节点B的waitStatus设为-1则必须唤醒当前节点A(这一步中将waitStatus设为-1是为了让线程调用了unlock的时候可以唤醒该节点)
(3.3.4)、返回true
如果在3.3这一步返回了false则再次执行3.1
下图为源码的执行流程图

总结
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
一个Condition包含一个等待队列,新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。