Java 多线程(九):ArrayBlockingQueue 与
2018-07-11 本文已影响16人
聪明的奇瑞
什么是阻塞队列?
- 阻塞队列与我们平常接触到的普通队列(ArrayList)的最大不同点在于阻塞队列的添加和删除方法都是阻塞的
- 阻塞添加:当阻塞队列元素已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行元素加入操作
- 阻塞删除:当队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作
BlockingQueue
- BlockingQueue 能够解决多线程中如何高效安全的传输数据的问题,帮助我们快速搭建高质量的多线程程序
- 通过队列,可以使得数据由队列的一端输入,从另一端输出
- 适用场景:生产者线程在一端生成,消费者线程在另一端消费
BlockingQueue 主要方法
-
插入方法:
- add(E e):
- 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
- 成功返回 true,失败抛 IllegalStateException 异常
- offer(E e,long timeout,TimeUnit unit):
- 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
- 可设置超时时间,该方法可以中断
- 成功返回 true,如果队列已满,返回 false
- put(E e):
- 将元素插入到队尾,如果队列已满则一直等待(阻塞)
- add(E e):
-
删除方法:
- remove(Object o):
- 移除指定元素,成功返回 true,失败返回 false
- poll(long timeout, TimeUnit unit):
- 获取并移除此队列的头元素,在指定等待的时间前一直等到获取元素
- take():
- 获取并移除队列头元素,若没有元素则一直阻塞
- remove(Object o):
-
检查方法:
- element():
- 获取但不移除队列的头元素,没有元素则抛异常
- peek():
- 获取但不移除队列的头,若队列为空则返回 null
- element():
BlockingQueue 基本使用
- 该例子中使用 ArrayBlockingQueue,生产者(Producer)将字符串插入共享队列中,消费者将它们取出
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
- 生产者通过 put() 方法将元素插入共享队列中
public class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
}catch (Exception e){
e.printStackTrace();
}
}
}
- 消费者通过 take() 方法从队列中取出元素,take() 方法会阻塞直到获取元素为止
public class Consumer implements Runnable {
private BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}catch (Exception e){
e.printStackTrace();
}
}
}
BlockingQueue 接口实现类与源码分析
ArrayBlockingQueue
- 分析:
- 基于数组的阻塞队列实现,内部维护了一个数组用于缓存队列中的数据对象
- 有两个 Integer 类型的索引,指向添加、获取下一个元素的位置的索引
- 并通过一个重入锁 ReentrantLock 和两个 Condition 条件来实现阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并非访问的锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
private final Condition notEmpty;
/**notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
private final Condition notFull;
/**
迭代器
*/
transient Itrs itrs = null;
}
image
- 分析:
- add 方法实际上是调用了 offer 方法
- enqueue(E x) 方法内部通过 putIndex 索引直接将元素添加到数组 item 中,当 putIndex 索引大小等于数组长度时,需要将 putIndex 重新设置为 0,这是因为当前队列元素总是从队头获取,从队尾添加
//add方法实现,间接调用了offer(e)
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//offer方法
public boolean offer(E e) {
checkNotNull(e);//检查元素是否为null
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
if (count == items.length)//判断队列是否满
return false;
else {
enqueue(e);//添加元素到队列
return true;
}
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
//获取当前数组
final Object[] items = this.items;
//通过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操作。
notEmpty.signal();
}
SJziX.md.png
- 分析:
- put 方法是一个阻塞方法,如果队列元素已满,那么当前线程将会被 notFull 条件对象挂起加入到等待队列中,直到有空挡才会唤醒执行添加操作
//put方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//如果队列没有满直接添加。。
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
- 基于链表的阻塞队列,内部维护着一个链表构成的缓冲队列,用于缓存队列中的数据对象
- 在正常情况下链表阻塞队列的吞吐量要高于数组的阻塞队列(ArrayBlockingQueue),因为其内部实现添加和删除操作使用了两个 ReentrantLock 来控制并发执行(插入、获取各有一个锁),而 ArrayBlockingQueue 内部只使用一个 ReentrantLock 控制并发
- 它与 ArrayBlockingQueue 的 API 几乎一致但内部实现原理不太相同
- 当创建一个 LinkedBlockingQueue 时,默认阻塞队列中元素的数量大小为 Interger.MAX_VALUE
LinkedBlockingQueue 和 ArrayBlockingQueue 的区别
- 队列大小有所不同:ArrayBlockingQueue 必须指定队列大小,而 LinkedBlockingQueue 默认为 Integer.MAX_VALUE(当元素添加速度大于移除速度时,需要注意一下,以免内存溢出)
- 实现结构不同:ArrayBlockingQueue 采用数组实现、而 LinkedBlockingQueue 采用链表实现
- 由于 ArrayBlockingQueue 采用数组存储队列元素,因此再插入、删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 每次插入都会生成一个新的结点(Node)对象,这会影响日后 GC 垃圾回收
- ArrayBlockingQueue 中添加、删除操作只使用一个锁(ReentrantLock),而 LinkedBlockingQueue 添加、删除操作各使用一个锁,因此 LinkedBlockingQueue 的并发吞吐量大于 ArrayBlockingQueue