JAVA 生产者消费者

2019-03-18  本文已影响0人  So_ProbuING

虽然是Android开发,但是还是需要掌握设计模式以及优雅的编程方式,这样才能对开发的项目以及产品更好的服务。对于并发的情况,这里说的并发的情况不一定是网络的并发,在程序中,在批量处理数据或者是在执行复杂的逻辑的时候也需要处理并发的情况。在这种情况下,生产者消费者是一个很好的模型。这个模型可以使得并发任务变的更加合理与高效。

之前在项目中也写过几次生产者消费者模式,但是当时写的都是为了满足特定的需求,对于原理和知识点都是一知半解的。这样对于这种高效的方式要彻底的了解才可以。所以抽时间重新学习了一下,在此记录。
其实说生产者消费者其实就是一个描述,本质还是线程间通信的为题,保证线程同步,来实现保证线程安全访问共享资源

生产者消费者的问题

生产者消费者问题,也可叫有限缓冲问题,是一个经典的进程/线程同步问题
问题抛出如下:

首先,工厂中有一个产品缓冲区,生产者会不停地往缓冲区添加新产品,而消费者则是不停地从缓冲区中取走产品。
问如何保证生产者不会再缓冲区满时加入数据,且消费者不会在缓冲区为空时消耗数据
解答
生产者必须在缓冲区满时休眠,直到消费者消耗了产品时才能被唤醒。
同时,也必须让消费者在缓冲区空时休眠,直到生产者往缓冲区中添加产品时才能唤醒消费者。

生产者消费者问题有以下几种情况

我们这里先写第三种例子:

  1. 不使用java.util.concurrent包的情况下
/**
 * @Auther: wxblack-mac
 * @Date: 2019/3/15 10:47
 * @Description:生产者消费者模式练习 简单不使用java.util.concurrent包
 */
public class ProducerCustomer {
    //定义最大尺寸
    private final int MAX_SIZE = 100;
    //定义队列
    private final Queue<Integer> queue = new LinkedList<Integer>();

    public static void main(String[] args) {
        ProducerCustomer producerCustomer = new ProducerCustomer();
        Producer producer = producerCustomer.new Producer();
        Customer customer = producerCustomer.new Customer();
        new Thread(producer).start();
        new Thread(customer).start();


    }

    /**
     * 生产者
     */
    class Producer implements Runnable {

        @Override
        public void run() {
            while (true) {
                //lock
                synchronized (queue) {
                    if (queue.size() < MAX_SIZE) {
                        //生产者生产
                        int num = new Random().nextInt(100);
                        //将生产的东西放到队列中
                        boolean offerResult = queue.offer(num);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        //生产完毕 唤醒消费者
                        queue.notifyAll();
                        //输出生产者信息
                        System.out.println("生产者:" + Thread.currentThread().getName() + "生产了 " + num + "产品容量" + queue.size());
                    } else {
                        //队列已满 生产者等待
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Customer implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (queue) {
                    if (queue.size() > 0) {
                        //消费者消费数据
                        int num = queue.poll();
                        System.out.println("消费者 " + Thread.currentThread().getName() + "消费了产品" + num + "产品容量" + queue.size());
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        //唤醒生产者
                        queue.notifyAll();
                    } else {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

}

在上面的例子中,我们使用两个线程的实现类Runnable来分别作为生产者与消费者,存储的队列只用于普通的队列queue.下面我们在使用BlockingQueue来实现,Blocking是一个先进先出得我队列。是阻塞队列,BlockingQueue支持当获取队列元素为空时,会阻塞等待队列中有元素再返回。同样在添加元素时,如果队列已满,等队列可以放入新元素时在放入。BlockQueue是线程安全的.BlockQueue的一个实现类是ArrayBlockingQueue底层采用数组实现。并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能操作。这样我们在线程中可以不需要实现自己的线程对象锁。

/**
 * @Auther: wxblack-mac
 * @Date: 2019/3/15 11:24
 * @Description:BlockQueue实现
 */
public class ProducerCustomerBlock {

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(100);
        ProducerCustomerBlock producerCustomerBlock = new ProducerCustomerBlock();
        ProducerB producerB = producerCustomerBlock.new ProducerB(blockingQueue);
        Customer customer = producerCustomerBlock.new Customer(blockingQueue);
        new Thread(producerB).start();
        new Thread(customer).start();
    }

    /**
     * 生产者
     */
    class ProducerB implements Runnable {
        private BlockingQueue<Integer> blockingQueue;

        public ProducerB(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (true) {
                //生产
                int nextInt = new Random().nextInt(100);
                //放入队列中
                try {
                    blockingQueue.put(nextInt);
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产了数据" + nextInt + "队列容量" + blockingQueue.size());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Customer implements Runnable {
        private BlockingQueue<Integer> blockingQueue;

        public Customer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (true) {
                //消费商品
                try {
                    Integer integer = blockingQueue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "消费了" + integer.intValue() + "队列容量" + blockingQueue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读