Java中J.U.C提供的阻塞队列BlockingQueue
2019-10-04 本文已影响0人
DoubleFooker
ArrayBlockingQueue<E>
基于数组的有界队列。
基本的使用
/**
* 实现生产者、消费者模型
*/
public class ArrayBlockingQueueDemo {
// 队列容量10
static ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
// 生产者
private static void enq(String entry) {
arrayBlockingQueue.add(entry);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static {
init();
}
// 启动线程监听队列数据,消费者
private static void init() {
new Thread(() -> {
System.out.println("消费者启动!");
while (true) {
// try {
// // 如果队列为空,这个操作会阻塞
// String data = arrayBlockingQueue.take();
// System.out.println("take 消费数据:" + data);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// // 如果队列为空 返回null 不阻塞
// String polldata = arrayBlockingQueue.poll();
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("poll 消费数据:" + polldata);
// 队列为空 会报错
if (arrayBlockingQueue.size() > 0) {
String removeData = arrayBlockingQueue.remove();
System.out.println("remove 消费数据:" + removeData);
}
}
}).start();
}
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
// 如果队列满了,会一直阻塞
try {
arrayBlockingQueue.put("data" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
// // 可设置超时时间
// try {
// boolean add3 = arrayBlockingQueue.offer("data" + i, 10, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// // 如果队列满了,继续添加数据会报异常
// boolean add = arrayBlockingQueue.add("data" + i);
}
}
}
入队方法
- add:如果队列满了会抛异常,使用的offer入队
- put:队列满了会一直阻塞等待
- offer:直接返回入队结果,可以设置超时时间,超时直接丢弃。
出队方法
- take:队列为空会一直阻塞等待
- remove:队列为空会报NoSuchElementException,使用的poll
- poll:队列为空返回null,不阻塞
实现原理
ArrayBlockingQueue属性中包含ReentrantLock和两个Condition,新建队列时对他们进行初始化
// 入队出队加锁保证线程安全
final ReentrantLock lock;
// 空队列阻塞操作
private final Condition notEmpty;
// 满队列阻塞操作
private final Condition notFull;
//初始化
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
当调用take方法时,如果队列为空,使用notEmpty.await()
让线程挂起,等到有数据入队时,在enqueue
方法通过notEmpty.signal()
唤醒线程。
同样道理
当调用put方法时,如果队列满了,使用notFull.await()
让线程挂起,等到有数据出队时再notFull.signal()
唤醒线程。
而超时操作offer则使用notFull.awaitNanos()
实现。
LinkedBlockingQueue<E>
基于链表的有界队列。使用和ArrayBlockingQueue基本一致。
LinkedBlockingQueue包含两个锁
// 读锁
private final ReentrantLock takeLock = new ReentrantLock();
//写锁
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
其实现原理跟ArrayBlockingQueue基本一致。只是把入队和出队分开两把锁操作,增加了吞吐量。
区别
- 初始化大小不一样。ArrayBlockingQueue必须指定大小,LinkedBlockingQueue不指定默认Integer.MAX_VALUE
- 锁分离。ArrayBlockingQueue入队和出队都只有一把锁,LinkedBlockingQueue分开两把锁做控制。
- 插入删除性能不同,ArrayBlockingQueue使用数组直接进行插入删除,LinkedBlockingQueue使用链表结构,需要构建Node节点进行操作,性能相对有损耗。