Java

Java生产者和消费者实现

2019-03-17  本文已影响0人  zzzhouzhong

0. 前言

生产者消费者是考察多线程的常见问题。最近尝试手写生产者和消费者时,发现这个问题并不止是考察多线程,还可以考察泛型、设计模式等。这里总结下如何手写生产者消费者。

1. 预备知识

2. 设计与整体架构

生产者消费者模式,需要生产者生产产品,消费者消耗产品。因此围绕着产品,我们需要:

分析完,上代码。

↓面向抽象(接口)的编程,无论如何逼格先起来~~

    //产品接口
    interface Product<T> {
        T get();
    }

    //生产者接口
    interface Producer<T> {
        Product<T> produce();
    }

    //消费者接口
    interface Consumer<T> {
        void consume(Product<T> product);
    }

然后是各个实现类,以产生long为例:↓

//产品类
    static class ProductImpl<T> implements Product<T> {
        private T data;

        ProductImpl(T data) {
            this.data = data;
        }

        @Override
        public T get() {
            return data;
        }
    }

    //生产者类
    static class LongProducer implements Producer<Long> {
        @Override
        public Product<Long> produce() {
            costTime(3000);
            long data = System.currentTimeMillis();
            System.out.println("produce:" + data);
            return new ProductImpl<>(data);
        }
    }

    //消费者类
    static class LongConsumer implements Consumer<Long> {
        @Override
        public void consume(Product<Long> product) {
            costTime(2000);
            System.out.println("consume:" + product.get());
        }
    }

    //模拟生产或消费时花费的时间
    private static void costTime(int sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

↓实际上生产者和消费者需要跑在Thread中,因此我们还需要用Runnable包裹一下,并循环生产/消费。利用代理模式,我们达到一个类只做一件事情的目的。其中produceOnce()consumeOnce()待实现

    //利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
    static class RunnableProducer<T> implements Producer<T>, Runnable {
        Producer<T> delegate;
        final Queue<Product<T>> queue;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
            this.delegate = producer;
            this.queue = queue;
        }

        @Override
        public Product<T> produce() {
            return delegate.produce();
        }

        @Override
        public void run() {
            while (!canceled) {
                produceOnce();
            }
        }

        private void produceOnce() {
            // TODO: 19/3/18
        }
    }

    //利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
    static class RunnableConsumer<T> implements Consumer<T>, Runnable {
        Consumer<T> delegate;
        final Queue<Product<T>> queue;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
            this.delegate = consumer;
            this.queue = queue;
        }

        @Override
        public void consume(Product<T> product) {
            delegate.consume(product);
        }

        @Override
        public void run() {
            while (!canceled) {
                consumeOnce();
            }
        }

        private void consumeOnce() {
            // TODO: 19/3/18  
        }
    }

↓最后写下测试代码,使用LinkedList作为Queue,跑在ThreadPool中。至此整体架构就完成了。

    public static void main(String[] args) {
        Consumer<Long> consumer = new LongConsumer();
        Producer<Long> producer = new LongProducer();
        testSimpleProducerConsumer(producer, consumer);
    }

    private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
        Queue<Product<T>> queue = new LinkedList<>();
        RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
        RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(realConsumer);
        executorService.execute(realProducer);
        costTime(30 * 1000);
        executorService.shutdown();
        realConsumer.cancel();
        realProducer.cancel();
    }

3. 实现


3.1 PlanA: Object的 wait()notify()

常用的同步/互斥就是Objectwait()notify()系列方法了。我们先用它来实现生产和消费的同步。

        private void produceOnce() {
            synchronized (queue) {
                try {
                    //先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
                    while (queue.peek() != null) {
                        queue.wait();
                    }
                    //唤醒后执行生产
                    queue.add(produce());
                    //生产后发送notify()唤醒消费者
                    queue.notify();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private void consumeOnce() {
            synchronized (queue) {
                try {
                    //先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
                    while (queue.peek() == null) {
                        queue.wait();
                    }
                    //唤醒后执行消费
                    consume(queue.poll());
                    //消费完后发送notify()唤醒生产者
                    queue.notify();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

注释都写得很明白啦。跑一波看看结果。

image.png
总结:使用Objectwait()notify()需要自己实现生产者和消费者的配合,要小心各种逻辑的处理。下面使用可阻塞的BlockingQueue代替Queue,实现起来更简单,也更不容易出错。

3.2 PlanB:使用BlockingQueue

        //使用BlockingQueue代替Queue,实现可阻塞的队列
        BlockingQueue<Product<T>> queue;
        private void produceOnce() {
            try {
                Product<T> produce = produce();
                queue.put(produce);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void ConsumeOnce() {
            try {
                Product<T> product = queue.take();
                consume(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
        //使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
        BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
        BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
        BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(realProducer);
        executorService.execute(realConsumer);
        costTime(30 * 1000);
        executorService.shutdown();
        realConsumer.cancel();
        realProducer.cancel();
    }

    public static void main(String[] args) {
        Consumer<Long> consumer = new LongConsumer();
        Producer<Long> producer = new LongProducer();
//        testSimpleProducerConsumer(producer, consumer);
        testBetterProducerConsumer(producer, consumer);
    }

↓跑一把


image.png

4. 总结

手写生产者消费者看似考察多线程知识,但这既是挑战也是机遇,如果用心思考,可以展示你更多的能力。本文不止是多线程知识,还有设计模式(面向接口的抽象思想,接口隔离原则,单一职责原则,代理模式,装饰器模式等)、泛型的使用等。另外,我们看到RunnableConsumerBetterRunnableConsumer中有大量重复代码,因此可以再抽象一层出来。而且他们都依赖了Queue,并且是构造方法中传入的依赖。这其实并不好,可以使用依赖倒置原则(依赖注入)进一步优化。

5. 其他

参考:
https://www.jianshu.com/p/e29632593057
这边文章还提到了了错过notify信号wait条件变化唤醒同类导致“假死”状态等。

6. 代码

package com.zz.multithreaddemo;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

public class ProducerConsumerDemo {

    public static void main(String[] args) {
        Consumer<Long> consumer = new LongConsumer();
        Producer<Long> producer = new LongProducer();
        testSimpleProducerConsumer(producer, consumer);
//        testBetterProducerConsumer(producer, consumer);
    }

    private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
        Queue<Product<T>> queue = new LinkedList<>();
        RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
        RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(realConsumer);
        executorService.execute(realProducer);
        costTime(10 * 1000);
        executorService.shutdown();
        realConsumer.cancel();
        realProducer.cancel();
    }


    private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
        //使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
        BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
        BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
        BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(realProducer);
        executorService.execute(realConsumer);
        costTime(10 * 1000);
        executorService.shutdown();
        realConsumer.cancel();
        realProducer.cancel();
    }


    //产品接口
    interface Product<T> {
        T get();
    }

    //生产者接口
    interface Producer<T> {
        Product<T> produce();
    }

    //消费者接口
    interface Consumer<T> {
        void consume(Product<T> product);
    }

    //产品类
    static class ProductImpl<T> implements Product<T> {
        private T data;

        ProductImpl(T data) {
            this.data = data;
        }

        @Override
        public T get() {
            return data;
        }
    }

    //生产者类
    static class LongProducer implements Producer<Long> {
        @Override
        public Product<Long> produce() {
            costTime(3000);
            long data = System.currentTimeMillis();
            System.out.println("produce:" + data);
            return new ProductImpl<>(data);
        }
    }

    //消费者类
    static class LongConsumer implements Consumer<Long> {
        @Override
        public void consume(Product<Long> product) {
            costTime(2000);
            System.out.println("consume:" + product.get());
        }
    }

    //模拟生产或消费时花费的时间
    private static void costTime(int sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    //利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
    static class RunnableProducer<T> implements Producer<T>, Runnable {
        Producer<T> delegate;
        final Queue<Product<T>> queue;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
            this.delegate = producer;
            this.queue = queue;
        }

        @Override
        public Product<T> produce() {
            return delegate.produce();
        }

        @Override
        public void run() {
            while (!canceled) {
                produceOnce();
            }
        }

        private void produceOnce() {
            synchronized (queue) {
                try {
                    //先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
                    while (queue.peek() != null) {
                        queue.wait();
                    }
                    //唤醒后执行生产
                    queue.add(produce());
                    //生产后发送notify()唤醒消费者
                    queue.notify();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
    static class RunnableConsumer<T> implements Consumer<T>, Runnable {
        Consumer<T> delegate;
        final Queue<Product<T>> queue;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
            this.delegate = consumer;
            this.queue = queue;
        }

        @Override
        public void consume(Product<T> product) {
            delegate.consume(product);
        }

        @Override
        public void run() {
            while (!canceled) {
                consumeOnce();
            }
        }

        private void consumeOnce() {
            synchronized (queue) {
                try {
                    //先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
                    while (queue.peek() == null) {
                        queue.wait();
                    }
                    //唤醒后执行消费
                    consume(queue.poll());
                    //消费完后发送notify()唤醒生产者
                    queue.notify();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class BetterRunnableProducer<T> implements Producer<T>, Runnable {
        //使用BlockingQueue代替Queue,实现可阻塞的队列
        BlockingQueue<Product<T>> queue;
        Producer<T> delegate;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        BetterRunnableProducer(Producer<T> producer, BlockingQueue<Product<T>> queue) {
            this.delegate = producer;
            this.queue = queue;
        }

        @Override
        public Product<T> produce() {
            return delegate.produce();
        }

        @Override
        public void run() {
            while (!canceled) {
                produceOnce();
            }
        }

        private void produceOnce() {
            try {
                Product<T> produce = produce();
                queue.put(produce);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class BetterRunnableConsumer<T> implements Consumer<T>, Runnable {
        BlockingQueue<Product<T>> queue;
        Consumer<T> delegate;
        volatile boolean canceled = false;

        public void cancel() {
            canceled = true;
        }

        BetterRunnableConsumer(Consumer<T> consumer, BlockingQueue<Product<T>> queue) {
            this.delegate = consumer;
            this.queue = queue;
        }

        @Override
        public void consume(Product<T> product) {
            delegate.consume(product);
        }

        @Override
        public void run() {
            while (!canceled) {
                ConsumeOnce();
            }
        }

        private void ConsumeOnce() {
            try {
                Product<T> product = queue.take();
                consume(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

上一篇下一篇

猜你喜欢

热点阅读