Java中J.U.C提供的阻塞队列BlockingQueue

2019-10-04  本文已影响0人  DoubleFooker

ArrayBlockingQueue<E>

基于数组的有界队列。
基本的使用

/**
 * 实现生产者、消费者模型
 */
public class ArrayBlockingQueueDemo {
    // 队列容量10
    static ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10);

    // 生产者
    private static void enq(String entry) {
        arrayBlockingQueue.add(entry);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static {
        init();
    }

    // 启动线程监听队列数据,消费者
    private static void init() {
        new Thread(() -> {
            System.out.println("消费者启动!");
            while (true) {
//                try {
//                    // 如果队列为空,这个操作会阻塞
//                    String data = arrayBlockingQueue.take();
//                    System.out.println("take 消费数据:" + data);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                // 如果队列为空 返回null 不阻塞
//                String polldata = arrayBlockingQueue.poll();
//                try {
//                    TimeUnit.SECONDS.sleep(1);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                System.out.println("poll 消费数据:" + polldata);
                // 队列为空 会报错
                if (arrayBlockingQueue.size() > 0) {
                    String removeData = arrayBlockingQueue.remove();
                    System.out.println("remove 消费数据:" + removeData);
                }
            }
        }).start();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            // 如果队列满了,会一直阻塞
            try {
                arrayBlockingQueue.put("data" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
//            // 可设置超时时间
//            try {
//                boolean add3 = arrayBlockingQueue.offer("data" + i, 10, TimeUnit.SECONDS);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            // 如果队列满了,继续添加数据会报异常
//            boolean add = arrayBlockingQueue.add("data" + i);
        }
    }
}

入队方法

出队方法

实现原理

ArrayBlockingQueue属性中包含ReentrantLock和两个Condition,新建队列时对他们进行初始化

// 入队出队加锁保证线程安全 
final ReentrantLock lock;
// 空队列阻塞操作  
private final Condition notEmpty;
// 满队列阻塞操作
private final Condition notFull;
//初始化
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();

当调用take方法时,如果队列为空,使用notEmpty.await()让线程挂起,等到有数据入队时,在enqueue方法通过notEmpty.signal()唤醒线程。
同样道理
当调用put方法时,如果队列满了,使用notFull.await()让线程挂起,等到有数据出队时再notFull.signal()唤醒线程。
而超时操作offer则使用notFull.awaitNanos()实现。

LinkedBlockingQueue<E>

基于链表的有界队列。使用和ArrayBlockingQueue基本一致。
LinkedBlockingQueue包含两个锁

    // 读锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //写锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final Condition notFull = putLock.newCondition();

其实现原理跟ArrayBlockingQueue基本一致。只是把入队和出队分开两把锁操作,增加了吞吐量。

区别

上一篇下一篇

猜你喜欢

热点阅读