Java面试宝典

如何实现非阻塞式生产者消费者?

2020-11-25  本文已影响0人  架构师奶爸

这道题想考察什么?

1、是否了解生产者消费者模式?
2、并发编程的同步与通信问题怎么解决?

考察的知识点

1、生产者消费者模式的实现
2、并发编程中线程同步
3、线程之间的协作与通信

考生应该如何回答

1、磨刀不误砍柴工,刚看到这道题时,我们不要急着下手,先看看具体要做什么。实现生产者消费者模式?这不分分钟的事,老规矩,不就生产者 + 消费者 + 堵塞队列嘛!Stop!!!仔细看看前面的定语,非堵塞式,那完了,堵塞队列不能用,这下就有很多同学措手不及,该怎么搞呢?所以我在文章一开始,就提示了要认真审题,咱们一步一步地去实现。2、生产者消费者模式在日常生活中,生产者消费者模式特别常见。比如说我们去麦当劳吃饭,在前台点餐,付完钱后并不是直接给你汉堡薯条啥的,而是给你一张小票,你需要前去取餐处等待,后厨加工完的餐食都直接放入取餐处,机器叫号提醒,客户凭小票取餐。

image

上面取餐的场景其实就是一个典型的生产者消费者模型,具备3个部分:生产者、消费者、缓冲区。后厨就相当于生产者,客户就是消费者,而取餐台是两者之间的一个缓冲区。再转到我们平时开发过程中,经常会碰到这样子的场景:某个模块负责产生数据,这些数据由另一个模块来负责处理。产生数据的模块,就称为生产者,而处理数据的模块,就称为消费者。当然如果只抽象出生产者和消费者,还不是正儿八经的生产者消费者模式,还需要一个缓冲区,生产者生产数据到缓冲区,消费者从缓冲区拿数据去消费。服务器端经常使用的消息队列设计就是参照生产者消费者模型。但这个时候有的同学就会好奇的问一句,干嘛需要缓冲区呢,生产完直接给消费者不是更加简单吗?在复杂的系统中,这中间的缓冲区必不可少,作用明显。

3、阻塞与非堵塞到底什么是堵塞?通俗的话来讲,就是一件事没干完,就只能在这等待,不允许去做其他的事。在程序世界里,阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。而非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程,线程让出CPU。
4、有了上面两个理论基础,我们再去考量一下这个问题。因为要求是非堵塞式,所以缓冲区我们就不能使用便捷的堵塞队列,只能使用一般的集合代替,类似ArrayList等。很明显会引起两个问题。第一,并发问题。类似ArrayList这些普通集合是线程不安全的,当生产者、消费者线程同时操作(数据入队、数据出队)缓冲区时,必然会引起ConcurrentModificationException。当然这个问题解决方案有很多种,比如使用synchronized、ReentrantLock等锁机制,也可以使用线程安全的集合,当然线程安全的集合底层也是锁机制。第二,线程通信问题。非堵塞式的意思就是生产者与消费者去操作缓冲区,只是尝试去操作,至于能不能得到想要的结果,他们是不管的,并不会像堵塞队列那样死等。那么生产者与消费者之间的协作与通信,比如缓冲区没数据时通知生产者去生产;缓冲区有数据后,通知消费者去消费;当缓冲区数据满了让生产者休息。这里我们可以使用wait、notify/notifyAll方法。这些方法使用过程中注意以下几点基本就可以了。

5、经过上面的理论准备与分析,实现一个非堵塞式生产者消费者模式,轻而易举。不多说了,咱们一边撸码一边再论。

/**
 * 实现非堵塞式生产者消费者模式
 */     
public class ProducerConsumerDemo {

    /**
     * 定义队列最大容量,指缓冲区最多存放的数量
     */
    private static int MAX_SIZE = 3;

    /**
     * 缓冲区队列,ArrayList为非堵塞队列,线程不安全
     * static修饰,全局唯一
     */
    private static final List<String> list = new ArrayList<>();

    public static void main(String[] args) {
        //创建生产者线程
        Producer producer = new Producer();
        //创建消费者线程
        Consumer consumer = new Consumer();
        //生产者线程开启
        producer.start();
        //消费者线程开启
        consumer.start();
    }

    /**
     * 生产者线程
     */
    static class Producer extends Thread {

        @Override
        public void run() {
            //具体实现...
        }
    }

    /**
     * 消费者线程
     */
    static class Consumer extends Thread {

        @Override
        public void run() {
            //具体实现...
        }
    }
}

类的大致结构如上所示,很简单也很清晰。我们实现了两个线程,一个代表生产者,一个代表消费者,缓冲区使用非阻塞式队列ArrayList,所以它是线程不安全的,为了能更加清晰的看出生产者消费者执行流程,缓冲区大小设置成较小的3。
接着看看生产者线程的具体实现。

/**
 * 生产者线程
 */
static class Producer extends Thread {

        @Override
        public void run() {
            //使用while循环执行run方法
            while (true) {
                try {
                    //生产者 sleep 300ms, 消费者 sleep 500ms,模拟两者的处理能力不均衡
                    Thread.sleep(300);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }

                //第1步:获取队列对象的锁,与消费者持有的锁是同一把,保证线程安全
                synchronized (list) {
                    //第2步:判断缓冲区当前容量
                    //第2.1步:队列满了就不生产,等待
                    while (list.size() == MAX_SIZE) {
                        System.out.println("生产者 -> 缓冲区满了,等待消费...");
                        try {
                            //使用wait等待方法,内部会释放当前持有的锁
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //第2.2步:队列未满就生产一个产品
                    list.add("产品");
                    System.out.println("生产者 -> 生产一个产品,当前队列大小:" + list.size());

                    //唤醒其他线程,这里其他线程就是指消费者线程
                    list.notify();
                }
            }
        }
    }

生产者线程负责生产数据。只要开始执行生产流程,第一步先获取list对象锁,也就意味着当前只有生产者线程可操作缓冲区,保证线程安全。第二步它会先检查一下当前缓存区的容量,如果缓存区已经满了,那生产者无需再去生产新的数据,调用wait方法进行等待,这个过程会释放list对象锁。如果缓冲区没满,就直接生产一个产品,并通过notify方法唤醒消费者线程。
再看看消费者线程的具体实现。

/**
 * 消费者线程
 */
static class Consumer extends Thread {

        @Override
        public void run() {
            //使用while循环执行run方法
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }

                //第1步:获取队列对象的锁,与生产者持有的锁是同一把,保证线程安全
                synchronized (list) {
                    //第2步:判断缓冲区当前容量
                      //第2.1步:队列空了,等带
                    while (list.size() == 0) {
                        System.out.println("消费者 -> 缓冲区空了,等待生产...");
                        try {
                            //使用wait等待方法,内部会释放当前持有的锁
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //第2.2步:队列不为空,消费一个产品
                    list.remove(0);
                    System.out.println("消费者 -> 消费一个产品,当前队列大小:" + list.size());

                    //唤醒其他线程,这里其他线程就是指生产者线程
                    list.notify();
                }
            }
        }
    }

消费者线程负责消费数据,它的实现与生产者相似。第一步也是先获取list对象锁,避免并发异常。第二步检查当前缓冲区容量时,这里与生产者正好相反。如果缓冲区已经空了,没有数据可消费了,它会使用wait方法进行等待。如果缓冲区没空,则去消费一个产品,并且调用notify方法唤醒生产者线程去生产。
执行ProducerConsumerDemo的main方法,抓取方法执行日志。从logcat日志可以看出,生产者与消费者互相协作,有条不紊的进行生产与消费操作,没有引起并发异常问题。

//logcat日志(截取了部分)
生产者 -> 生产一个产品,当前队列大小:1
消费者 -> 消费一个产品,当前队列大小:0
生产者 -> 生产一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
消费者 -> 消费一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
消费者 -> 消费一个产品,当前队列大小:1
生产者 -> 生产一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
生产者 -> 生产一个产品,当前队列大小:3
生产者 -> 缓冲区满了,等待消费...
消费者 -> 消费一个产品,当前队列大小:2
...
上一篇下一篇

猜你喜欢

热点阅读