Java生产者消费者的三种实现

2020-08-28  本文已影响0人  有腹肌的豌豆Z

一、使用synchronize以及wait()、notify() /notifyAll()

/**
 * 使用synchronize以及wait()、notify() /notifyAll()
 */
public class ShareDataV1 {

    /**
     * 原子计数
     */
    public static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 标示
     */
    public volatile boolean flag = true;

    /**
     * 生产队列最大容量
     */
    public static final int MAX_COUNT = 10;

    /**
     * 生产队列容器
     */
    public static final List<Integer> pool = new ArrayList<>();

    /**
     * 生产
     */
    public void produce() {
        // 判断,干活,通知
        while (flag) {
            try {
                // 每隔 1000 毫秒生产一个商品
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 这里有一个锁
            synchronized (pool) {
                //池子满了,生产者停止生产
                //埋个坑,这里用的if
                //TODO 判断
                while (pool.size() == MAX_COUNT) {
                    try {
                        System.out.println("pool is full, wating...");
                        // TODO 线程阻塞
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
                //通知
                // notify() 方法随机唤醒对象的等待池中的一个线程,进入锁池;
                // notifyAll() 唤醒对象的等待池中的所有线程,进入锁池。
                pool.notifyAll();
            }
        }
    }


    /**
     * 消费
     */
    public void consumue() {
        // 判断,干活,通知
        while (flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 这里有一个锁
            synchronized (pool) {
                //池子满了,生产者停止生产
                //埋个坑,这里用的if
                //TODO 判断
                while (pool.size() == 0) {
                    try {
                        System.out.println("pool is empty, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    /**
     * 停止
     */
    public void stop() {
        flag = false;
    }


    public static void main(String[] args) {
        ShareDataV1 shareDataV1 = new ShareDataV1();

        // 开启生产线程
        new Thread(() -> {
            shareDataV1.produce();
        }, "AAA").start();

        // 开启消费线程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "BBB").start();

        // 开启生产线程
        new Thread(() -> {
            shareDataV1.produce();
        }, "CCC").start();

        // 开启消费线程
        new Thread(() -> {
            shareDataV1.consumue();
        }, "DDD").start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shareDataV1.stop();
    }

}

二、使用Lock,Condition的await和signal方法

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try { 
            while (number != 0) {
                //等待,不能生产
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            while (number == 0) {
                //等待,不能消费
                condition.await();
            }

            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }
}

public class ProducerConsumer_V2{
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }
}

三、下面是使用阻塞队列实现生产者消费者模式:

LinkedBlockingQueue

/**
 * 下面是使用阻塞队列实现生产者消费者模式:
 */
public class ShareDataV3 {

    /**
     * 阻塞队列容量
     */
    private static final int MAX_CAPACITY = 10;

    /**
     * 阻塞队列
     */
    private static BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(MAX_CAPACITY);

    /**
     * 标示
     */
    private volatile boolean FLAG = true;

    /**
     * 原子计数
     */
    private AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 生产
     *
     * @throws InterruptedException
     */
    public void produce() throws InterruptedException {

        // 开启可控的死循环 不停的生产
        while (FLAG) {
            // 向队列中添加元素
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            // 插入元素是否成功
            if (retvalue == true) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "成功" + "资源队列大小= " + blockingQueue.size());
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + atomicInteger.get() + "失败" + "资源队列大小= " + blockingQueue.size());
            }
            // 线程睡眠
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "FLAG变为flase,生产停止");
    }

    /**
     * 消费
     *
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        Integer result = null;
        while (true) {
            // 从队列中获取元素
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            // 获取结果判断
            if (null == result) {
                System.out.println("超过两秒没有取道数据,消费者即将退出");
                return;
            }
            // 打印获取结果
            System.out.println(Thread.currentThread().getName() + "\t 消费" + result + "成功" + "\t\t" + "资源队列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    /**
     * 停止
     */
    public void stop() {
        this.FLAG = false;
    }


    public static void main(String[] args) {

        ShareDataV3 v3 = new ShareDataV3();

        // 开启一个线程 执行生产
        new Thread(() -> {
            try {
                v3.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        // 开启一个线程 执行消费
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();

        // 开启一个线程 执行消费
        new Thread(() -> {
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "CCC").start();


        try {
            // 测试 主线程睡眠5秒
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        v3.stop();
    }

}

上一篇下一篇

猜你喜欢

热点阅读