大飞老师带你看线程(并发容器—PriorityBlockingQ
本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。
概述
按api上的解释,PriorityBlockingQueue 是有一个带有优先级级别的无界阻塞队列,不支持null元素入列,并且要求队列对象必须为可以比较对象。这点跟PriorityQueue类 类似,区别是PriorityBlockingQueue 带有阻塞功能。
PriorityBlockingQueue 出列具有优先级之分,每次出列返回优先级最高的元素。其底层通过是二叉树最小堆实现,这也导致了遍历队列时,获取到的元素是无序的。
![](https://img.haomeiwen.com/i11401799/451e97c78b5c62e2.png)
内部结构:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue; //存数据
private transient int size; //队列大小
private transient Comparator<? super E> comparator; //队列比较器
private final ReentrantLock lock; //线程安全锁
private final Condition notEmpty; //锁条件
...
}
基本使用
需求:学号越大,优先级越高
public class Student implements Comparable<Student>{
private int num; //学号
private String name;
public Student(int num, String name){
this.num = num;
this.name = name;
}
public int compareTo(Student o) {
return o.num - this.num;
}
public String toString() {
return "Student{num=" + num +", name='" + name +"}";
}
}
public class App {
public static void main(String[] args) throws InterruptedException {
//初始化队列,容量为3
PriorityBlockingQueue<Student> queue = new PriorityBlockingQueue<Student>(3);
queue.offer(new Student(1, "zhangsan"));
queue.offer(new Student(4, "zhaoliu"));
queue.offer(new Student(3, "wangwu"));
queue.offer(new Student(2, "lisi")); //超过3,自动拓展
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take()); //超过容量,阻塞
}
}
上面代码可以大体看出PriorityBlockingQueue的特性,阻塞,无界,带优先级的队列。
源码解析
构造器
//空参数,使用默认的初始容量: 11
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//单参数, 指定初始容量
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
//2参数数, 指定初始容量与额外的比较器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
入列-offer
PriorityBlockingQueue 是一个无界的队列,所以随便插入不需要进行边界限制,一直返回true
public boolean offer(E e) {
//不允许null入列
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
int n, cap; //操作临时变量
Object[] array;
//判断当前队列元素是否超过队列容量,超过扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); //尝试扩容,可能会失败
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//无比较器使用自然排序
siftUpComparable(n, e, array); //根据优先级插入数据
else
//使用指定的比较器
siftUpUsingComparator(n, e, array, cmp);//根据优先级插入数据
size = n + 1;
notEmpty.signal(); //队列非空,唤醒出列阻塞线程
} finally {
lock.unlock(); //释放锁
}
//入列成功返回true
return true;
}
入列一个核心点队列扩容: tryGrow:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //出于性能考虑先释放锁
Object[] newArray = null; //扩容后的数组
//allocationSpinLock 自旋锁变量,默认为0,需要经过原子cas判断之后才会改值此处控制单线程进入if语句
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//扩容大小,<64, +2, 如果大于64, + 50%, 封顶为int最大值-8
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
//扩容出新数组,此处需要queue==array, 原因:已有线程扩容成功不必再扩容
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//线程不满足allocationSpinLock 自旋锁变量的if判断,表示扩容失败,让出cpu
if (newArray == null)
Thread.yield();
//数据拷贝,不允许同时出列入列,需要获取锁
lock.lock();
//queue==array 如果已经扩容, 此处扩容放弃
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
总结一下上代码, tryGrow的最终的目的是进行扩容,先释放锁是出于性能考虑,比较扩容操作需要消耗一定时间,而期间无法进行入列出列,那队列的并发性就大大打折扣了。但是,如果释放锁之后,那么扩容的安全就需要另辟蹊径了。
所以tryGrow方法采用了cas原子操作方式实现:
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
.....
}
allocationSpinLock 变量最初为0, UNSAFE.compareAndSwapInt方法比较与置换allocationSpinLock =1, 因为compareAndSwapInt原子操作性,导致同一时刻只有一个线程进入执行if语句块。这样也可以达到加锁目的。
无法进入if语句块的线程,则执行Thread.yield();语句,让cpu, 别占着茅坑不拉屎。
if (newArray == null)
Thread.yield();
当幸运线程执行if语句块之后,新队列空间已经开辟好了, 接下来就是数据拷贝,这里就又得注意的,为防止数据拷贝出乱子,又得争夺锁,保证队列数据安全,所有需要重新加锁。
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
入列另外一个核心点,数据存储:siftUpComparable / siftUpUsingComparator
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
PriorityBlockingQueue使用的是二叉树最小堆的方式进行存储数据。
要想理解上面代码, 先得明白二叉树最小堆操作概念:
1>二叉树最小堆:是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于其左子节点和右子节点的值。
2>使用数组来实现二叉堆最小堆,如果把最新入列元素下标设置 i,那么该节点的父节点是i/2。
3>如果父节点的值大于子节点子需要进行交互。
有这个概念之后,我们来看 siftUpComparable代码,进入siftUpComparable方法,先算出父parent的索引,再判断父节点跟入列数据比对,如果父节点数据小于入列数据跳过,寻找上一个父节点。再重复刚刚判断即可,如果父节点数据大于入列数据,则交换。
出列-take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁
E result;
try {
//出列失败,等待
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
出列核心方法dequeue:
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; //出列剔除数组第一个,也是最小堆的树根
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//树根出列之后,需要向上调整堆结构,弄出新的最小堆。
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
总结
PriorityBlockingQueue是一个无界阻塞队列,出队的元素是优先级最高的元素,而优先级的规则可以自己指定,如果没指定默认使用自然趴下规则。
PriorityBlockingQueue内部通过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,当前元素个数>=最大容量时候会通过算法(小于64+2 大于64 + 50%)扩容。
想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html