多线程专家首页投稿(暂停使用,暂停投稿)Java学习笔记

Java 生产者消费者实现 —— BlockingQueue

2017-09-01  本文已影响942人  被称为L的男人

前言

对着《Java 编程思想》,通过wait - notifyAll实现了生产者消费者模式。今天用BlockingQueue实现一下。

BlockingQueue

简单实现

生产者和消费者,共用一个BlockingQueue。为什么BlockingQueue能够实现生产者-消费者模型呢?对于puttake两个操作,注释如下:

/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element becomes available.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;

Apple.java,生产和消费的对象。

public class Apple {
    
    private int id;
    
    public Apple(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Apple [id=" + id + "]";
    }
}

生产者:

public class Producer {
    BlockingQueue<Apple> queue;
    
    public Producer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public boolean put(Apple apple) {
        return queue.offer(apple);
    }
}

消费者:

public class Consumer {
    BlockingQueue<Apple> queue;
    
    public Consumer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public Apple take() throws InterruptedException {
        return queue.take();
    }
}

测试:

public class TestConsumer {
    
    public static void main(String[] args) {

        final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
        
        // 生产者
        new Thread(new Runnable() {
            
            int appleId = 0;
            Producer producer = new Producer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        producer.put(new Apple(appleId++)); 
                        producer.put(new Apple(appleId++)); 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费者
        new Thread(new Runnable() {
            Consumer consumer = new Consumer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println(consumer.take().getId());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出:

生产者生产2个Apple,消费者立即消费掉。

改进

上述代码存在一些问题:

改进后的代码如下:

Apple类未更改。

Producer变为抽象类,并使用泛型。里面新增线程池,用于运行消费者线程。

public abstract class Producer<E> {
    protected BlockingQueue<E> queue;
    protected ExecutorService threadPool = Executors.newCachedThreadPool();
    public static final int DEFAULT_QUEUE_LENGTH = 10000;
    
    public Producer(int capacity) {
        initQueue(capacity);
    }
    
    public BlockingQueue<E> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    public boolean put(E apple) {
        return queue.offer(apple);
    }
    
    private void initQueue(int capacity) {
        if (queue == null) {
            synchronized (this) {
                if (queue == null) {
                    queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
                }
            }
        }
    }
    
    protected void consumerThread(int consumerCount, Consumer<E> consumer) {
        for (int i = 0; i < consumerCount; i++) {
            threadPool.execute(consumer);
        }
    }
}

Consumer也变成抽象类,使用泛型,并实现了Runnable接口。其中run方法的实现逻辑是:从阻塞队列中取出一个对象,并调用抽象方法consume。该方法是具体的消费者实现的消费逻辑。

public abstract class Consumer<E> implements Runnable{
    BlockingQueue<E> queue;
    
    /**
     * 数据逐个处理
     * @param data
     */
    protected abstract void consume(E data);
    
    @Override
    public void run() {
        while (true) {
            try {
                E data = take();
                try {
                    consume(data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public Consumer(BlockingQueue<E> queue) {
        this.queue = queue;
    }
    
    public E take() throws InterruptedException {
        return queue.take();
    }
}

AppleProducer:Apple的生产者,使用非延迟加载的单例模式,指定阻塞队列的长度、消费者线程数量。

public class AppleProducer extends Producer<Apple>{
    
    // 并没有延迟加载
    public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1); 

    private AppleProducer(int capacity, int consumerCount) {
        super(capacity);
        AppleConsumer consumer = new AppleConsumer(queue);
        consumerThread(consumerCount, consumer);
    }
}

AppleConsumer:Apple的消费者,要实现具体的消费方法consume。这里只是在控制台输出对象信息。

public class AppleConsumer extends Consumer<Apple>{

    public AppleConsumer(BlockingQueue<Apple> queue) {
        super(queue);
    }

    @Override
    protected void consume(Apple data) {
        System.out.println(data);
    }
}

测试:这里只需要获取AppleProducer,调用put方法添加对象即可!在队列中有对象Apple时,会有线程取出Apple,自动调用AppleConsumer的consume方法。

public class TestConsumer {
    
    public static void main(String[] args) throws InterruptedException {

        AppleProducer producer = AppleProducer.INSTANCE;
        for (int i = 0; i < 60; i++) {
            producer.put(new Apple(i));
        }
    }
}

有待改进的地方

上一篇下一篇

猜你喜欢

热点阅读