阻塞队列(二)(ArrayBlockingQueue )
2019-11-14 本文已影响0人
逗逼程序员
前言
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其大小在构造函数来决定,确认之后就不能改变了,其内部按照先进先出的原则对元素进行排序,其中put方法和take方法 为添加和删除的阻塞方法。下面通过一个生产者-消费者模式来了解其使用方式:
生产者-消费者模型
public class BlockQueueDemo {
private static final BlockingQueue<Person> QUEUE = new ArrayBlockingQueue<>(50);
public static void main(String[] args) {
new Thread(new Producer(QUEUE)).start();
new Thread(new Consumer(QUEUE)).start();
System.out.println("主线程结束-----------------");
}
}
class Person {
private String name;
private String cardno;
public Person(String name, String cardno) {
this.name = name;
this.cardno = cardno;
}
@Override
public String toString() {
return "name:" + this.name + ",cardno:" + this.cardno;
}
}
class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
//阻塞放入随机产生的对象Person
queue.put(new Person(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
System.out.println("Produce Person-------Queue Size : " + queue.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
//睡眠等待队列被生产者填充完毕
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
try {
//阻塞获取队列中的Person对象
System.out.println("Consumer Person :" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ArrayBlockingQueue的线程阻塞是通过重入锁ReenterLock和Condition条件队列实现的, 所以ArrayBlockingQueue 阻塞等待队列 存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列,而非公平锁,当队列可用时,阻塞的线程将进入争夺资源的竞争中,也就是谁先抢到谁先执行,没有固定的先后顺序,
ArrayBlockingQueue原理概要
ArrayBlockingQueue 的内部通过一个可重入锁ReentrantLock 和两个Condition 条件对象来实现阻塞,内部成员变量如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//底层存储数组
final Object[] items;
//获取数据索引
int takeIndex;
//添加数据索引
int putIndex;
//队列元素个数
int count;
//控制并发访问锁
final ReentrantLock lock;
//notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作
private final Condition notEmpty;
//notFull条件对象,用于通知put方法队列未满,可执行添加操作
private final Condition notFull;
//迭代器
transient Itrs itrs = null;
}
从成员变量上看,ArrayBlockingQueue 内部通过数组对象items来存储数据。通过ReentrantLock 来同时控制添加线程和移除线程的并发访问。而对于notEmpty 条件对象则是用于存放等待或唤醒take 方法的线程。告诉他们队列已有元素,可以执行获取元素。
下面分析一下 阻塞的添加方法:
public void put(E e) throws InterruptedException {
//判断新增元素不为null
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//元素个数等于数组长度无法添加
while (count == items.length)
//当前线程挂起,添加到等待队列等待唤起
notFull.await();
//直接进去队列
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
put 方法是一个阻塞方法,如果队列元素已满,那么当前线程会被notFull 条件对象挂起加到等待队列中,直到队列有空才会唤起添加操作。
1.png