并发实验(一)生产者与消费者模型
2018-01-16 本文已影响0人
杭州痞老板
(一)基于wait/notify机制实现阻塞的BlockingStack
wait/notify机制专门用于线程间通信:
1)只有持有锁才能调用该锁的wait、notify方法
2)在本线程中调用wait()方法,将等待其他线程的通知(其他线程调用notify()方法或notifyAll()方法)
3)在本线程中调用notify()方法或notifyAll()方法,将通知对象等待池中的线程结束等待
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomerAndProducerPractice {
// 实验
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
MyBlockingStack stack = new MyBlockingStack();
for(int l1=0;l1<100;l1++) {
service.submit(new Producer(stack));
service.submit(new Customer(stack));
}
service.shutdown();
}
//生产者
static class Customer implements Runnable{
private Logger logger = LoggerFactory.getLogger(Customer.class);
private volatile MyBlockingStack stack;
Customer(MyBlockingStack stack){
this.stack=stack;
}
@Override
public void run() {
for(int l1=0;l1<11;l1++) {
try {
stack.put(new Bread());
} catch (InterruptedException e) {
logger.error(l1+"_"+e.getMessage(),e);
return;
}
}
}
}
//消费者
static class Producer implements Runnable{
private Logger logger = LoggerFactory.getLogger(Producer.class);
private volatile MyBlockingStack stack;
Producer(MyBlockingStack stack){
this.stack=stack;
}
@Override
public void run() {
for(int l1=0;l1<11;l1++) {
try {
stack.take();
} catch (InterruptedException e) {
logger.error(l1+"_"+e.getMessage(),e);
return;
}
}
}
}
//面包
static class Bread{}
//容器:BlockingQueue简易替代品
static class MyBlockingStack{
private Logger logger = LoggerFactory.getLogger(MyBlockingStack.class);
Bread[] stack = new Bread[10];
int size=0;
public synchronized void put(Bread bread) throws InterruptedException {
while(size==10) {
this.wait();
}
stack[size]=bread;
size++;
logger.info("生产了1个:还有"+size+"个面包");
this.notify();
}
public synchronized Bread take() throws InterruptedException {
while(size<=0) {
this.wait();
}
size--;
logger.info("消费了1个:还有"+size+"个面包");
this.notify();
return stack[size];
}
}
}
(二)基于BlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomerAndProducerPractice2 {
// 实验
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
BlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(10);
for(int l1=0;l1<100;l1++) {
service.submit(new Producer(queue));
service.submit(new Customer(queue));
}
service.shutdown();
}
//生产者
static class Customer implements Runnable{
private Logger logger = LoggerFactory.getLogger(Customer.class);
private volatile BlockingQueue<Bread> queue;
Customer(BlockingQueue<Bread> queue){
this.queue=queue;
}
@Override
public void run() {
for(int l1=0;l1<11;l1++) {
try {
//阻塞的put方法
queue.put(new Bread());
logger.info("生产了1个:容器中有"+queue.size()+"个");
} catch (InterruptedException e) {
logger.error(l1+"_"+e.getMessage(),e);
return;
}
}
}
}
//消费者
static class Producer implements Runnable{
private volatile BlockingQueue<Bread> queue;
private Logger logger = LoggerFactory.getLogger(Customer.class);
Producer(BlockingQueue<Bread> queue){
this.queue=queue;
}
@Override
public void run() {
for(int l1=0;l1<11;l1++) {
try {
//阻塞的take方法
queue.take();
logger.info("消费了1个:容器中有"+queue.size()+"个");
} catch (InterruptedException e) {
logger.error(l1+"_"+e.getMessage(),e);
}
}
}
}
//面包
static class Bread{}
}
(三)阻塞容器的对比
// MyBlockingStack 使用监视器锁+锁的wait/notify方法实现线程同步
// 俗称 悲观并发
static class MyBlockingStack{
private Logger logger = LoggerFactory.getLogger(MyBlockingStack.class);
Bread[] stack = new Bread[10];
int size=0;
public synchronized void put(Bread bread) throws InterruptedException {
while(size==10) {
this.wait();
}
stack[size]=bread;
size++;
logger.info("生产了1个:还有"+size+"个面包");
this.notifyAll();
}
public synchronized Bread take() throws InterruptedException {
while(size<=0) {
this.wait();
}
size--;
logger.info("消费了1个:还有"+size+"个面包");
this.notifyAll();
return stack[size];
}
}
// ArrayBlockingQueue使用ReentrantLock+Conditon来实现线程同步
// ReentrantLock代替监视器锁
// Conditon.await/signal 代替锁对象的wait/notify
// ReentrantLock可以对应多个Contion,只唤醒某个contion上等待锁的线程
// 利用CAS操作+线程挂起 实现线程同步--->俗称乐观并发
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//线程在notEmpty上等待
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒在notfull上等待的线程
notFull.signal();
return x;
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
//线程在notFull上等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒在notEmpty上等待的线程
notEmpty.signal();
}