多线程实现生产者消费者
2020-02-23 本文已影响0人
tanbin_tech
共享的buffer容器对象要处理两个问题
- 取:如果buffer里没有元素,那么就释放锁去等待
- 加:如果buffer里容量已经满了,那么也要释放锁去等待
经典的实现
import java.util.LinkedList;
import java.util.Queue;
public class SimpleCP {
public static void main(String[] args) throws InterruptedException{
Buffer buffer = new Buffer(2);
Thread producer =new Thread(()->{
try {
int i = 0;
while(true){
buffer.add(i);
System.out.println("product the value: " + i);
i++;
Thread.sleep(1000);
}
} catch(InterruptedException e){
e.printStackTrace();
}
});
Thread consumer = new Thread(()->{
try {
while(true){
int value = buffer.get();
System.out.println("consumer the value: " + value);
Thread.sleep(1000);
}
}catch (InterruptedException e){
e.printStackTrace();
}
});
producer.start();
consumer.start();
//主线程等待子线程完成再往下执行
producer.join();
consumer.join();
}
static class Buffer {
int size;
Queue<Integer> buffer;
public Buffer(int size) {
this.size = size;
this.buffer = new LinkedList<>();
}
public void add(int value) throws InterruptedException {
synchronized (this) {
//经典的搭配,和wait配合的循环99%是while
//获得锁之后还要检查一次是否满足条件,存在仍然不满足的可能
while (buffer.size() >= size) {
wait(); //释放锁
}
buffer.add(value);
notify(); //唤醒一个等待线程
}
}
public int get() throws InterruptedException {
synchronized (this) {
while (buffer.size() == 0) {
wait();
}
int value = buffer.poll();
notify();
return value;
}
}
}
}
使用BlockingQueue 实现
相对于自己实现同步的容器对象,java已经提供了一种成熟的容器接口BlockingQueue,我们使用它的实现类 LinkedBlockingDeque
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class BlockingCP {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<Integer> buffer = new LinkedBlockingDeque<>(2);
Thread producer = new Thread(()->{
try {
int i = 0;
while(true){
buffer.put(i);
System.out.println("product the value: " + i);
i++;
Thread.sleep(1000);
}
} catch (InterruptedException e){
e.printStackTrace();
}
});
Thread consumer = new Thread(()->{
try {
while(true){
int value = buffer.take(); //(1)
System.out.println("consume the value: " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e){
e.printStackTrace();
}
});
consumer.start();
producer.start();
consumer.join();
producer.join();
}
}
(1)take方法和poll的区别
- take():取走
BlockingQueue
里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止 - poll(time):取走
BlockingQueue
里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
(2)BlockingQueue
添加元素的几个方法:
- add:插入成功则返回true,否则抛出
IllegalStateException
异常 - offer:插入成功返回true,否则返回false
- offer(E e, long timeout, TimeUnit unit):插入元素成功则返回true,如果空间不足就等待指定时间,超时返回false
- put:插入元素,如果没有容器没有剩余空间则等待(阻塞)