【线程】生产者消费者模式
2019-02-25 本文已影响5人
浅浅星空
1.synchronized 实现
public class Person {
private int age;
private boolean isIncrease = true;
public synchronized void increase() {
while (!isIncrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
age++;
isIncrease = false;
this.notifyAll();
}
public synchronized void decrease() {
while (isIncrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
age--;
isIncrease = true;
this.notifyAll();
}
public int getAge() {
return age;
}
}
public class Producer implements Runnable {
private Person p;
public Producer(Person p) {
this.p = p;
}
@Override
public void run() {
while (true) {
p.increase();
System.out.println("producer:" + p.getAge());
}
}
}
public class Consumer implements Runnable {
private Person p;
public Consumer(Person p) {
this.p = p;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
while (true) {
p.decrease();
System.out.println("consumer:" + p.getAge());
}
}
}
}
public class Test01 {
public static void main(String[] args) {
Person person = new Person();
Producer producer = new Producer(person);
Consumer consumer = new Consumer(person);
Thread thread1 = new Thread(producer);
Thread thread2 = new Thread(consumer);
thread1.start();
thread2.start();
}
}
2.Condition实现
public class BoundBuffer {
Lock lock = new ReentrantLock();
private Object[] data = new Object[5];
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private int putptr, takeptr, count;
public void put(Object obj) {
try {
lock.lock();
while (count == data.length) {
notFull.await();
}
data[putptr] = obj;
if (++putptr == data.length) putptr = 0;
++count;
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public Object take() {
try {
lock.lock();
while (count == 0) {
notEmpty.await();
}
Object result = data[takeptr];
if (++takeptr == data.length) takeptr = 0;
count--;
notFull.signal();
return result;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
}
class Test02 {
public static void main(String[] args) {
BoundBuffer buffer = new BoundBuffer();
Random random = new Random();
Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
int a = random.nextInt();
System.out.println("put:" + a);
buffer.put(a);
}
}
});
Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("take:" + buffer.take());
}
}
});
putThread.start();
takeThread.start();
}
}
3.BlockingQueue
public class Producer implements Runnable {
private BlockingQueue<Object> blockingQueue;
private String name;
public Producer(BlockingQueue blockingQueue, String name) {
this.blockingQueue = blockingQueue;
this.name = name;
}
@Override
public void run() {
while (true) {
try {
blockingQueue.put(name);
System.out.println("生产:" + name + ",剩余:" + blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<Object> blockingQueue;
private String name;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
System.out.println("消费:"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class TestQueue {
public static void main(String[] args) {
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(5);
Thread producer1 = new Thread(new Producer(blockingQueue, "生产1"));
Thread producer2 = new Thread(new Producer(blockingQueue, "生产2"));
Thread consumer1 = new Thread(new Consumer(blockingQueue));
Thread consumer2 = new Thread(new Consumer(blockingQueue));
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}