生产者消费者问题

2018-07-18  本文已影响0人  Catcher07

一、概念

生产者-消费者模式是多线程并发协作的经典模式。所谓的生产者-消费者问题,实际上包含两类的线程, 一种是生产者线程用于生产数据,另外一种的消费者线程用于消费数据。为了解耦生产者和消费者之间的关系,通常采用共享内存的方式(共享数据区域)。生产者只需要把生产的数据放到共享数据区域,而不需要关心消费者的行为。消费者只需要到共享数据区域取数据,而不需要关心生产者的行为。共享数据区域应该要具备以下线程并发协作的功能。

在JAVA中,实现生产者消费者问题,可以采用以下三种方式。

二、wait/notify的消息通知机制

wait/notifyAll实现生产者-消费者:

public class ProductorConsumer {


public static void main(String[] args) {

    LinkedList linkedList = new LinkedList();
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i < 5; i++) {
        service.submit(new Productor(linkedList, 8));
    }

    for (int i = 0; i < 10; i++) {
        service.submit(new Consumer(linkedList));
    }

}

static class Productor implements Runnable {

    private List<Integer> list;
    private int maxLength;

    public Productor(List list, int maxLength) {
        this.list = list;
        this.maxLength = maxLength;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (list) {
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                        list.wait();
                        System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                    list.add(i);
                    list.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}


static class Consumer implements Runnable {

    private List<Integer> list;

    public Consumer(List list) {
        this.list = list;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (list) {
                try {
                    while (list.isEmpty()) {
                        System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                        list.wait();
                        System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                    list.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

}

三、await/signalAll

使用Lock中Condition的await/signalAll实现生产者-消费者

public class ProductorConsumer {

private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();

public static void main(String[] args) {
    LinkedList linkedList = new LinkedList();
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i < 5; i++) {
        service.submit(new Productor(linkedList, 8, lock));
    }
    for (int i = 0; i < 10; i++) {
        service.submit(new Consumer(linkedList, lock));
    }

}

static class Productor implements Runnable {

    private List<Integer> list;
    private int maxLength;
    private Lock lock;

    public Productor(List list, int maxLength, Lock lock) {
        this.list = list;
        this.maxLength = maxLength;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.size() == maxLength) {
                    System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                    full.await();
                    System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                list.add(i);
                empty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}


static class Consumer implements Runnable {

    private List<Integer> list;
    private Lock lock;

    public Consumer(List list, Lock lock) {
        this.list = list;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.isEmpty()) {
                    System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                    empty.await();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                }
                Integer element = list.remove(0);
                System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                full.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

}

四、BlockingQueue

使用BlockingQueue实现生产者-消费者

public class ProductorConsumer {

    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }


    static class Productor implements Runnable {

        private BlockingQueue queue;

        public Productor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer element = (Integer) queue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

参考文章

https://juejin.im/post/5aeec675f265da0b7c072c56

上一篇下一篇

猜你喜欢

热点阅读