Java基础-并发编程-生产者与消费者
2021-02-21 本文已影响0人
HughJin
问题描述
生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。
生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。
生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。
Java实现生产者与消费者的几种方式
- synchronized + Object.wait() / Object.notifyAll()方法
- Lock + Condition.await() / Condition.signalAll()方法
- BlockingQueue阻塞队列方法
- Semaphore信号量
synchronized + Object.wait() / Object.notifyAll()方法
package com.study.thread.procon;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 生产与消费的仓库
* 生产者与消费者模式synchronized实现
*/
public class Storage {
private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
//生产者
public void producer() {
while (true) {//持续生产
synchronized (this) {
if (dataList.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
try {
wait();//不生产了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
notifyAll();//唤醒全部等待线程
}
}
}
}
//消费者
public void consumer() {
while (true) {//持续消费
synchronized (this) {
if (dataList.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
try {
wait();//不消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
notifyAll();//唤醒全部等待线程
}
}
}
}
public static void main(String[] args) {
Storage storage = new Storage();
new Thread(storage::producer).start();
new Thread(storage::consumer).start();
}
}
Lock + Condition.await() / Condition.signalAll()方法
package com.study.thread.procon;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 生产与消费的仓库
* 生产者与消费者模式 Lock 实现
* 还可以使用 tryLock与isHeldByCurrentThread组合使用
*/
public class Storage1 {
private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void producer() {
while (true) {
try {
lock.lock();
if (dataList.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
try {
condition.await();//不生产了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//唤醒全部等待线程
}
}finally {
lock.unlock();
}
}
}
public void consumer() {
while (true) {
try {
lock.lock();
if (dataList.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
try {
condition.await();//不消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//唤醒全部等待线程
}
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 4; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 4; i++) {
new Thread(storage::consumer).start();
}
}
}
BlockingQueue阻塞队列方法
阻塞队列:
当阻塞队列为空时,从阻塞队列中取数据的操作会被阻塞。
当阻塞队列为满时,往阻塞队列中添加数据的操作会被阻塞。
阻塞队列名称 | 说明 |
---|---|
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列。 |
LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列。 |
PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列。 |
DelayQueue | 一个使用优先级队列实现的无界阻塞队列。 |
SynchronousQueue | 一个不存储元素的阻塞队列。 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列。 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。 |
package com.study.thread.procon;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 生产与消费的仓库
* 生产者与消费者模式 BlockingQueue 实现
*/
public class Storage2 {
private static final int MAX = 10;//仓库储量
private BlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<Object>();//仓库产品
//生产者
public void producer() {
while (true) {
if (blockingQueue.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
} else {
try {
TimeUnit.SECONDS.sleep(1);
blockingQueue.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者
public void consumer() {
while (true) {
if (blockingQueue.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
} else {
try {
TimeUnit.SECONDS.sleep(1);
blockingQueue.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 4; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 4; i++) {
new Thread(storage::consumer).start();
}
}
}
Semaphore信号量
package com.study.thread.procon;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 生产与消费的仓库
* 生产者与消费者模式synchronized实现
*/
public class Storage4 {
private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
//创建一个用Static和volatile关键字修饰的信号量实例,传参为0,表示初始计数为0
private static volatile Semaphore semaphore = new Semaphore(0);
//生产者
public void producer() {
while (true) {//持续生产
if (semaphore.availablePermits() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
//释放一个许可,并将其返还给信号量,也就是将 permits+1
semaphore.release();
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者
public void consumer() {
while (true) {//持续消费
if (semaphore.availablePermits() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
semaphore.release(-1);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 40; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 40; i++) {
new Thread(storage::consumer).start();
}
}
}