生产者消费者实现2
2019-02-17 本文已影响0人
Ethan_Walker
2. 用 ReentrantLock/Condition 实现
package com.multithread.condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-capacity circular buffer of strings shared between producer and
 * consumer threads.
 *
 * <p>All buffer state is guarded by a single {@link ReentrantLock}. Two
 * {@link Condition}s are used so that producers wait only for free space and
 * consumers wait only for available items.
 *
 * <p>created by Ethan-Walker on 2019/2/14
 */
public class ConsumerProducer {
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the buffer is full. */
    private final Condition notFull = lock.newCondition();
    /** Consumers wait here while the buffer is empty. */
    private final Condition notEmpty = lock.newCondition();
    private final String[] items;
    /** Number of items currently stored. */
    private int count;
    /** Index of the next slot to read and the next slot to write. */
    private int takePos, putPos;

    /**
     * Creates an empty buffer.
     *
     * @param maxSize capacity of the buffer; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     *         (a zero-capacity buffer would block every producer forever)
     */
    public ConsumerProducer(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        this.items = new String[maxSize];
    }

    /**
     * Stores one item, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the item is NOT
     * stored and the thread's interrupt status is restored so the caller can
     * observe it.
     *
     * @param a the item to store
     */
    public void produce(String a) {
        lock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (items.length == count) {
                notFull.await();
            }
            items[putPos] = a;
            putPos = (putPos + 1) % items.length;
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } catch (InterruptedException e) {
            // Keep the void/no-throws signature, but do not lose the interruption.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest item, blocking while the buffer is empty.
     *
     * @return the item taken, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public String consume() {
        String res = null;
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            res = items[takePos];
            items[takePos] = null; // drop the reference so the slot does not pin the string
            takePos = (takePos + 1) % items.length;
            count--;
            notFull.signal(); // wake one waiting producer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        // Returned outside "finally" so that unexpected exceptions are not silently discarded.
        return res;
    }

    /**
     * Demo: wires 10 producer threads and 5 consumer threads to one buffer of
     * capacity 10. Consumers are started before producers.
     */
    public static void main(String[] args) throws InterruptedException {
        ConsumerProducer cp = new ConsumerProducer(10);
        // 10 producers, each producing 3 products
        Thread[] producers = new Thread[10];
        for (int i = 0; i < 10; i++) {
            producers[i] = new Thread(new Producer(cp), "producer-" + i);
        }
        // 5 consumers that consume all products
        Thread[] consumers = new Thread[5];
        for (int i = 0; i < 5; i++) {
            consumers[i] = new Thread(new Consumer(cp), "consumer-" + i);
            consumers[i].start();
        }
        for (int i = 0; i < 10; i++) {
            producers[i].start();
        }
    }
}
/**
 * Producer task: creates 3 products, spending 3 seconds on each, and hands
 * them to the shared {@link ConsumerProducer} buffer.
 *
 * <p>If the thread is interrupted, the interrupt status is restored and the
 * task stops instead of continuing with the remaining products.
 */
class Producer implements Runnable {
    private final ConsumerProducer cp;

    /**
     * @param cp the shared buffer to put products into
     */
    public Producer(ConsumerProducer cp) {
        this.cp = cp;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(3000); // time needed to make one product
                cp.produce(Thread.currentThread().getName() + ":" + i);
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag set and exit.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Consumer task: repeatedly takes a product from the shared
 * {@link ConsumerProducer} buffer and prints it, pausing 500 ms before each take.
 *
 * <p>The loop runs until the thread is interrupted; on interruption the
 * interrupt status is kept set and the task exits.
 */
class Consumer implements Runnable {
    private final ConsumerProducer cp;

    /**
     * @param cp the shared buffer to take products from
     */
    public Consumer(ConsumerProducer cp) {
        this.cp = cp;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(500); // consuming takes some time
            } catch (InterruptedException e) {
                // sleep() cleared the flag; set it again and stop.
                Thread.currentThread().interrupt();
                return;
            }
            String pro = cp.consume();
            // consume() yields null when its wait was interrupted; do not report that as a product.
            if (pro == null && Thread.currentThread().isInterrupted()) {
                return;
            }
            System.out.println(Thread.currentThread().getName() + " 消耗了: " + pro);
        }
    }
}
3. 用 BlockingQueue 实现
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Demo of the producer/consumer pattern built on a bounded
 * {@link BlockingQueue}: 5 producer threads and 2 consumer threads share one
 * queue of capacity 8.
 */
public class ProducerConsumerPattern {
    /**
     * Starts the producers, waits 5 seconds, then starts the consumers.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during the 5-second pause
     */
    public static void main(String[] args) throws InterruptedException {
        // Shared queue; parameterized instead of raw so element types are checked at compile time.
        BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(8);

        // 5 producers, each producing 3 products
        Thread[] producers = new Thread[5];
        for (int i = 0; i < 5; i++) {
            producers[i] = new Thread(new Producer(sharedQueue), "producer-" + i);
            producers[i].start();
        }

        Thread.sleep(5000);

        // 2 consumers, each consuming repeatedly
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            consumers[i] = new Thread(new Consumer(sharedQueue), "consumer-" + i);
            consumers[i].start();
        }
    }
}
/**
 * Producer task: puts 3 products named {@code product-N} into the shared
 * queue, pausing 2 seconds before each one. {@code N} comes from a counter
 * shared by all producers.
 *
 * <p>If the thread is interrupted, the event is logged, the interrupt status
 * is restored and the task stops.
 */
class Producer implements Runnable {
    // "? super String": this class only inserts Strings, so any queue able to hold them is accepted.
    private final BlockingQueue<? super String> sharedQueue;

    /**
     * @param sharedQueue the queue that receives the products
     */
    public Producer(BlockingQueue<? super String> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /** Sequence number shared across all producer instances. */
    static final AtomicInteger count = new AtomicInteger(0);

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(2000);
                sharedQueue.put("product-" + count.incrementAndGet());
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                // Interruption is a request to stop: keep the flag set and exit.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Consumer task: repeatedly takes an element from the shared queue, prints
 * it, and then pauses 1.5 seconds.
 *
 * <p>The loop runs until the thread is interrupted; the interruption is
 * logged, the interrupt status is restored and the task exits.
 */
class Consumer implements Runnable {
    // Wildcard: elements are only read and printed, so any element type is accepted.
    private final BlockingQueue<?> sharedQueue;

    /**
     * @param sharedQueue the queue to take products from
     */
    public Consumer(BlockingQueue<?> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println(Thread.currentThread().getName() + " Consumed: " + sharedQueue.take());
                Thread.sleep(1500);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                // Without this exit the loop would go straight back to blocking and never stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}