Java生产者/消费者模型的一种实现
2018-05-13 本文已影响49人
Lebens
本文主要介绍java中生产者/消费者模式的实现,对java线程锁机制的一次深入理解。
生产者/消费者模型
生产者/消费者模型要保证,同一个资源在同一时间节点下只能被最多一个线程访问,这个在java中用锁就很容易实现。
下面的例子就是模拟多个生产者生产,多个消费者消费的demo
//抽象生产者
public abstract class AbstractProducer implements Runnable {
abstract void produce() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
produce();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//抽象消费者
public abstract class AbstractConsumer implements Runnable {
abstract void consume() throws InterruptedException;
@Override
public void run() {
try {
while (true) {
consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ConsumerAndProducerDemo {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition CONDITION = LOCK.newCondition();
private static final Queue<Product> PRODUCTS = new LinkedList<>();
private static final int SIZE = 4;
public static class Product {
int id;
Product(int id) {
this.id = id;
}
}
//实现消费者
private static class Consumer extends AbstractConsumer {
@Override
void consume() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.isEmpty()) {
CONDITION.await();
}
Product product = PRODUCTS.poll();
Thread.sleep((long) (500 + Math.random() * 1000));
System.out.println(" consume product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
//实现生产者
private static class Producer extends AbstractProducer {
@Override
void produce() throws InterruptedException {
try {
LOCK.lock();
while (PRODUCTS.size() >= SIZE) {
CONDITION.await();
}
Thread.sleep(1000);
Product product = new Product(ATOMIC_INTEGER.incrementAndGet());
PRODUCTS.add(product);
System.out.println("produce product " + product.id);
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
public static void main(String[] args) {
for (int index = 0; index < 2; index++) {
new Thread(new Producer()).start();
}
for (int index = 0; index < 3; index++) {
new Thread(new Consumer()).start();
}
}
}
上面的demo这么实现
- 启动多个线程模拟多个生产者和多个消费者
- 同时使用了queue用来缓存产品
- 当缓存区没满时生产者生产
- 当缓冲区满时消费者开始消费
线程之间的同步,这里使用了ReentrantLock,ReentrantLock在之前的博客中有介绍过,当然也可以使用Object自带的wait()等方法,实现同步这里就不在修改demo另行实现了。