10给女朋友讲讲并发编程-生产者&消费者模式

2021-01-17  本文已影响0人  XueFengDong

一、定义

生产者消费者模型

二、代码实现

@Slf4j
public class MessageQueue {


    public static void main(String[] args) {

        MessageQueue messageQueue = new MessageQueue(2);

        //3个生产者
        for (int i = 0; i < 3; i++) {
            int v = i;
            new Thread(() -> messageQueue.put(new Message(v,"值"+v)),"生产者"+i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //1个消费者
        new Thread(() ->{
            while (true){
                messageQueue.take();
            }
        },"消费者").start();

    }

    /**
     * 存储包裹
     */
    private LinkedList<Message> queue = new LinkedList<>();

    /**
     * 队列容量
     */
    private int capacity;


    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 从队列中取出包裹,通知生产者继续生产
     * @return
     */
    public Message take(){
        synchronized (queue){
            while (queue.isEmpty()){
                try {
                    //如果队列为空,消费者进入等待中,等待生产者放入包裹后通知消费
                    log.info("队列为空,消费者进入等待中");
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //取出队列头部的第一个包裹
            log.info("取出队列头部的一个包裹");
            Message message = queue.removeFirst();
            //通知生产者继续存入包裹
            log.info("通知生产者继续存入包裹");
            queue.notifyAll();
            return message;
        }
    }


    /**
     * 将包裹放入队列中,通知消费者来提取
     * @param message
     */
    public void put(Message message){
        synchronized (queue){
            //当生产者将队列包裹放满后,进入等待状态,不在存入包裹
            while (queue.size() == capacity){
                try {
                    log.info("生产者队列已满,进入等待状态");
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //在队列的尾部放入消息包裹
            queue.addLast(message);
            log.info("消息已经放入队列中"+message);
            //通知消费者消费包裹
            queue.notifyAll();
        }
    }



}

/**
 * 此类为线程包裹类,需要将消息包裹放入消息队列中,包裹中包含线程的id以及线程产生的数据
 */
class Message{

    private int id;

    private Object value;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Object getValue() {
        return value;
    }

    public void setValue(Object value) {
        this.value = value;
    }

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

输出结果:

21:21:21.823  INFO [生产者1] com.dxf.生产者消费者.MessageQueue - 消息已经放入队列中Message{id=1, value=值1}
21:21:21.823  INFO [生产者2] com.dxf.生产者消费者.MessageQueue - 消息已经放入队列中Message{id=2, value=值2}
21:21:21.823  INFO [生产者0] com.dxf.生产者消费者.MessageQueue - 生产者队列已满,进入等待状态
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 取出队列头部的一个包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 通知生产者继续存入包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 取出队列头部的一个包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 通知生产者继续存入包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 队列为空,消费者进入等待中
21:21:22.823  INFO [生产者0] com.dxf.生产者消费者.MessageQueue - 消息已经放入队列中Message{id=0, value=值0}
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 取出队列头部的一个包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 通知生产者继续存入包裹
21:21:22.823  INFO [消费者] com.dxf.生产者消费者.MessageQueue - 队列为空,消费者进入等待中
上一篇下一篇

猜你喜欢

热点阅读