Java 多线程(五)- 理解 Condition 和 条件变量
在关于 ReentrantLock 的文章中,提到 Lock 接口作为内置 Monitor 锁的补充,提供了更灵活的接口,其中 lock / unlock 对于内置锁的 synchronized,那么内置锁的 监控条件 对应 Lock 的什么呢?就是 newCondition 返回的 Condition。Condition 和 内置锁的监控条件都被叫做 条件变量。
条件变量
作用
条件变量最主要的作用是用来管理线程执行对某些状态的依赖性。想象一下:一个线程是某个队列的消费者,它必须要等到队列中有数据时才能执行,如果队列为空,则会一直等待挂起,直到另外一个线程在队列中存入数据,并通知先前挂起的线程,该线程才会唤醒重新开始执行。
这个例子中,队列是否空/满 是线程执行所依赖的状态,而这个状态是多个线程需要访问的,所以需要加锁互斥访问,这种加锁模式与其他同步加锁略有不同,锁在操作的执行过程中需要被释放与重新获取的。管理依赖共享变量的线程执行通常用如下的编程模式:
获取锁;
while (条件状态不满足) {
释放锁;
线程挂起等待,直到条件满足通知;
重新获取锁;
}
临界区操作;
释放锁;
条件变量为了管理这种依赖性,需要做两件事情:
- 提供 await / wait 接口,挂起当前线程,并将线程放入条件队列 管理,同时释放锁;
- 提供 signal / notify 接口,唤醒等待的线程,重新抢锁运行;
在编程模式里为什么要使用 while 而不是 if,已经在之前的 Monitor 内置锁中有所阐述。
条件队列
条件队列来源于:它使得一组线程(等待线程集合)能够通过某种方式来等待特条件变成真。传统队列的元素是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。
内置锁中的条件队列
前面的文章说过,每个 Java 对象都是一个 Monitor Object 模式的对象,可以当作一个 Monitor 锁。每个对象同样可以作为一个条件队列,提供了 wait / notify / notifyAll 方法构成内部条件队列的 API。
Object.wait 会自动释放内置锁,并请求操作系统挂起当前线程,从而使其他线程能够获得内置锁,修改依赖的对象状态。当挂起的线程醒来时,它将在返回之前重新获取锁。
使用 wait / notify 组合接口管理状态依赖性比“轮询和休眠”更加简单和高效。
轮询是指在 while 循环里不断检查条件状态,如果条件状态满足,则进行以下处理,这会浪费很多 CPU 时钟进行判断。
休眠是指在 while 循环里检查条件状态,如果状态不满足,则 sleep 一段时间,线程醒来后则再次判断。它比轮询节约 CPU 时间片,但比条件变量低效,而且 sleep 的时间间隔难以把握,会依赖状态改变后也不会立即醒来,响应性也比条件队列差。
但是在功能实现上,这几种方式并没有差别,也就是说:
如果某个功能无法通过“轮询和休眠”来实现,那么条件队列也无法实现
条件谓词
要想正确使用条件队列,关键是找出对象在哪个条件谓词上等待。条件谓词并不依赖于条件变量的接口,它是使某个操作称为状态依赖操作的前提条件。如下图的代码块:
图1 使用内置锁条件变量其中2处的 isFull 函数就是一个条件谓词,表示“队列已满”时,需要等待。
三元关系
在条件等待中存在一种重要的三元关系,包括加锁,wait 方法和一个条件谓词。
在条件谓词中包含一个或多个线程共享的状态变量,需要一个锁来保护。因此在测试条件谓词之前必须要先持有锁。锁对象与条件队列对象必须是同一个对象,他们之间的三元关系如下:
每一次 wait 调用都会隐式地与特定的条件谓词关联起来;
当调用某个特定条件谓词的 wait 时,调用着必须已经持有与条件队列相关的锁;
并且这个锁必须保护着构成条件谓词的状态变量。
内置 Monitor 条件变量缺陷
过早唤醒
虽然锁 / 条件谓词 / 条件队列之间的三元关系不是很复杂,但 wait 方法的返回并不一定意味着线程正在等待的条件谓词已经成真。考虑图 2 的阻塞队列代码段:
图2 某阻塞队列代码片段假设有 A,B 两条线程阻塞在 put 函数,C 线程调用 take,获取并推出队列中一个数据,同时调用 notifyAll 唤醒 A,B 线程;若 A 线程获取内置锁,B 阻塞在锁获取中,A 又向队列压入一个数据,此时队列又满了;A 释放锁后,B 获取锁,但是队列已满,条件谓词判断失败,再次 wait 阻塞。
信号丢失
这里的信号丢失是指:线程正在等待一个已经(或者本应该)发生过的唤醒信号。错误的编程模式通常会造成信号丢失。考虑图 3 的阻塞队列代码段:
图2 某阻塞队列代码片段假设有 A 线程阻塞在 put 函数,B 线程阻塞在 take 函数,C 线程调用 take,然后使用 notify 接口唤醒其中一个线程;不巧的是 B 线程被唤醒,B 检查队列仍然为空,继续等待阻塞,此时应该被唤醒的 A 只能等待下一个唤醒。
Condition
Condition VS Monitor 条件变量
分析内置 Monitor 条件变量的过早唤醒和信号丢失,它们其实有一个共同的原因:多个线程在同一个条件队列上等待不同的条件谓词。如果想编写一个带有多个条件谓词的并发对象,或者想除了条件队列可见性意外的更多控制权,就可以使用显示的 Lock 和 Condition 而不是内置锁和条件队列。
与内置条件队列不同,对于每一个 Lock,可以有任意数量的 Condition 对象,因此对于不同的条件谓词,对于同一个锁,可以用不同的 Condition 对象来控制。
同时类似于 Lock 和内置锁的差异,Condition 也提供了丰富的接口等待挂起(可轮询,可中断,可超时等)。接口如下所示:
// wait 接口
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
// notify
void signal();
void signalAll();
Condition 对象会继承相关的 Lock 公平性,对于公平的锁,线程会依照 FIFO 顺序从 await 中释放。
特别注意:在 Condition 对象中,与 wait,notify, notifyAll 方法对应的分别是 await,signal 和 signalAll。但是实现 Condition 的类必然继承自 Object,因为它也包含了 wait 和 notify 方法。所以使用时一定要确保正确的版本。
分析代码
深究下,Condition 是如何管理队列的,它为什么会继承 Lock 的公平性,Condition 是如何阻塞拥有锁的线程。介绍完 Condition 后,可能会冒出更多的问题,为了学习 Condition,不妨以 AQS 的 ConditionObject 作为代码分析对象理解理解。
Node 队列节点
ConditionObject 复用了和 AQS 的队列节点 Node(具体可查看上篇文章),不同的:
- waitStatus 值为 CONDITION(-2), 表示该节点在条件队列上。
- nextWaiter 指向条件队列的下一个节点。
同时在 ConditionObject 内保存有队列的首尾指针:
- firstWaiter,指向队列的第一个Node
- lastWaiter,指向队列最后一个Node
下文为了区分两个不同的队列,使用以下名词:
- 同步队列:AQS 中的锁等待队列
- 条件队列:ConditionObject 中的条件队列
await
简单起见,我们分析方法 awaitUninterruptibly,代码片段如下图所示
图4 awaitUninterruptibly- 1972行,addConditionWaiter 方法会在 ConditionObject 队列尾部插入一个代表当前线程的 Node,状态为 CONDITION;
- 1973行,因为要调用 await 接口之前一定已经获得锁,所以当前线程在同步队列中一定是首节点,AQS.fullyRelease 释放当前锁,恢复同步队列后续节点执行,返回当前的许可状态用于重新申请锁;
- 1975行,AQS.OnSyncQueue 用来判断当前线程节点是否在同步队列中。为了防止被误唤醒,此处采用 while 进行轮询判断;
- 1976行,使用 LockSupport.park 挂起当前线程;
- 1980行,被其他线程唤醒后,调用 AQS.acquireQueued 重新尝试获取锁,如果获取失败则被加入同步队列,AQS.acquireQueued 会调用 AQS.tryAcquire 获取准入许可,所以 ConditonObject 继承了 AQS 的公平性。
signal
ConditionObject 的 signal 方法比较简单,主要代码被封装在 doSignal,该方法如下图所示:
图5 doSignal- 1874行,为转移线程节点做准备,将 nextWaiter 设置为 null,在同步队列,该字段无用,设置为 null 后,利于以后垃圾回收;
- 1875行,关键是 transferForSignal,它主要干以下这些事:
- 使用 CAS 设置节点状态为 0;
- 调用 AQS.enq 将 node 重新压入同步队列;
- 修改 node 的前继节点状态为 SIGNAL;
- 如果前继节点已经取消等待,恢复该 node 代表的线程
编程实践
以下代码是结合 Lock 和 Condition 实现容量为100的阻塞线程:
class BoundedBuffer<V> {
final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件变量
final Condition notEmpty = lock.newCondition();//读线程条件变量
final LinkedList<V> items = new LinkedList<V>();
final int totalCount = 100;
public void put(V x) throws InterruptedException {
lock.lock();
try {
while (totalCount >= items.size())//如果队列满了
notFull.await();//阻塞写线程
items.addLast(x);
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
}
public V take() throws InterruptedException {
lock.lock();
try {
while (items.size() == 0)//如果队列为空
notEmpty.await();//阻塞读线程
V x = items.removeFirst();
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}
代码中 notFull 代表了写线程条件变量,notEmpty 代表了读线程条件变量,在 put 的时候写入数据,signal 只会唤醒等待在 notEmpty 的线程;对应的 take 取出数据后,唤醒的也只会是等待在 notFull 的线程。
Condition 比内置锁的条件队列做的更加细致,能够很好的解决过早唤醒和信号丢失的问题。
内容来源
Java 并发编程实战