Java工程师知识树

Java基础-并发编程-生产者与消费者

2021-02-21  本文已影响0人  HughJin

Java工程师知识树 / Java基础


问题描述

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。

生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。

生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

Java实现生产者与消费者的几种方式

  1. synchronized + Object.wait() / Object.notifyAll()方法
  2. Lock + Condition.await() / Condition.signalAll()方法
  3. BlockingQueue阻塞队列方法
  4. Semaphore信号量

synchronized + Object.wait() / Object.notifyAll()方法

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 生产与消费的仓库
 * 生产者与消费者模式synchronized实现
 */
public class Storage {

    private static final int MAX = 10;//仓库储量
    private List<Object> dataList = new LinkedList<>();//仓库产品
    //生产者
    public void producer() {
        while (true) {//持续生产
            synchronized (this) {
                if (dataList.size() >= MAX) {
                    System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
                            + "】无需再生产");
                    try {
                        wait();//不生产了
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        dataList.add(new Object());
                        System.out.println("【生产者" + Thread.currentThread().getName()
                                + "】生产一个产品,现库存" + dataList.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    notifyAll();//唤醒全部等待线程
                }
            }
        }
    }
    //消费者
    public void consumer() {
        while (true) {//持续消费
            synchronized (this) {
                if (dataList.size() <= 0) {
                    System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
                                + "】不能再继续消费");
                    try {
                        wait();//不消费了
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        dataList.remove(0);
                        System.out.println("【消费者" + Thread.currentThread().getName()
                                + "】消费一个产品,现库存" + dataList.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    notifyAll();//唤醒全部等待线程
                }
            }
        }
    }

    public static void main(String[] args) {
        Storage storage = new Storage();
        new Thread(storage::producer).start();
        new Thread(storage::consumer).start();
    }
}

Lock + Condition.await() / Condition.signalAll()方法

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产与消费的仓库
 * 生产者与消费者模式 Lock 实现
 * 还可以使用 tryLock与isHeldByCurrentThread组合使用
 */
public class Storage1 {

    private static final int MAX = 10;//仓库储量
    private List<Object> dataList = new LinkedList<>();//仓库产品
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void producer() {
        while (true) {
            try {
                lock.lock();
                if (dataList.size() >= MAX) {
                    System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
                            + "】无需再生产");
                    try {
                        condition.await();//不生产了
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        dataList.add(new Object());
                        System.out.println("【生产者" + Thread.currentThread().getName()
                                + "】生产一个产品,现库存" + dataList.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    condition.signalAll();//唤醒全部等待线程
                }
            }finally {
                lock.unlock();
            }
        }
    }

    public void consumer() {
        while (true) {
            try {
                lock.lock();
                if (dataList.size() <= 0) {
                    System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
                                + "】不能再继续消费");
                    try {
                        condition.await();//不消费了
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        dataList.remove(0);
                        System.out.println("【消费者" + Thread.currentThread().getName()
                                + "】消费一个产品,现库存" + dataList.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    condition.signalAll();//唤醒全部等待线程
                }
            }finally {
                lock.unlock();
            }
        }
    }


    public static void main(String[] args) {
        Storage storage = new Storage();
        for (int i = 0; i < 4; i++) {
            new Thread(storage::producer).start();
        }
        for (int i = 0; i < 4; i++) {
            new Thread(storage::consumer).start();
        }
    }
}

BlockingQueue阻塞队列方法

阻塞队列:
当阻塞队列为空时,从阻塞队列中取数据的操作会被阻塞。
当阻塞队列为满时,往阻塞队列中添加数据的操作会被阻塞。

阻塞队列名称 说明
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。
package com.study.thread.procon;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 *  生产与消费的仓库
 *  生产者与消费者模式 BlockingQueue 实现
 */
public class Storage2 {

    private static final int MAX = 10;//仓库储量
    private BlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<Object>();//仓库产品
    //生产者
    public void producer() {
        while (true) {
            if (blockingQueue.size() >= MAX) {
                System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
                            + "】无需再生产");
            } else {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    blockingQueue.add(new Object());
                    System.out.println("【生产者" + Thread.currentThread().getName()
                            + "】生产一个产品,现库存" + blockingQueue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    //消费者
    public void consumer() {
        while (true) {
            if (blockingQueue.size() <= 0) {
                System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
                            + "】不能再继续消费");
            } else {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    blockingQueue.remove(0);
                    System.out.println("【消费者" + Thread.currentThread().getName()
                            + "】消费一个产品,现库存" + blockingQueue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Storage storage = new Storage();
        for (int i = 0; i < 4; i++) {
            new Thread(storage::producer).start();
        }
        for (int i = 0; i < 4; i++) {
            new Thread(storage::consumer).start();
        }
    }
}

Semaphore信号量

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * 生产与消费的仓库
 * 生产者与消费者模式synchronized实现
 */
public class Storage4 {

    private static final int MAX = 10;//仓库储量
    private List<Object> dataList = new LinkedList<>();//仓库产品
    //创建一个用Static和volatile关键字修饰的信号量实例,传参为0,表示初始计数为0
    private static volatile Semaphore semaphore = new Semaphore(0);

    //生产者
    public void producer() {
        while (true) {//持续生产
            if (semaphore.availablePermits() >= MAX) {
                System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
                        + "】无需再生产");
            } else {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    dataList.add(new Object());
                    //释放一个许可,并将其返还给信号量,也就是将 permits+1
                    semaphore.release();
                    System.out.println("【生产者" + Thread.currentThread().getName()
                            + "】生产一个产品,现库存" + dataList.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //消费者
    public void consumer() {
        while (true) {//持续消费
            if (semaphore.availablePermits() <= 0) {
                System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
                        + "】不能再继续消费");
            } else {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    dataList.remove(0);
                    semaphore.release(-1);
                    System.out.println("【消费者" + Thread.currentThread().getName()
                            + "】消费一个产品,现库存" + dataList.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Storage storage = new Storage();
        for (int i = 0; i < 40; i++) {
            new Thread(storage::producer).start();
        }
        for (int i = 0; i < 40; i++) {
            new Thread(storage::consumer).start();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读