再读《生产者与消费者》
多线程经常使用,生产者消费者模式也经常读到,但是实际理解可能还不够透彻,所以想写个日志梳理下知识的脉络,温故而知新。
简单的模型
先从一个例子开始吧,有一些角色我先声明如下:
- 餐厅(Restaurant)--->载体
- 厨师(Chef) --->生产者
- 服务员(WaiterPerson) --->消费者
- 食物(Meaf)--->被消费
我梳理一下它们的工作流程:
-
故事的地点发生在餐厅,它是载体,包括了厨师、服务员、食物。
-
厨师在餐厅做饭,做完饭,饭放在橱窗,通知服务员端走,送给客人吃完;期间,厨师会不断地监控橱窗的食物是否被端走,如果端走则继续做新的食物,否则等待。
-
服务员也不能闲着,它时刻留心着橱窗是否有食物上架,如果没有则继续等待。如果有食物则端走,并通知厨师,我端走食物了,你可以做新食物了。
那么按照上面的步骤,首先我们看看生产者Chef的基本代码
synchronized(this){
while (restaurant.meal != null) {
wait();
}
}
上面代码表示,倘若食物已经做好一份了,厨师不断监控橱窗上面的食物,如果没有被服务员端走(消费),那我厨师就继续等待,多休息一会。注意这里用while而不是if是因为防止多个消费者产生竞争引起并发问题。
System.out.println("饭做好了,订单生成...")。
synchronized(restaurant.waiter){
restaurant.meal=new Meal();
restaurant.waiter.notifyAll();
}
上面代码表示 ,厨师没有等待了,他开始做饭(生产),完成生产食物后,厨师通知(notifyAll)正在橱窗等待食物的服务员,叫他去端菜(消费)。
那消费者Waiter的流程呢?我想过程应该是和生产者恰好是对立的。
synchronized(this){
while(restaurant.meal==null){
wait();
}
}
上面代码表示,服务员不断监控橱窗上面的食物有没有做好,如果没有做好,那我服务员就继续等待。 是吧?和前面的生产者的判断条件刚好对立。
System.out.println("我服务员把饭端走了...")。
synchronized(restaurant.chef){
restaurant.meal=null;
restaurant.chef.notifyAll();
}
上面代码表示 ,服务员被厨师通知端饭(消费)了,于是他开始端饭送个客人,导致橱窗上没有饭了,之后,服务员通知(notifyAll)橱窗口正在等待的厨师去做下一道菜(生产)。
通过上面的例子,我们可以初步了解生产者与消费者的工作模式。但是实际开发场景中,应该有不止一个生产者或者消费者,而且食物应该很多,那么这个时候我们应该引入队列(Queque)这个数据结构来管理它们了。
利用队列管理生产者与消费者
我们可以设想一下,在餐厅中的业务场景,厨师chef应该作为Runable角色可以有多个,我们可以用Excutor.submit(r)提交很多个厨师,让其工作, 而服务员我们也可以有多个,同理,我们也把他放入线程池去运行。 而食物Meal也有多个,并且我们要用一个数据结构存取它,让它作为厨师和服务员两者共同占有的资源又能做好同步处理。在上面的例子中,我们用wait(),notifyAll(),synchronized等方法进行食物的同步与通信。它们有一个明显的缺点,我们发现代码很是耦合,晦涩难懂,暂且不谈性能。
让开发者欣慰的事,JDK中提供了BlockQueque接口来存取“食物”。它是一个阻塞队列的数据结构。在这里,我们需要了解两点;
-
在开发过程中,"食物"常常指是的IO流。如网络编程中,服务端与客户端发送字节流相互通信。现在有netty或者nio等异步非阻塞IO的框架,让并发性能更佳。
-
阻塞是为了保证生产者与消费者步调一致,不要产生大量浪费的食物,消费者吃不完,导致资源耗尽。亦或者消费者盲目的去找生产者要食物,太多消费者拥挤,也会消耗资源。所以在刚刚开始的时候,jdk做了这个BlockQueque来管理食物和生产者和消费者通信。 生产者要生产食物如下面的代码:
@Override public void run() { try { while (!Thread.interrupted()) { Meal meal = new Meal(++count); mBlockQueque.put(meal);// 如果mBlockQueque容量不为empty则阻塞等待。 TimeUnit.SECONDS.sleep(2);//模拟生产耗时任务。 } } catch (InterruptedException e) { System.out.println("Chef sleep end interrupted..."); e.printStackTrace(); } }
上面mBlockQueque.put()为阻塞方法(如果橱窗(队列)还有食物未被领取,则等待不生产食物,否则生产食物并添加至橱窗),如注释上的说明,它的作用类似wait()/add();我们跟踪下源码:
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock(); // ---(1)
try {
while (!linkFirst(node))
notFull.await(); ---(2)
} finally {
lock.unlock();
}
}
我解析下上面的代码:put()是一个接口方法,它具体的实现方法之一是putFirst,给链表首位添加一个元素。
- (1)此处有lock-finally-unlock组成的临界区。它的作用类似synchronized,用来同步。它们之间不同的地方是:
一、用synchronized声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B则一直等待直到A释放锁,B一直阻塞着不能被中断。
二、用lock-finally-unlock声明锁时,任务A和任务B,都要获取锁O,如果A首先获得锁O,B可以等待一段时间,不想等待了,可以自行中断。A如果想释放锁必须在finally后调用unlock。所以说我觉得lock更加灵活。
但是在大多数资源竞争不太激烈的情况下,我们还是用synchronized足够了。
- (2)此处notFull是Condition的实例。它提供更好的性能,通过await()/signal()方法扮演之前的wait()/notify()的角色。 这里代码是指while判断链表是否超过容量,返回false时,则调用await()阻塞等待当前任务线程。
我们分析完生产者chef,我们来看看消费者waiter的改造后的代码:
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Meal meal = mBlockQueque.take();//从队列中remove出一个食物,没有食物则阻塞等待
meal.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上述代码中,mBlockQueque.take()是一个可阻塞方法。它试图从橱窗队列上取食物,如果发现没有食物就阻塞消费者线程。看看take()的具体实现的源码:
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null) // ---(1)
notEmpty.await(); ---(2)
return x;
} finally {
lock.unlock();
}
}
(1)takeFisrt()方法中会有去调用unlinkFirst()去队列返回一个食物,如果有食物,就返回,并调用notFull.signal()唤醒正在阻塞的生产者线程。
(2) notEmpty是另外一个Condition实例,它用来和消费者线程通信。如果发现返回的食物为空,则notEmpty.await()让消费者线程阻塞等待。
至此。我们看到我们把具体的通信交互过程封装到了阻塞队列BlockQueue里。 生产者只需要调用take通信,消费者只需调用put通信。如下图:
写到这里了,那生产者与消费者模式有哪些实际应用呢? 我想线程池应该是应用最广泛的地方。下一篇我将详细介绍线程池的原理。
注:部分参考自《Java 编程思想》