java中的生产者消费者模式的实现

2021-02-21  本文已影响0人  水煮鱼又失败了

1 场景

生产者/消费者是java中的一种线程模型,用来保证同一个资源同一个时刻只被一个生产者或者一个消费者访问。

生产者向存储空间放入数据消费者从存储空间拿出数据
存储空间为空消费者阻塞等待
存储空间已满生产者阻塞等待

2 wait/notify实现

借助在synchronized块中使用wait()/notifyAll()的方式来实现线程的休眠唤醒

synchronized只支持非公平锁

2.1 代码

如下代码,存储空间大小为3,生产者、消费者各执行5次。

通过Thread.sleep(xx),让生产者的速度比消费者的速度些,仅测试用。

import java.util.concurrent.LinkedBlockingQueue;

/**
 * 生产者消费者命名空间
 * <p>
 * wait:告诉当前线程,释放锁,然后开始睡眠等待,此时的状态为Watting,直到有线程进入一样的监视器调用notify或者notifyAll唤醒它
 * notify:随机唤醒一个在一样的对象监视器上等待的线程(notify()非常容易导致死锁)
 * notifyAll:唤醒所有的在一样对象监视器上等待的线程
 **/
public class WaitNotityContext {
    
    /**
     * 队列大小
     */
    private static final Integer MAX_SIZE = 3;
    
    /**
     * 队列
     */
    private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_SIZE);
    
    /**
     * 生产者
     */
    public static class Producter implements Runnable {
        
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                // 测试使用(让生产者生产快一些)
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                String value = "{" + Thread.currentThread().getName() + "生产" + (i + 1) + "}";
                synchronized (QUEUE) {
                    while (QUEUE.size() >= MAX_SIZE) {
                        System.out.println("【" + Thread.currentThread().getName() + "】:队列已满,阻塞" + ",pool size :" + QUEUE.size());
                        try {
                            // 让出当前锁,让其他线程可以拿到锁
                            QUEUE.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    QUEUE.add(value);
                    System.out.println("【" + Thread.currentThread().getName() + "】:放入队列内容:" + value + ",pool size :" + QUEUE.size());
                    
                    // 唤醒全部wait状态的线程
                    QUEUE.notifyAll();
                }
            }
        }
    }
    
    /**
     * 消费者
     */
    public static class Consumer implements Runnable {
        
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                // 测试使用(让生产者生产快一些)
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                synchronized (QUEUE) {
                    while (QUEUE.size() <= 0) {
                        System.out.println("【" + Thread.currentThread().getName() + "】:队列为空,阻塞" + ",pool size :" + QUEUE.size());
                        try {
                            QUEUE.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                    String value = QUEUE.remove();
                    System.out.println("【" + Thread.currentThread().getName() + "】:拿出队列内容:" + value + ",pool size :" + QUEUE.size());
                    
                    // 唤醒全部wait状态的线程
                    QUEUE.notifyAll();
                }
            }
        }
    }
}
public static void main(String[] args) {
    // 初始化
    Consumer consumer = new Consumer();
    Producter producter = new Producter();
    // 启动生产者
    new Thread(producter, "producter01").start();
    new Thread(producter, "producter02").start();
    // 启动消费者
    new Thread(consumer, "consumer01").start();
    new Thread(consumer, "consumer02").start();
}

2.2 结果

输出结果如下:

【producter02】:放入队列内容:{producter02生产1},pool size :1
【producter01】:放入队列内容:{producter01生产1},pool size :2
【consumer01】:拿出队列内容:{producter02生产1},pool size :1
【producter01】:放入队列内容:{producter01生产2},pool size :2
【producter02】:放入队列内容:{producter02生产2},pool size :3
【consumer02】:拿出队列内容:{producter01生产1},pool size :2
【producter02】:放入队列内容:{producter02生产3},pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产2},pool size :2
【producter01】:放入队列内容:{producter01生产3},pool size :3
【consumer01】:拿出队列内容:{producter02生产2},pool size :2
【producter02】:放入队列内容:{producter02生产4},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer01】:拿出队列内容:{producter02生产3},pool size :2
【producter01】:放入队列内容:{producter01生产4},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产3},pool size :2
【producter02】:放入队列内容:{producter02生产5},pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer01】:拿出队列内容:{producter02生产4},pool size :2
【producter01】:放入队列内容:{producter01生产5},pool size :3
【consumer02】:拿出队列内容:{producter01生产4},pool size :2
【consumer01】:拿出队列内容:{producter02生产5},pool size :1
【consumer02】:拿出队列内容:{producter01生产5},pool size :0

3 ReetranLock实现

ReetranLock是Java中JUC并发包 中的可重入锁,支持公平锁非公平锁,默认为非公平锁。其使用方式如下:

// 定义锁
ReentrantLock reentrantLock = new ReentrantLock();

// 获取锁
reentrantLock.lock();
try {
    // 业务逻辑......
}finally {
    // 释放锁
    reentrantLock.unlock();
}

3.1 代码

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReetranLock生产者消费者命名空间
 */
public class ReentrantLockContext {
    
    /**
     * 队列大小
     */
    private static final Integer MAX_SIZE = 3;
    
    /**
     * 队列
     */
    private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(MAX_SIZE);
    
    /**
     * 锁
     */
    private static final Lock LOCK = new ReentrantLock();
    
    /**
     * 等待队列
     */
    private static final Condition CONDITION = LOCK.newCondition();
    
    /**
     * 生产者
     */
    public static class Producter implements Runnable {
        
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                // 测试使用(让生产者生产快一些)
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                String value = "{" + Thread.currentThread().getName() + "生产" + (i + 1) + "}";
                // 获得锁。如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获得锁为止。
                LOCK.lock();
                try {
                    while (QUEUE.size() >= MAX_SIZE) {
                        System.out.println("【" + Thread.currentThread().getName() + "】:队列已满,阻塞" + ",pool size :" + QUEUE.size());
                        // 使当前线程等待,直到被唤醒或中断
                        CONDITION.await();
                    }
                    QUEUE.add(value);
                    System.out.println("【" + Thread.currentThread().getName() + "】:放入队列内容:" + value + ",pool size :" + QUEUE.size());
                    // 唤醒所有等待的线程
                    CONDITION.signalAll();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁
                    LOCK.unlock();
                }
            }
        }
    }
    
    /**
     * 消费者
     */
    public static class Consumer implements Runnable {
        
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                // 测试使用(让生产者生产快一些)
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                LOCK.lock();
                try {
                    while (QUEUE.size() <= 0) {
                        System.out.println("【" + Thread.currentThread().getName() + "】:队列为空,阻塞" + ",pool size :" + QUEUE.size());
                        try {
                            // 使当前线程等待,直到被唤醒或中断
                            CONDITION.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                    String value = QUEUE.remove();
                    System.out.println("【" + Thread.currentThread().getName() + "】:拿出队列内容:" + value + ",pool size :" + QUEUE.size());
                    // 唤醒所有等待的线程
                    CONDITION.signalAll();
                    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    LOCK.unlock();
                }
            }
        }
    }
}
public static void main(String[] args) {
    // 初始化
    Consumer consumer = new Consumer();
    Producter producter = new Producter();
    // 启动生产者
    new Thread(producter, "producter01").start();
    new Thread(producter, "producter02").start();
    // 启动消费者
    new Thread(consumer, "consumer01").start();
    new Thread(consumer, "consumer02").start();  
}

3.2 结果

【producter01】:放入队列内容:{producter01生产1},pool size :1
【producter02】:放入队列内容:{producter02生产1},pool size :2
【consumer02】:拿出队列内容:{producter01生产1},pool size :1
【producter01】:放入队列内容:{producter01生产2},pool size :2
【consumer01】:拿出队列内容:{producter02生产1},pool size :1
【producter02】:放入队列内容:{producter02生产2},pool size :2
【producter01】:放入队列内容:{producter01生产3},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产2},pool size :2
【producter01】:放入队列内容:{producter01生产4},pool size :3
【consumer01】:拿出队列内容:{producter02生产2},pool size :2
【producter02】:放入队列内容:{producter02生产3},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【producter01】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter01生产3},pool size :2
【consumer01】:拿出队列内容:{producter01生产4},pool size :1
【producter02】:放入队列内容:{producter02生产4},pool size :2
【producter01】:放入队列内容:{producter01生产5},pool size :3
【producter02】:队列已满,阻塞,pool size :3
【consumer02】:拿出队列内容:{producter02生产3},pool size :2
【consumer01】:拿出队列内容:{producter02生产4},pool size :1
【producter02】:放入队列内容:{producter02生产5},pool size :2
【consumer02】:拿出队列内容:{producter01生产5},pool size :1
【consumer01】:拿出队列内容:{producter02生产5},pool size :0
上一篇下一篇

猜你喜欢

热点阅读