生产者-消费者(wait-notifyAll)

2019-12-20  本文已影响0人  追梦小蜗牛
black-sedan-parked-under-tree-1841120.jpg

简介:

生产者、消费者真的是老生常谈了,很多地方都会用到这种思想,可见其重要程度。所以,还是有必要了解一下它的最原始的实现方式的,也就是利用Object的wait和notifyAll方法。

源码:

/**
     * Causes the current thread to wait until another thread invokes the
     * {@link java.lang.Object#notify()} method or the
     * {@link java.lang.Object#notifyAll()} method for this object.
     * In other words, this method behaves exactly as if it simply
     * performs the call {@code wait(0)}.
     * <p>
     * The current thread must own this object's monitor. The thread
     * releases ownership of this monitor and waits until another thread
     * notifies threads waiting on this object's monitor to wake up
     * either through a call to the {@code notify} method or the
     * {@code notifyAll} method. The thread then waits until it can
     * re-obtain ownership of the monitor and resumes execution.
     * <p>
     * As in the one argument version, interrupts and spurious wakeups are
     * possible, and this method should always be used in a loop:
     * <pre>
     *     synchronized (obj) {
     *         while (&lt;condition does not hold&gt;)
     *             obj.wait();
     *         ... // Perform action appropriate to condition
     *     }
     * </pre>
     * This method should only be called by a thread that is the owner
     * of this object's monitor. See the {@code notify} method for a
     * description of the ways in which a thread can become the owner of
     * a monitor.
     *
     * @throws  IllegalMonitorStateException  if the current thread is not
     *               the owner of the object's monitor.
     * @throws  InterruptedException if any thread interrupted the
     *             current thread before or while the current thread
     *             was waiting for a notification.  The <i>interrupted
     *             status</i> of the current thread is cleared when
     *             this exception is thrown.
     * @see        java.lang.Object#notify()
     * @see        java.lang.Object#notifyAll()
     */
    public final void wait() throws InterruptedException {
        wait(0);
    }

    /**
     * Wakes up all threads that are waiting on this object's monitor. A
     * thread waits on an object's monitor by calling one of the
     * {@code wait} methods.
     * <p>
     * The awakened threads will not be able to proceed until the current
     * thread relinquishes the lock on this object. The awakened threads
     * will compete in the usual manner with any other threads that might
     * be actively competing to synchronize on this object; for example,
     * the awakened threads enjoy no reliable privilege or disadvantage in
     * being the next thread to lock this object.
     * <p>
     * This method should only be called by a thread that is the owner
     * of this object's monitor. See the {@code notify} method for a
     * description of the ways in which a thread can become the owner of
     * a monitor.
     *
     * @throws  IllegalMonitorStateException  if the current thread is not
     *               the owner of this object's monitor.
     * @see        java.lang.Object#notify()
     * @see        java.lang.Object#wait()
     */
    public final native void notifyAll();

思路:

首先要想的是生产者和消费者分别需要用java里面的什么来实现,毫无疑问,肯定要用线程了,因为它是最基本的调度单元,可以在其run方法里面处理自定义逻辑。确定了线程之后,还要找一个地方来存储生产者生产出来的产品,首先这个容器要有序的,那就用最简单的ArrayList了。有的时候为了测试方便,我们有可能要协调多个线程,可能需要借助于CountDownLatch来实现。

举例1:

这个例子展示的是,开启多个生产者生产一定量的产品放进队列,等所有的生产者都生产完之后,打印生产了多少个产品。然后,多个消费者再开始消费,消费完之后,再打印,容器里面还剩余多少个产品。以此体验生产和消费的思想,大家可以在自己的IDE上运行一下。

    @Test
    public void test01() throws InterruptedException {
        CountDownLatch countDownLatchProducer = new CountDownLatch(15);
        CountDownLatch countDownLatchConsumer = new CountDownLatch(6);
        for (int i = 0; i < 3; i++) {//定义3个线程
            new Thread(new Producer(i,countDownLatchProducer)).start();
        }
//        Thread.sleep(20000);
        countDownLatchProducer.await();//当所有的生产者子线程的任务都完成之后,就会恢复继续向下执行。
        System.out.println("消费前大小:"+Queue.getList().size());
        for (int i = 0; i < 3; i++) {
            new Thread(new Consumer(countDownLatchConsumer)).start();
        }
//        Thread.sleep(20000);
        countDownLatchConsumer.await();//当所有的消费者子线程的任务都完成之后,就会恢复继续向下执行。
        System.out.println("消费后大小:"+Queue.getList().size());
    }

    class Producer implements Runnable {

        private int i;
        private CountDownLatch countDownLatch;

        public Producer(int i,CountDownLatch countDownLatch) {
            this.i = i;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
                    Queue.put(new Bread(i));
                    countDownLatch.countDown();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class Consumer implements Runnable {

        private CountDownLatch countDownLatch;

        public Consumer(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
                    Bread bread = Queue.get();
                    countDownLatch.countDown();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
public class Queue {

    private final static List<Bread> list = new ArrayList<>(15);

    public static List getList() {
        return list;
    }

    static void put(Bread bread) throws InterruptedException {//进队列
        synchronized (list) {
            while (list.size() >= 15) {//限制容器大小最大值为15
                list.wait();
            }
            list.add(bread);
            list.notifyAll();
        }
    }

    static Bread get() throws InterruptedException {//出队列
        Bread bread = null;
        synchronized (list) {
            if (list.size() == 0)
                list.wait();
            bread = list.remove(list.size() - 1);
            list.notifyAll();
        }
        return bread;
    }
}
package com.hao.hellolearn.jdk.producerconsumer;

public class Bread {

    private int value;

    public Bread(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }
}

举例2:

这个例子展示的是:当容器的容量不足以装下生产者生产的任务的时候,生产者的线程就会阻塞在put方法上,直到消费者开始消费容器中的任务,生产者线程才会恢复执行,可以通过sleep方法来协助测试。

public class WaitNotifyImpl {

    @Test
    public void test01() throws InterruptedException {
        for (int i = 0; i < 3; i++) {//定义3个线程
            new Thread(new Producer(i)).start();
        }
        Thread.sleep(20000);
        System.out.println("消费前大小:"+Queue.getList().size());
        for (int i = 0; i < 3; i++) {
            new Thread(new Consumer()).start();
        }
        Thread.sleep(20000);
        System.out.println("消费后大小:"+Queue.getList().size());
    }

    class Producer implements Runnable {

        private int i;

        public Producer(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
                    Queue.put(new Bread(i));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class Consumer implements Runnable {

        public Consumer() {
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
                    Bread bread = Queue.get();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

当第一个sleep执行的时候,可以在cmd里面分别执行jps -l 查看运行中的进程的pid,然后用jstack -l pid 来打印出这个进程的堆栈信息,可以看到线程阻塞在object monitor上面。1处的是生产者线程的状态是WAITING是有object.wait 引起的,2处的是TIMED_WAITING是由sleep引起的。


生产者-消费者-jstack.png

总结:

生产者-消费者 生产出来的产品放在一个容器里面,消费者从容器里面取出来;如果生产速率慢,消费速度快,那么当容器为空的时候,消费者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面被放进一个产品,然后争抢着。如果消费速率慢,生产速度快,那么当容器满了的时候生产者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面的某一个产品被消费掉。它的本质其实就是线程之间的协作。
觉察即自由。

上一篇 下一篇

猜你喜欢

热点阅读