如何实现非阻塞式生产者消费者?
这道题想考察什么?
1、是否了解生产者消费者模式?
2、并发编程的同步与通信问题怎么解决?
考察的知识点
1、生产者消费者模式的实现
2、并发编程中线程同步
3、线程之间的协作与通信
考生应该如何回答
1、磨刀不误砍柴工,刚看到这道题时,我们不要急着下手,先看看具体要做什么。实现生产者消费者模式?这不分分钟的事,老规矩,不就生产者 + 消费者 + 堵塞队列嘛!Stop!!!仔细看看前面的定语,非堵塞式,那完了,堵塞队列不能用,这下就有很多同学措手不及,该怎么搞呢?所以我在文章一开始,就提示了要认真审题,咱们一步一步地去实现。2、生产者消费者模式在日常生活中,生产者消费者模式特别常见。比如说我们去麦当劳吃饭,在前台点餐,付完钱后并不是直接给你汉堡薯条啥的,而是给你一张小票,你需要前去取餐处等待,后厨加工完的餐食都直接放入取餐处,机器叫号提醒,客户凭小票取餐。
image上面取餐的场景其实就是一个典型的生产者消费者模型,具备3个部分:生产者、消费者、缓冲区。后厨就相当于生产者,客户就是消费者,而取餐台是两者之间的一个缓冲区。再转到我们平时开发过程中,经常会碰到这样子的场景:某个模块负责产生数据,这些数据由另一个模块来负责处理。产生数据的模块,就称为生产者,而处理数据的模块,就称为消费者。当然如果只抽象出生产者和消费者,还不是正儿八经的生产者消费者模式,还需要一个缓冲区,生产者生产数据到缓冲区,消费者从缓冲区拿数据去消费。服务器端经常使用的消息队列设计就是参照生产者消费者模型。但这个时候有的同学就会好奇的问一句,干嘛需要缓冲区呢,生产完直接给消费者不是更加简单吗?在复杂的系统中,这中间的缓冲区必不可少,作用明显。
- 解耦。这是最显而易见的,如果生产者直接将数据交给消费者,那么这两个类必然会有依赖,消费者的改动都会影响到生产者。当两者之间加入缓存区之后,生产者与消费者之间彻底解耦了,各有所职,互不依赖。
- 平衡生产与消费能力。在多线程的环境下,如果生产者生产数据速度很快,消费者来不及消费,那么缓冲区便是生产者数据暂存的地方,生产者生产完一个数据后直接丢在缓冲区,便可以去生产下一个数据,消费者自己从缓冲区拿数据慢慢处理,这样生产者无需因为消费者的处理能力弱而浪费资源。当然,反之也一样。
3、阻塞与非堵塞到底什么是堵塞?通俗的话来讲,就是一件事没干完,就只能在这等待,不允许去做其他的事。在程序世界里,阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。而非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程,线程让出CPU。
4、有了上面两个理论基础,我们再去考量一下这个问题。因为要求是非堵塞式,所以缓冲区我们就不能使用便捷的堵塞队列,只能使用一般的集合代替,类似ArrayList等。很明显会引起两个问题。第一,并发问题。类似ArrayList这些普通集合是线程不安全的,当生产者、消费者线程同时操作(数据入队、数据出队)缓冲区时,必然会引起ConcurrentModificationException。当然这个问题解决方案有很多种,比如使用synchronized、ReentrantLock等锁机制,也可以使用线程安全的集合,当然线程安全的集合底层也是锁机制。第二,线程通信问题。非堵塞式的意思就是生产者与消费者去操作缓冲区,只是尝试去操作,至于能不能得到想要的结果,他们是不管的,并不会像堵塞队列那样死等。那么生产者与消费者之间的协作与通信,比如缓冲区没数据时通知生产者去生产;缓冲区有数据后,通知消费者去消费;当缓冲区数据满了让生产者休息。这里我们可以使用wait、notify/notifyAll方法。这些方法使用过程中注意以下几点基本就可以了。
- wait() 和 notify() 使用的前提是必须先获得锁,一般配合synchronized关键字使用,即在synchronized同步代码块里使用 wait()、notify/notifyAll() 方法。
- 当线程执行wait()方法时候,会释放当前持有的锁,然后让出CPU,当前线程进入等待状态。
- 当notify()方法执行时候,会唤醒正处于等待状态的线程,使其继续执行,notify()方法不会立即释放锁,锁的释放要看同步代码块的具体执行情况。notifyAll()方法的功能也是类似。
- notify()方法只唤醒一个等待线程并使该线程开始执行。所以如果有多个线程等待,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。notifyAll()方法会唤醒所有等待线程,至于哪一个线程将会第一个处理取决于操作系统的实现。
5、经过上面的理论准备与分析,实现一个非堵塞式生产者消费者模式,轻而易举。不多说了,咱们一边撸码一边再论。
/**
* 实现非堵塞式生产者消费者模式
*/
public class ProducerConsumerDemo {
/**
* 定义队列最大容量,指缓冲区最多存放的数量
*/
private static int MAX_SIZE = 3;
/**
* 缓冲区队列,ArrayList为非堵塞队列,线程不安全
* static修饰,全局唯一
*/
private static final List<String> list = new ArrayList<>();
public static void main(String[] args) {
//创建生产者线程
Producer producer = new Producer();
//创建消费者线程
Consumer consumer = new Consumer();
//生产者线程开启
producer.start();
//消费者线程开启
consumer.start();
}
/**
* 生产者线程
*/
static class Producer extends Thread {
@Override
public void run() {
//具体实现...
}
}
/**
* 消费者线程
*/
static class Consumer extends Thread {
@Override
public void run() {
//具体实现...
}
}
}
类的大致结构如上所示,很简单也很清晰。我们实现了两个线程,一个代表生产者,一个代表消费者,缓冲区使用非阻塞式队列ArrayList,所以它是线程不安全的,为了能更加清晰的看出生产者消费者执行流程,缓冲区大小设置成较小的3。
接着看看生产者线程的具体实现。
/**
* 生产者线程
*/
static class Producer extends Thread {
@Override
public void run() {
//使用while循环执行run方法
while (true) {
try {
//生产者 sleep 300ms, 消费者 sleep 500ms,模拟两者的处理能力不均衡
Thread.sleep(300);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
//第1步:获取队列对象的锁,与消费者持有的锁是同一把,保证线程安全
synchronized (list) {
//第2步:判断缓冲区当前容量
//第2.1步:队列满了就不生产,等待
while (list.size() == MAX_SIZE) {
System.out.println("生产者 -> 缓冲区满了,等待消费...");
try {
//使用wait等待方法,内部会释放当前持有的锁
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//第2.2步:队列未满就生产一个产品
list.add("产品");
System.out.println("生产者 -> 生产一个产品,当前队列大小:" + list.size());
//唤醒其他线程,这里其他线程就是指消费者线程
list.notify();
}
}
}
}
生产者线程负责生产数据。只要开始执行生产流程,第一步先获取list对象锁,也就意味着当前只有生产者线程可操作缓冲区,保证线程安全。第二步它会先检查一下当前缓存区的容量,如果缓存区已经满了,那生产者无需再去生产新的数据,调用wait方法进行等待,这个过程会释放list对象锁。如果缓冲区没满,就直接生产一个产品,并通过notify方法唤醒消费者线程。
再看看消费者线程的具体实现。
/**
* 消费者线程
*/
static class Consumer extends Thread {
@Override
public void run() {
//使用while循环执行run方法
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
//第1步:获取队列对象的锁,与生产者持有的锁是同一把,保证线程安全
synchronized (list) {
//第2步:判断缓冲区当前容量
//第2.1步:队列空了,等带
while (list.size() == 0) {
System.out.println("消费者 -> 缓冲区空了,等待生产...");
try {
//使用wait等待方法,内部会释放当前持有的锁
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//第2.2步:队列不为空,消费一个产品
list.remove(0);
System.out.println("消费者 -> 消费一个产品,当前队列大小:" + list.size());
//唤醒其他线程,这里其他线程就是指生产者线程
list.notify();
}
}
}
}
消费者线程负责消费数据,它的实现与生产者相似。第一步也是先获取list对象锁,避免并发异常。第二步检查当前缓冲区容量时,这里与生产者正好相反。如果缓冲区已经空了,没有数据可消费了,它会使用wait方法进行等待。如果缓冲区没空,则去消费一个产品,并且调用notify方法唤醒生产者线程去生产。
执行ProducerConsumerDemo的main方法,抓取方法执行日志。从logcat日志可以看出,生产者与消费者互相协作,有条不紊的进行生产与消费操作,没有引起并发异常问题。
//logcat日志(截取了部分)
生产者 -> 生产一个产品,当前队列大小:1
消费者 -> 消费一个产品,当前队列大小:0
生产者 -> 生产一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
消费者 -> 消费一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
消费者 -> 消费一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
...