java多线程(核心篇)第五章
第五章 线程间协作
5.1 等待与通知:wait/notify
-
受保护方法三要素:保护条件、暂停当前线程和目标动作。
image.png
-
等待线程在其被唤醒、继续运行到其再次持有相应对象的内部锁的这段时间内,由于其他线程可能抢先获得相应的内部锁并更新了相关共享变量而导致该线程所需的保护条件又再次不成立。因此Object.wait()调用应该放在循环语句之中,以确保目标动作只有在保护条件成立的情况下才能执行成功。所以,对保护条件的判断以及Object.wait()调用应该放在循环语句之中,以确保目标动作只有在保护条件成立的情况下才能够执行。
-
等待线程对保护条件的判断以及目标动作的执行必须是个原子操作,否则可能产生竟态——目标动作被执行前的那一刻其他线程对共享变量的更新又使得保护条件重新不成立。因此,目标动作的执行必须和保护条件的判断以及Object.wait()调用放在同一个对象所引导的临界区中。
-
通知方法两要素:更新共享变量、唤醒其他线程。
image.png
-
为了使等待线程在其被唤醒之后能够尽快再次获得相应的内部锁,我们要尽可能地将Object.notify()调用放在靠近临界区结束的地方。
-
等待线程和通知线程是同步在同一对象之上的两种线程。
-
Object.wait(long)允许我们指定一个超时时间(单位毫秒)。如果被暂停的等待线程在这个时间内没有被其他线程唤醒,那么Java虚拟机会自动唤醒该线程。
-
使用Object.notify()替代Object.notifyAll()。Object.notify()调用不会导致过早唤醒,因此减少了相应的上下文切换开销。(过早唤醒的线程任然需要继续等待,即再次经历被暂停和唤醒的过程)。
-
Object.notify()可能导致信号丢失这样的正确性问题,而Object.notifyAll()虽然效率不太高,但是其正确性方面有保障。Object.notify()只有在下列条件全部满足的情况下才能够用于替代notify方法。
- 一次通知仅需要唤醒至多一个线程。
- 相应对象的等待集中仅包含同质等待线程。同质等待线程指折现线程使用同一个保护条件,并且这些线程在Object.wait()调用返回之后的处理逻辑一致。最为典型的同质线程是使用同一个Runnable接口实例创建的不同线程(实例)或者从同一个Thread子类的new出来的多个实例。
5.2 Java条件变量
- Condition接口可作为wait/notify的替代品来实现等待/通知,它为解决过早唤醒问题提供了支持。Condition接口定义的await方法、signal方法和signalAll方法分别相当于Object.wait()、Object.notify()和Object.notifyAll()。
-
Condition实例也被称为条件变量或者条件队列,每个Condition实例内部都维护了一个用于存储等待线程的队列。
image.png - Condition.awaitUtil(Date deadline)可以用于实现带超时时间限制的等待,并且该方法的返回值能够区分该方法调用是由于等待超时而返回还是由于其他线程执行了相应条件变量的signal/signalAll方法而返回。(Object.wait无法区分)
5.3 倒计时协调器:CountDownLatch
-
CountDownLatch可以用来实现一个(或者多个)线程等待其他线程完成一组特定的操作之后才继续运作。(Thread.join()实现的是一个线程等待另外一个线程结束)。
-
CountDownLatch.countDown()每被执行一次就会使相应实例的计数器值减少1。
-
CountDownLatch.await()相当于一个受保护方法,其保护条件为”计数器值为0“(代表所有先决操作已执行完毕),目标操作时一个空操作。因此,当计数器值不为0时CountDownLatch.await()的执行线程会被暂停。
-
当计数器的值达到0之后,该计数器的值就不再发生变化。此时,调用CountDownLatch.countDown()并不会导致异常的抛出,并且后续执行CountDownLatch.await()的线程也不会被暂停。因此,CountDownLatch的使用是一次性的:一个CountDownLatch实例只能够实现一次等待和唤醒。
-
CountDownLatch内部封装了对"全部先决操作已执行完毕"这个保护条件 的等待与通知的逻辑,因此客户端代码在使用CountDownLatch实现等待/通知的时候调用await、countDown方法都无须加锁。
-
new CountDownLatch(count)。count指需要被执行次数。
-
CountDownLatch.await(long TimeUnit)允许指定一个超时时间,在该时间内如果相应CountDownLatch实例的计数器值仍然未到达0,那么所有执行该实例的await方法的线程都会被唤醒。
5.4 栅栏
-
有时候多个线程可能需要相互等待对方执行到代码中的某个地方(集合点),这时这些线程才能够继续执行。(例子:大家相约爬山,所有人到齐之后,才能上山)。
-
使用CyclicBarrier实现等待的线程被称为参与方。参与方只需要执行CyclicBarrier.await()就可以实现等待。最后一个线程执行CyclicBarrier.await()会使得使用相应CyclicBarrier实例的其他所有参与方被线程唤醒,而最后一个线程自身并不会被暂停。
-
与 CountDownLatch不同的是,CyclicBarrier实例是可重复使用的;所有参与方被唤醒的时候,任何线程再次执行CyclicBarrier.await()又会被暂停,直到这些线程中的最后一个线程执行了CyclicBarrier.await()。
5.5 生产者——消费者模式
-
由于线程之间无法像函数调用那样通过参数直接传递数据,因此生产者和消费者之间需要一个用于传递产品的传输通道。传输通道相当于生产者和消费者之间需要一个用于传递产品的传输通道(Channel)。
image.png
nextBatch方法的执行线程相当于消费者线程;publish方法的执行线程相当于生产者线程;ArrayBlockingQueue实例channel相当于传输通道。
image.png - 从传输通道中存入一个产品或者取出一个产品时,相应的线程可能因为传输通道中没有产品或者其存储空间已满而被阻塞(暂停)。常见的阻塞方法/操作包括InputStrea.read()、ReentrantLock.lock()、申请内部锁等。
- BlockQueue线程安全。
- 流量控制与信号量(Semaphore)
- PipedOutputStream和PipedInputStream可以用来实现(线程间的直接输出和输入。所谓“直接”是指从应用代码的角度来看,一个线程的输出可作为另外一个线程的输入,而不必借用文件、数据库、网络连接等其他数据交换中介。使用时注意:
- PipedOutputStream和PipedInputStream适合两个线程间使用,即蛇和于单生产者——单消费者的情形。
- 生产者线程发生异常而导致其无法继续提供新的数据时,生产者线程必须主动提前关闭相应的PipedOutputStream实例来实现这种“知会”。
- Exchanger类也可作为传输通道,它对双缓冲技术提供了支持:生产者与消费者各自维护一个缓冲区,双方通过执行Exchanger.exchange(V)来交换各自持有的缓冲区。当消费者在“消费”一个已填充完毕的缓冲区时,生产者可以对待填充区进行填充,从而实现并发。
5.6 线程中断机制
- 中断可被看作由一个线程发送给另外一个线程的一种指示,该指示用于表示发起线程希望目标线程停止其正在执行的操作。(interrupt方法调用会中断线程,线程状态会被置为”中断”标志,但是中断线程并不会造成停止线程的执行,只是仅仅会把线程状态置为”中断”.当线程在执行阻塞操作时(比如调用sleep/wait/yield/join方法时)调用了interrupt()会抛出InterruptException异常并且将该线程的”中断”标志位清空,会将线程的中断标志重新设置为false.)
- Java平台会为每个线程维护一个被称为中断标记的布尔型状态变量用于表示相应线程是否接受到了中断,true表示接受到了中断。
- 通过Thread.currentThread().isInterrupted()调用来获取该线程的中断标记值,也可以通过Thread.interrupted()来获取并重置(也称清空)中断标记值,即Thread.interrupted()会返回当前线程的中断标记值并将当前线程中断标志重置为false。调用一个线程的interrupt()相当于将该线程(目标线程)的中断标记置为true。
- 能够响应中断的方法通常时在执行阻塞操作前判断中断标志,若中断标志值为true则抛出InterruptedException。依照惯例,抛出InterruptedException异常的方法,通常会在其抛出该异常时将当前线程的线程中断标记重置为false。
- 如果发起线程给目标线程发送中断的那一刻,目标线程已经由于执行了一些阻塞方法而被暂停,那么此时Java虚拟机可能会设置目标线程的线程中断标记并将该线程唤醒,从而再次得到响应中断的机会。
5.7 线程停止。
- 以下情况会要求线程主动停止
- 服务或者系统关闭。
- 错误处理
- 用户取消任务
- 线程中断标记可能会被目标线程所执行的某些方法清空,因此从通用性的角度来看线程中断标记并不能作为线程停止标记。
-
由于发起线程在执行wokerThread.interrupt()的时候workerThread可能正在执行task.run(),而task.run()中的代码可能会清除线程中断标记,从而使得workerThread依旧无法终止。
image.png - 为了使线程停止标记的设置能够起作用,我们可能还需要给目标线程发送中断以将其唤醒。使之得以判断线程停止标记。
- 要“优雅”目标线程只有在其处理完所有待处理任务之后才能够终止。
- 结论:发起线程更新目标线程的线程停止标记并给其发送中断,目标线程仅在当前无待处理任务且不会产生新的待处理任务情况下才能使run方法返回。
@Override
public void run() {
try {
// 1. isInterrupted()保证,只要中断标记为true就终止线程。
while (!isInterrupted()) {
// 执行任务...
}
} catch (InterruptedException ie) {
// 2. InterruptedException异常保证,当InterruptedException异常产生时,线程被终止。
}
}