【线程通信】生产者-消费者模式

2021-02-18  本文已影响0人  Djbfifjd

一、简述

1️⃣生产者消费者模式并不是 GOF 提出的 23 种设计模式之一,23 种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是编程过程中最常用的一种设计模式。

一个常见的场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

2️⃣举个寄信的例子,大致过程如下:

  1. 把信写好——相当于生产者制造数据。
  2. 把信放入邮筒——相当于生产者把数据放入缓冲区。
  3. 邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区。
  4. 邮递员把信拿去邮局做相应的处理——相当于消费者处理数据。

3️⃣说明

  1. 生产消费者模式可以有效的对数据解耦,优化系统结构。
  2. 降低生产者和消费者线程相互之间的依赖与性能要求。
  3. 一般使用 BlockingQueue 作为数据缓冲队列,是通过锁和阻塞来实现数据之间的同步。如果对缓冲队列有性能要求,则可以使用基于 CAS 无锁设计的 ConcurrentLinkedQueue

二、生产者-消费者模式是一个经典的多线程设计模式

【生产者-消费者模式】为多线程间的协作提供了良好的解决方案。在该模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

Java 中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

  1. Object 的 wait()/notify()。
  2. Lock 和 Condition 的 await()/signal()。
  3. BlockingQueue 阻塞队列方法。
  4. PipedInputStream/PipedOutputStream

三、wait()/notify()

wait()/nofity() 是基类 Object 的两个方法,也就意味着所有 Java 类都有这两个方法,这样就可以为任何对象实现同步机制。

  1. wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
  2. notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
public class ProducerConsumer {
    private static final int CAPACITY = 5;

    public static void main(String args[]) {
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
}

生产者

public static class Producer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;
    int i = 0;

    public Producer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("[" + name + "] Producing value : +" + i);
                queue.offer(i++);
                queue.notifyAll();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者

public static class Consumer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;

    public Consumer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                int x = queue.poll();
                System.out.println("[" + name + "] Consuming value : " + x);
                queue.notifyAll();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

判断 Queue 大小为 0 或者大于等于 queueSize 时须使用while(condition){},不能使用if(condition){}。其中while(condition),它又被叫做“自旋锁”。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒),这个线程会重新去检查 condition 条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。

四、使用 Lock 和 Condition 的 await()/signal()

JDK5.0 之后,Java 提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition 接口的 await()/signal() 就是其中用来做同步的两种方法,它们的功能基本上和 Object 的 wait()/nofity() 相同,完全可以取代它们,但是它们和新引入的锁定机制 Lock 直接挂钩,具有更大的灵活性。通过在 Lock 对象上调用newCondition(),将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:

/**
 * 生产者消费者模式:使用Lock和Condition实现
 * {@link java.util.concurrent.locks.Lock}
 * {@link java.util.concurrent.locks.Condition}
 */
public class ProducerConsumerByLock {
    private static final int CAPACITY = 5;
    private static final Lock lock = new ReentrantLock();
    private static final Condition fullCondition = lock.newCondition();     //队列满的条件
    private static final Condition emptyCondition = lock.newCondition();    //队列空的条件

    public static void main(String args[]) {
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
}

生产者

public static class Producer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;
    int i = 0;

    public Producer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {

            //获得锁
            lock.lock();
            while (queue.size() == maxSize) {
                try {
                    System.out.println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                    //条件不满足,生产阻塞
                    fullCondition.await();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
            System.out.println("[" + name + "] Producing value : +" + i);
            queue.offer(i++);

            //唤醒其他所有生产者、消费者
            fullCondition.signalAll();
            emptyCondition.signalAll();

            //释放锁
            lock.unlock();

            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

消费者

public static class Consumer extends Thread {
    private Queue<Integer> queue;
    String name;
    int maxSize;

    public Consumer(String name, Queue<Integer> queue, int maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            //获得锁
            lock.lock();

            while (queue.isEmpty()) {
                try {
                    System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                    //条件不满足,消费阻塞
                    emptyCondition.await();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            int x = queue.poll();
            System.out.println("[" + name + "] Consuming value : " + x);

            //唤醒其他所有生产者、消费者
            fullCondition.signalAll();
            emptyCondition.signalAll();

            //释放锁
            lock.unlock();

            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

五、使用 BlockingQueue 阻塞队列方法

JDK1.5 以后新增的java.util.concurrent包新增了 BlockingQueue 接口。并提供了如下几种阻塞队列实现:

  1. java.util.concurrent.ArrayBlockingQueue
  2. java.util.concurrent.LinkedBlockingQueue
  3. java.util.concurrent.SynchronousQueue
  4. java.util.concurrent.PriorityBlockingQueue

实现生产者-消费者模型使用 ArrayBlockingQueue 或者 LinkedBlockingQueue 即可。

这里使用 LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是 await()/signal()。它可以在生成对象时指定容量大小。它用于阻塞操作的是 put()/take()。

  1. put():类似于上面的生产者线程,容量达到最大时,自动阻塞。
  2. take():类似于上面的消费者线程,容量为 0 时,自动阻塞。

LinkedBlockingQueue 类的 put() 源码:

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

public void put(E e) throws InterruptedException {
    putLast(e);
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

整体逻辑如下:

/**
 * 生产者消费者模式:使用{@link java.util.concurrent.BlockingQueue}实现
 */
public class ProducerConsumerByBQ{
    private static final int CAPACITY = 5;

    public static void main(String args[]){
        LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);

        Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
        Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
        Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
        Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
        Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生产者
     */
    public static class Producer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    blockingQueue.put(i);
                    System.out.println("[" + name + "] Producing value : +" + i);
                    i++;

                    //暂停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    /**
     * 消费者
     */
    public static class Consumer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;

        public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    int x = blockingQueue.take();
                    System.out.println("[" + name + "] Consuming : " + x);

                    //暂停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读