Java编程的逻辑 -- 并发章 -- 线程的基本协作机制
线程的基本协作
多线程间除了竞争访问同一资源外,也经常需要相互协作的去执行一些任务。而对于协作的基本机制用的最多的无疑是wait/notify。
协作的场景
- 生产者消费者模式:共享队列,一个负责放一个负责取。如果队列长度有限且满了之后则等待。
- 同时开始:类似运动员比赛,多个线程同时开始执行。
- 等待结束:主线程分发给各个子任务去执行任务,主任务开始执行前需要等待各个子任务执行完毕。
- 异步结果:
- 集合点
wait/notify
wait和notify方法都是Object类提供给所有子类进行线程协作的一种实现机制。
wait:
public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;
一个带时间参数,表示最多等待这么长时间。
一个不带,默认为0,表示无限期等待。
如果在wait的过程中线程被中断,则会抛出InterruptedException。我们在之前关于Thread类的的博文中也提到过这个。
wait在等什么?
我们之前说过的每个对象都有一把锁和一个等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,如果获取不到则会把当前线程加入等待队列中,其实,除了对于锁的等待队列,每个对象还有另一个等待队列,即条件队列,该队列用于线程间的协作。调用
wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object(等待哪个对象就用哪个对象)的notify方法:
public final native void notify();
public final native void notifyAll();
notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区別是,它会移除条件队列中所有的线程并全部唤醒。
public class WaitThread extends Thread {
private volatile boolean fire = false;
@Override
public void run() {
try {
synchronized (this) {
while(!fire) {
wait();
}
}
System.out.println("fired");
} catch(InterruptedException e) {
}
}
public synchronized void fire() {
this.fire = true;
notify();
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}
代码中的协作的条件变量式fire,两个线程都要访问该fire变量,容易出现竞态条件所以相关代码都被synchronized保护了。
需要特别注意的是: wait和notify方法的调用只能再synchronized代码块中。如果在调用wait/notify方法时,当前线程没有对象锁的话,那么会抛出java.lang.IllegalMonitor-StateException
。
wait的具体过程:
- 把当前线程放入条件等待队列,++释放对象锁++,阻塞等待,线程状态变为
WAITING
或
TIMED_WAITING
- 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
- 如果能够获得锁,线程状态变为
RUNNABLE
,并从wait调用中返回。 - 否则,该线程加入对象锁等待队列,线程状态变为
BLOCKED
,只有在获得锁后才会从wait调用返回。
- 如果能够获得锁,线程状态变为
这里我们一定要区别好两个等待队列,一个是线程没有分配到cpu时间片进入到的对象锁等待队列。另一个则是线程执行遇到条件不满足的情况进入条件等待队列。
当线程从条件队列中返回不代表其等待的条件就满足了,也有可能是wait方法限定的时间到达了。我们在使用wait方法的时候当其跳出后还应该再判断一次。一般我们通过while循环的方式来做到。
synchronized (obj) {
while(条件不成立)
obj.wait();
…//执行条件满足后的操作
}
调用notify会把在条件队列中等侍的线程唤醍并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronized代码块(被synchronized修饰过了)执行完后,等待的线程才会从wait调用中返回。这一点需要铭记
我们在使用wait时最难的是搞清楚wait到底等的是什么?,而notify通知的又是什么? 我们需要知道,它们被不同的线程调用,并共亨相同的锁和条件等待队列(相同对象的
synchronized代码块内),它们围绕一个共享的条件变量进行协作,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改条件后调用notify,调用wait的线程被唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设立多线程协作时,需要想清楚协作的++共享变量++和++条件++是什么,这是协作的核心。**
线程的基本协作示例
我们前面说了线程基本协作的场景,这里书上给出了对于这几种场景的代码示例:
- 生产者消费者模式
- 同时开始
- 等待结束
- 异步结果
- 集合点
生产者/消费者模式
队列:
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<>(limit);
}
public synchronized void put(E e) throws InterruptedException {
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll();
return e;
}
}
生产者:
static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
while(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch (InterruptedException e) {
}
}
}
消费者:
static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int)(Math.random()*100));
}
} catch(InterruptedException e) {
}
}
}
主程序:
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
new Producer(queue).start();
new Consumer(queue).start();
}
在生产者消费者模式中,put等待的是队满而take等待的却是队空。但他们都会进入相同的对象的条件等待队列中。由于等待的条件不同,但两者的共享变量却都是该队列,所以此处不能使用notify,因为notify只能唤醒一个线程,而由于线程调度的机制。唤醒的如果是同类线程的话则起不到协调的作用,所以在共用同一个条件队列而等待的条件却相反时应该使用notifyAll。
Java提供了专门的阻塞队列,包括:
- 接口 BlockingQueue和BlockingDeque
- 基于数组的实现类 ArrayBlockingQueue
- 基于链表的实现类 LinkedBlockingQueue和LinkedBlockingDeque
- 基于堆的实现类 PriorityBlockingQueue
在实际场景中,应该优先使用这些实现类。
同时开始
同时开始的按例好比运动员比赛,一个主线程(裁判)决定着各个子线程(运动员)何时开始。
协作对象fired:
static class FireFlag {
private volatile boolean fired = false;
public synchronized void waitForFire() throws InterruptedException {
while(!fired) {
wait();
}
}
public synchronized void fire() {
this.fired = true;
notifyAll();
}
}
运动员:
static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
裁判:
public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}
等待结束
Thread类的join方法的实现其实就是借助于wait方法。其主要代码:
while (isAlive()) {
wait(0);
}
该代码意义是:只要该等待线程活着就会一直等待,join的结束依赖与线程运行结束的时候Java系统调用notifyAll来通知该等待线程。
使用join有时比较麻烦需要等待各个子线程结束。这里书上给出的例子采用了另一种写法,主线程与子线程协作的是一个数,这个数表示未完成的线程的个数,初始值为子线程个数,主线程需要等待该值变为0,每个子线程结束后需要将该值减一,当为0时调用notifyAll。
协作对象MyLatch:
public class MyLatch {
private int count;
public MyLatch(int count) {
this.count = count;
}
public synchronized void await() throws InterruptedException {
while(count > 0) {
wait();
}
}
public synchronized void countDown() {
count--;
if(count <= 0) {
notifyAll();
}
}
}
子线程:
static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch (InterruptedException e) {
}
}
主线程:
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
MyLatch也可以应用在“同时开始”的场景,初始值设为1。
public class RacerWithLatchDemo {
static class Racer extends Thread {
MyLatch latch;
public Racer(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch latch = new MyLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}
Java中提供了一个专门的同步类CountDownLatch,在实际开发中应该使用它。
异步结果
在主从模式中,手工创建线程比较麻烦。一种常见的模式是异步调用,异步调用一般返回一个名为Future的对象,通过它可以获得最终的结果。在Java中表示子任务的接口是Callable。如下是书上例子:
子任务:Callable
public interface Callable<V> {
V call() throws Exception;
}
异步调用的结果:Future
public interface MyFuture <V> {
V get() throws Exception ;
}
通过该接口的get方法返回真正的结果。如果结果还没有计算完成,get方法会阻寒直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。
方便主线程调用子任务的类 MyExecutor
public <V> MyFuture<V> execute(final Callable<V> task)
通过该方法主线程就不需要在创建并管理子线程了。可以方便的获取到异步调用的结果。如下:
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
// 子任务
Callable<Integer> subTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//…执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
//异步调用子任务,返回一个MyFuture对象
MyFuture<Integer> future = executor.execute(subTask);//内部创了个子线程去执行
//…执行其他操作
try {
//获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}
所以重点是,MyExecutor类的execute方法是怎么实现的呢 ?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程。
MyFuture类的execute具体实现:
public <V> MyFuture<V> execute(final Callable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws Exception {
synchronized (lock) {
while(!thread.isDone()) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
if(thread.getException() != null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}
execute方法启动一个执行子线程,并返回携带执行结果的MyFuture对象。MyFuture的方法会阻塞等待知道子线程运行结束返回结果。
执行子线程 ExecuteThread
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}
Java中也已经包含了一套完善的方案,有:
- 表示异步结果的接口Future和实现类FutureTask。
- 用于执行异步任务的接口Executor,以及有更多功能的子接口ExecutorService。
- 用于创建Executor和ExecutorService的工厂方法类Executors。
集合点
和之前等待结束和同时开始案例相似。各线程分开行动,各自到大一个集合点,在集合点需要集齐所有线程,交换数据然后再进行下一步动作。协作的共享变量n,初始值为线程总数,当有一个线程到达集合点n减一。直到变为0即最后一个也到达,通过notifyAll来唤醒所有条件等待线程。
协作对象:AssemblePoint
协作对象public class AssemblePoint {
private int n;
public AssemblePoint(int n) {
this.n = n;
}
public synchronized void await() throws InterruptedException {
if(n > 0) {
n--;
if(n == 0) {
notifyAll();
} else {
while(n != 0) {
wait();
}
}
}
}
}
主线程:AssemblePointDemo
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
//模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//㼿㼿
ap.await();
System.out.println("arrived");
//…㼿㼿㼿㼿㼿㼿㼿㼿㼿
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
final int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i = 0; i < num; i++) {
threads[i] = new Tourist(ap);
threads[i].start();
}
}
}
Java中有一个专门的同步工具类CyclicBarrier可以替代该AssemblePoint类。
总结
该节主要介绍了Java中线程间协作的基本机制wait/notify,协作关键要想淸楚协作的共享变贵和条件是什么。Java中有专门为协作而建的阻塞队列、同步工具类,以及Executors框架。