2.BlockingQueue综合分析
BlockingQueue
BlockingQueue
是一个线程安全的阻塞队列,一般是FIFO(先进先出),是各种具体队列实现的接口,适用于Java编程中生产者-消费者场景,可以构造有界队列及无界队列。提供了添加元素的接口方法add(), put(), offer()
,获取元素方法take(), poll()
,具体的队列实现,我们看下文。
BlockingQueue
最合适的使用场景就是为线程间执行提供同步手段,作为线程执行数据的存放容器。
其实这里的无界队列容量也会默认填充一个最大值
Integer.MAX_VALUE
,下文案例中会有代码体现
1、ArrayBlockingQueue
ArrayBlockingQueue
内部是利用数组实现的线程安全的有界阻塞队列,FIFO(先进先出)模式添加删除元素。内部通过ReentrantLock
和两个Condition(notEmpty、notFull)
实现线程安全。但这里ReentrantLock有公平与非公平之分,保证等待的线程获取锁的一个顺序是否是竞争还是按序进行,这里很好理解。至于ReentrantLock在锁一章节中会详细介绍。
两个Condition(notEmpty、notFull)
分别维护等待take(获取元素)和等待put(插入元素)的线程队列,内部也是单链表实现结构,此内容细节也会在Condition一节会有详细介绍,此处做个简单的了解。
两个Condition之间相互唤醒:
final ReentrantLock lock; // 全局锁
private final Condition notEmpty; // 等待获取元素的线程队列
private final Condition notFull; // 等待插入元素的线程队列
- 队列为空时,notEmpty等待,一旦插入元素了,唤醒notEmpty中线程
- 队列满时,notFull等待,一旦取出元素了,唤醒notFull中线程
![](https://img.haomeiwen.com/i12071549/de34feddc8c35e74.jpg)
图中takeIndex与putIndex分别向后移动,保证先进先出。当putIndex移动到index=Array.length长度时,重新指向index=0,因为通过count能够知道前面已经被消费了,可以重新插入数据了。这个时候,takeIndex指向的并不是index=0的位置,因此也需要等到takeIndex重新回指导index=0的时候才能被消费,保证了FIFO。
现在分别看两段底层插入代码片段就一目了然了(其他的出队与入队操作最终都是调用的这里,当然peek除外)。
/** 入队操作,此处会进行加锁及等待以保证线程安全 */
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 若队列已满,阻塞等待
while (count == items.length)
notFull.await();
// 调用最终的入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
/** 入队操作,此处会进行加锁及等待以保证线程安全 */
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 若队列为空,阻塞等待
while (count == 0)
notEmpty.await();
// 调用最终的出队操作
return dequeue();
} finally {
lock.unlock();
}
}
/** 入队操作,私有方法 */
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; // 将元素插入对应的putIndex位置
// 如果此时下一个putIndex==队列长度,则重新回指导队列首
// 此处如何保证队列首元素一定被出队了呢,结合分析上面出发方法put里有一个 while (count == items.length)的逻辑判断保证了这一点
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 唤醒notEmpty队列中的线程
}
/** 出队操作,私有方法 */
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 如果此时下一个putIndex==队列长度,则重新回指导队列首,此处也就是先消费了整个队列的元素才能回指,因此也保证了FIFO
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 唤醒notEmpty队列中的线程
return x;
}
因为继承了Collection,所以ArrayBlockingQueue也实现了自己的迭代器Itrs
2、PriorityBlockingQueue
PriorityBlockingQueue
是一个利用数组(二叉堆)实现的线程安全的无界优先级阻塞队列,这里有两个个核心的概念:二叉堆、优先级
;
不管三七二十一,先看一段代码执行的结果
@Test
public void testOne(){
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue(50, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1-o2;
}
});
queue.add(50);
queue.add(10);
queue.add(3);
queue.add(7);
queue.add(50);
queue.add(49);
queue.add(23);
queue.add(12);
queue.add(19);
queue.add(70);
queue.add(199);
queue.add(19);
for(Object i : queue.toArray()){
System.out.print(i + ", ");
}
}
# 输出:
3, 7, 10, 12, 50, 19, 23, 50, 19, 70, 199, 49
从以上测试及结果分析:
构造队列时指定了容量为50,优先比较规则:o1与o2大小;初步看上去输入无序,输出也无序。
其实这里输出正是满足了二叉堆
特性。那下面先大致介绍一下二叉堆,这有利于后面分析PriorityBlockingQueue
。
二叉堆
二叉堆本质是完全二叉树的一种,只不过它保持着“父节点要么大于子节点 或者 父节点要么小于子节点”的特性,对于二叉堆任何层级的节点都满足该特性,即全局如此。
那么依然看下上面的输出结果:
3, 7, 10, 12, 50, 19, 23, 50, 19, 70, 199, 49
该结构在PriorityBlockingQueue中就是一个数组,用数组描叙就是:
queue[0] = 3, queue[1] = 7, queue[2] = 10, queue[3] = 12, queue[4] = 50, queue[5] = 19, queue[6] = 23, queue[7] = 50, queue[8] = 19, queue[9] = 70, queue[10] = 199, queue[11] = 49
那么跟二叉堆有啥关系呢,其实PriorityBlockingQueue就是利用数组存储的二叉堆,我们继续下面操作
将其直接构造成二叉堆结构就如下(傻瓜式平铺就好了):
![](https://img.haomeiwen.com/i12071549/200d3e8a0c3d4ad1.jpg)
这里就能很自然的看出来该二叉树满足“父节点要么小于子节点”特性,(当然如果将上叙程序改变下比较规则:O2-O1,那么构造出来的二叉堆就会是满足“父节点要么大于子节点”另一个规则了)。
其实上面说的数组存储二叉堆,从上面二叉树结构已经找出他们的关系了,那就是:二叉堆节点在数组中的位置k(k是节点在数组中的下标), 对应的子节点在数组中的位置对应是 2k + 1 和 2k + 2
其实只要是有一定规则的二叉树,当插入一个元素的时候,为了保持规则,都需要进行一定的调整,因为二叉堆无非满足两种规则,因此也就涉及两种调整:向上调整 与 向下调整,这里对应到程序就是heapify()方法里的siftDownComparable和siftDownUsingComparator
了。
/** 构造二叉堆 */
private void heapify() {
Object[] array = queue;
int n = size;
// n指数组中的下标, >>> 或 <<< 表示数据在进行无符号的左右移动, 每移动一位就等于整除2
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--)
// 从最后一颗树开始, 将二叉树的最小值放置在parent位置, 直到全局都如此
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
二叉堆简单介绍这么多,已经不影响后面PriorityBlockingQueue相关内容了。
回到PriorityBlockingQueue
上面把PriorityBlockingQueue核心结构二叉堆介绍了,这里直接从PriorityBlockingQueue的出队、入队方法进行一步步分析讲解
1、入队
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock; // 获取锁,保证安全
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length)) // 判断是否需要扩容
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator; // 获取比较器,若无自定义,取默认
if (cmp == null)
siftUpComparable(n, e, array); // 向上调整
else
siftUpUsingComparator(n, e, array, cmp); // 利用自定义比较器向上调整
size = n + 1;
notEmpty.signal(); // condition唤醒队列中等待的其他线程
} finally {
lock.unlock();
}
return true;
}
这里简要步骤如下:
- 1、获取全局锁,以保障原子操作
- 2、入队前,判断是否需要扩容
- 3、根据比较器进行元素的向上调整,此处就是对二叉堆进行调整
- 4、对condition队列中等待的线程进行唤醒
2、扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 先释放锁
Object[] newArray = null;
// CAS方式获取乐观锁(此处释放全局锁,获取乐观锁,主要是为了保证扩容操作的性能)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
try {
// 小于64,+2,否则 >>1
int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
// 若扩容后超过最大配置,则退一步
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
// 做个校验保证
if (minCap < 0 || minCap > MAX_ARRAY_SIZE){
throw new OutOfMemoryError();
}
// 直接设置最大值
newCap = MAX_ARRAY_SIZE;
}
// 若此时数组还未发生变化(因为其他线程也可能在操作queue),则直接新建newArray
if (newCap > oldCap && queue == array){
newArray = new Object[newCap];
}
} finally {
allocationSpinLock = 0; // 释放乐观锁
}
}
// 如果newArray为空,说明其他线程操作了queue,而这也必然触发扩容,因此让出cpu
// 其实这里有点不好理解,如果newArray==null,说明上面(newCap > oldCap && queue == array)条件不成立,即代表有其他线程操作了queue,而这里其他线程操作queue时候,必然也会遇到当前线程一样的需要扩容操作,因此就直接让出cpu即可。
if (newArray == null)
Thread.yield();
// 否则重新获取锁,进行扩容后的数据copy
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
其实扩容操作最核心的逻辑就是释放全局锁,获取乐观锁,在判断queue是否有其他线程在操作,只有这些都完成之后,在最后才去竞争全局锁,去自己扩容并拷贝原来的数据。
3、siftUpComparable向上调整
比较当前位置K插入值X,与其parant大小,如果过k<parent,则进行调整; 直到 K>=parent为止;
siftDownComparable逻辑相反,此处就不再介绍。
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0) // 若X>parent,则停止,否则进行siftUp处理
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
4、出队
public E poll() {
final ReentrantLock lock = this.lock; // 获取锁
lock.lock();
try {
return dequeue(); // 出队,获取队列首元素
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; // 获取并返回第一个元素
E x = (E) array[n]; // 获取最后一个元素
array[n] = null;
Comparator<? super E> cmp = comparator;
// 将获取的最后一个元素放到首位置, 然后进行siftDown堆化操作
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
其实上面注释已经很清楚了,这里只需要理解两个核心步骤即可:
- 取出队列首元素
- 将最后一个元素放置到首元素位置后,进行二叉堆的堆化操作(这里是向下堆化,即直到 K<=parent为止)。
其他remove、take等操作都类似,主要记住一点就是:只要队列元素有变化,就必须对二叉堆进行相应的堆化操作,以保证结构。
5、场景
很明显支持优先队列的场景,即入队元素满足一定的优先级,按此优先级保证消费顺序。
3、DelayQueue
看名字DelayQueue
就知道是延迟队列,但是它内部不是直接通过数组或者链表,而是直接用了PriorityQueue作为自身存储结构,而PriorityQueue采用的是数组结构存储,因此DelayQueue内部也是采用数组存储。通过这么简单的解释就能清楚它有两个特性:延迟+优先级
那么理解<u>延迟+优先级</u>的在其中的作用就比较重要了:
-
延迟:
延迟消费的意思,即从队列中取出元素需要校验<u>延迟时间</u>,满足了才返回。 -
优先级:
按照一定的比较逻辑(此处一般是延迟时间)进行顺序存放,其实这里采用的是二叉堆结构,不理解二叉堆的可以在PriorityBlockingQueue一节中了解。这样才能保证每次取出的元素必然是最靠近延迟时间到达的元素,才能保证消费按照延迟序列进行。
话不多说,线上demo(大部分时候一个简单的demo就能胜过千言万语,无言胜有言嘛)
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<Item> delayQueue = new DelayQueue<>();
delayQueue.add(new Item("a", 2L, System.currentTimeMillis()));
delayQueue.add(new Item("b", 1L, System.currentTimeMillis()));
delayQueue.add(new Item("c", 3L, System.currentTimeMillis()));
delayQueue.add(new Item("d", 6L, System.currentTimeMillis()));
delayQueue.add(new Item("e", 1L, System.currentTimeMillis()));
delayQueue.add(new Item("f", 4L, System.currentTimeMillis()));
delayQueue.add(new Item("g", 10L, System.currentTimeMillis()));
delayQueue.add(new Item("h", 9L, System.currentTimeMillis()));
print(delayQueue);
new Thread(()-> consume(delayQueue),"消费线程1").start();
new Thread(()-> consume(delayQueue),"消费线程2").start();
new Thread(()-> consume(delayQueue),"消费线程3").start();
}
private static void print(DelayQueue<Item> delayQueue) {
Iterator<Item> it = delayQueue.iterator();
System.out.print("二叉堆数组结构: ");
while(it.hasNext()){
System.out.print( it.next().key + ",");
}
}
private static void consume(DelayQueue<Item> delayQueue) {
while (delayQueue.size() > 0) {
try {
System.out.println(Thread.currentThread().getName() + ": " + delayQueue.take().getKey());
} catch (InterruptedException e) {
}
}
}
@AllArgsConstructor
@Data
static class Item implements Delayed{
private String key;
private long exprieTime;
private long beginTime;
@Override
public long getDelay(TimeUnit unit) {
return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - beginTime);
}
@Override
public int compareTo(Delayed o) {
return this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS) ? 1 : 0;
}
}
}
看输出:
二叉堆数组结构: b,e,c,d,a,f,g,h
消费线程1: b
消费线程2: e
消费线程2: a
消费线程1: c
消费线程3: f
消费线程2: d
消费线程1: h
消费线程3: g
哈,看数组结构,就很清楚已经按照了优先级进行处理了,并且这里每次出队一个元素就要进行相应的堆化操作
那么接下来分析代码:
1、核心属性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
理解了上面几个属性的作用,基本上也就理解了<u>DelayQueue</u>的一个设计了
- ReentrantLock lock: 重入锁,保证操作安全
- PriorityQueue<E> q:优先级队列,内部用数组存储延迟队列元素
- Thread leader:指向的是第一个从队列获取元素阻塞等待的线程,目的是为了减少其他区线程不必要的等待时间
- Condition available:唤醒条件,这里入队会唤醒,出队更换leader线程时也换唤醒,具体看后面代码。
2、入队操作
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 元素入队,其实这里是PriorityQueue的入队操作,因此还会涉及到堆化操作,类似思路见PriorityBlockingQueue
q.offer(e);
// 若此时队列中首元素就是刚插入的,则将等待锁指向null,唤醒所有等待的线程。
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
注释简单明了,不多做解释。
3、出队操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 如果队列为空,阻塞等待
if (first == null)
available.await();
// 否则需要校验取出的元素是否满足延迟
else {
long delay = first.getDelay(NANOSECONDS);
// 若延迟满足条件,直接取出返回
if (delay <= 0)
return q.poll();
// 否则置为空,这里也是防止内存溢出吧,
// 因为多个线程会同时保持对first的引用,从而导致无法回收。
first = null;
// 如果leader不为空,当前线程就直接阻塞掉(毕竟leader才能获取元素)
if (leader != null)
available.await();
else {
// 如果leader为空,那就拿当前线程作为leader,然后等待delay时长
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 一般在这里会释放掉leader的
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader为空并且队列不为空的情况下,通知其他的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
相比入队操作要复杂多了,因为这里涉及到delay的一个判断跟当前等待线程的一个置换。其实这里逻辑很简单,只要保证只有leader线程能获取元素就好了,同时如果是自己获取完,释放leader即可。
4、LinkedTransferQueue
0、概要(异性等待)
LinkedTransferQueue
就像一对异性(读写)朋友,当没有异性在队列中等待自己的话,自己就入队等待其他异性,否则直接匹配牵手成功,不用等待了,解放彼此。
话不多说,还是先来演示个小的demo,从demo入手
1、消费跟生产者同时存在
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();
new Thread(()->{
try {
while (true){
System.out.println(Thread.currentThread().getName() + "准备出队...");
Thread.sleep(1000);
Integer a = q.take();
System.out.println(Thread.currentThread().getName() + "出队完成,值为:" + a);
}
} catch (Exception e) {
e.printStackTrace();
}
},"线程B").start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "准备入队...");
Thread.sleep(2000);
q.transfer(1);
System.out.println(Thread.currentThread().getName() + ": 1入队完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程A").start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "准备入队...");
Thread.sleep(2000);
q.transfer(2);
System.out.println(Thread.currentThread().getName() + ": 2入队完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程C").start();
结果
线程B准备出队...
线程A准备入队...
线程C准备入队...
线程A: 1入队完成
线程B出队完成,值为:1
线程B准备出队...
线程C: 2入队完成
线程B出队完成,值为:2
线程B准备出队...
2、只有生产者
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "准备入队...");
Thread.sleep(2000);
q.transfer(1);
System.out.println(Thread.currentThread().getName() + ": 1入队完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程A").start();
结果
线程A准备入队...
3、只有消费者
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();
new Thread(()->{
try {
while (true){
System.out.println(Thread.currentThread().getName() + "准备出队...");
Thread.sleep(1000);
Integer a = q.take();
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + "出队完成,值为:" + a);
}
} catch (Exception e) {
e.printStackTrace();
}
},"线程B").start();
结果
线程B准备出队...
从上面结果可以看出
-
1、当生产者跟消费者都同时存在的情况下,入队出队都能正常进行,只不过要出队一次才能入队,入队一次才能出队
-
2、当只具备一方(生产者或者消费者)时,都不能进行下去,只能等待,哪怕入队也是如此。
以上功能就是LinkedTransferQueue
具备SynchronousQueue
的功能,当然LinkedTransferQueue
还具LinkedBlockingQueues
功能(这一点这里就不做详细演示)。
LinkedTransferQueue
在这点上是一个阻塞模式,跟SynchronousQueue
类似,不存元素;
当生产者入队时,如果发现没有消费者在等待(源码上就是队列的最后一个Node模式是生产模式,那就创建一个新的Node入队,当前生产线程绑定到该Node进行等待);此时如果消费者线程进入,就会直接判断当前获取的node是否是含有数据的(isData==true),如果是,直接取出即可,若不是,走下面的逻辑。
消费者到队列中出队元素时,如果发现队列是空,则会生成一个null节点,然后阻塞等待生产者,后面如果生产者入队时发现有一个消费者元素节点,直接将元素填充到该节点Node上,唤醒该节点的消费者线程,被唤醒的消费者线程直接出队即可。
下面直接那代码分析一下(先重点介绍一下特性):
- transfer(E e):入队操作,若当前存在一个正在等待获取的消费者线程,立刻填充其node节点返回;否则,会生成node结构插入到队列尾部,进入阻塞状态,一直到消费者线程取走该元素
- tryTransfer(E e):入队操作,transfer的尝试操作,存在等待的消费者线程则转交之,否则返回不入队
- tryTransfer(E e, long timeout, TimeUnit unit):在tryTransferr(E e)增加一个等待时间,若timeout时间内都没有消费者线程取数据,则返回,不入队
- hasWaitingConsumer():判断是否存在消费者线程
- getWaitingConsumerCount():获取所有等待获取元素的消费线程数量
![](https://img.haomeiwen.com/i12071549/2f9f1429db140f5a.jpg)
4、源码分析
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) { // 核心逻辑,入队操作
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // 构造一个node节点
retry:
for (;;) { // 循环匹配处理
for (Node h = head, p = h; p != null;) { // 头节点处理:find & match first node
boolean isData = p.isData; // 头节点处理
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
// 这里是关键,如果两者模式一样,则不能匹配,为啥?
// 因为生产者跟消费者操作都需要入队,即队列中存在的是对应的操作动作,如果isData为true,代表当前队列的头节点是一个生产者节点,存在相应的数据,那么消费者线程可以直接匹配获取;如果isData为false,代表当前队列的头节点是消费者节点,那么生产者线程可以直接交付数据;(一句话概括就是:如果队列中正在等我的是另一个线程,就可以直接转移了),其他情况只能入队等待啦。
if (isData == haveData) // can't match
break;
// 这里再尝试匹配一次
if (p.casItem(item, e)) { // match
// 循环处理,这里多个线程在竞争头节点哦,所以cas了下
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// // 唤醒等待的线程
LockSupport.unpark(p.waiter);
// 返回匹配的元素
return LinkedTransferQueue.<E>cast(item);
}
}
// 一次循环没有成功,或者说当前head已经被其他线程匹配掉了,那重新来过
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 终究没有成功,只能入队了(当前头节点类型跟自己一致)
// 如果不是立即返回,那就看异同步了
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
// 尝试入队
Node pred = tryAppend(s, haveData);
if (pred == null)
// 如果当前节点的上一个节点没了,就再尝试去匹配
continue retry; // lost race vs opposite mode
// 同步==阻塞等待,相当入队了,你给我等着吧
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
上面注释应该很清楚,入队出队都是这个方法,其中isData是模式判断,用于是否匹配操作;how是执行方式,是同步、异步等;nanos是超时时间啦。简单总结如下
(1)入队或者出队,先查看队列头的节点模式(判断isData是否跟自己不是一样,不一样就能匹配了);
(2)如果模式不一样,就尝试让他们匹配(这其中会有其他线程也在操作,可能头节点被其他线程匹配走了,所以会持续往下,就是代码中的循环操作,知道队列尾部)
(3)如果模式一样,那就代表没有异性在等待自己了,或者到链表尾了,尝试入队等待吧;
(4)入队时也可能其他线程在操作哦,所以如果链表尾节点被修改了,那就再重新尝试入队,继续循环;
(5)入队成功了,就说明真没希望了,只能等了,等其他线程唤醒自己吧;‘
(6)唤醒了就说明有机会匹配了,仅仅是有机会哈,继续循环匹配吧。
NOW:立即返回,不管有没有匹配到,不做入队操作
如:poll()、tryTransfer(e)
ASYNC:异步操作,入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
如:add(e)、offer(e)、put(e)、offer(e, timeout, unit)
SYNC:同步操作,元素入队后当前线程阻塞,一直等待对方到来被匹配到
如:take()、transfer(e)
TIMED:超时机制,入队会等待一段时间被匹配(等待对方的到来哦),超时还没匹配到就返回
如:poll(timeout, unit)、tryTransfer(e, timeout, unit)
5、LinkedBlockingDeque
LinkedBlockingDeque
是Double Ended Queue的缩写,基于链表实现的线程安全的双端阻塞队列,即可以从头部或者尾部去插入或者获取节点数据。默认构造长度为Integer.MAX_VALUE,当然长度可以在构造时指定。
其内部维护了一个全局独占锁,和两个Condition对象,用来阻塞和唤醒线程,以保证线程安全。
// 独占锁
final ReentrantLock lock = new ReentrantLock();
// 非空Condition
private final Condition notEmpty = lock.newCondition();
// 非满Condition
private final Condition notFull = lock.newCondition();
![](https://img.haomeiwen.com/i12071549/f398b5954e1e2272.jpg)
相应操作支持三种策略:
- block 如果无法立即返回元素,则阻塞直到队列可以获取
- block with timeout 如果无法立即返回元素,则超时阻塞一定时长,等待获取
- special value 如果无法立即返回元素,则返回true、false、null等
- throw exception 如果无法立即返回元素,则抛出指定异常
block | block with timeout | special value | throw exception |
---|---|---|---|
putXxx | offerXxx(TimeUint) | offerXxx(O) | addXxx |
takeXxx | pollXxx(TimeUint) | pollXxx() | getXxx |
peekXxx() | removeXxx |
集中看一下代码
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>{
/** 节点 */
static final class Node<E> {
/** 节点内容 */
E item;
/** 上一个节点 */
Node<E> prev;
/** 下一个节点 */
Node<E> next;
}
/** 队列首节点 */
transient Node<E> first;
/** 队列尾节点 */
transient Node<E> last;
/** 队列节点数=队列长度,对外通过size()方法获取 */
private transient int count;
/** 队列最大容量 */
private final int capacity;
/** 主锁 */
final ReentrantLock lock = new ReentrantLock();
/** 非空条件 */
private final Condition notEmpty = lock.newCondition();
/** 非满条件 */
private final Condition notFull = lock.newCondition();、
/**
* 头部插入一个节点(内容),如果满了返回false
* 私有方法,入队操作方法之一
* 操作方向:队列头部
* 唤醒:notEmpty
*/
private boolean linkFirst(Node<E> node)
/**
* 尾部插入一个节点(内容),如果满了返回false
* 私有方法,入队操作方法之一
* 操作方向:队列尾部
* 唤醒:notEmpty
*/
private boolean linkLast(Node<E> node)
/**
* 头部插入一个节点(内容),如果空了返回false
* 私有方法,出队操作方法之一
* 操作方向:队列头部
* 唤醒:notFull
*/
private E unlinkFirst()
/**
* 尾部移除一个节点(内容),如果空了返回false
* 私有方法,出队操作方法之一
* 操作方向:队列尾部
* 唤醒:notFull
*/
private E unlinkLast()
/**
* 移除指定节点(内容)
* 唤醒:notFull
*/
void unlink(Node<E> x)
/**
* 头部插入一个节点(内容)
* 操作方向:队列头部
* 策略:不成功抛出异常
*/
public void addFirst(E e)
/**
* 尾部插入一个节点(内容)
* 操作方向:队列尾部
* 策略:不成功抛出异常
*/
public void addLast(E e)
/**
* 头部插入一个节点(内容)
* 操作方向:队列头部
* 策略:返回true或者false
*/
public boolean offerFirst(E e)
/**
* 尾部插入一个节点(内容)
* 操作方向:队列尾部
* 策略:返回true或者false
*/
public boolean offerLast(E e)
/**
* 头部插入一个节点(内容)
* 操作方向:队列头部
* 策略:阻塞等待
*/
public void putFirst(E e) throws InterruptedException
/**
* 尾部插入一个节点(内容)
* 操作方向:队列尾部
* 策略:阻塞等待
*/
public void putLast(E e) throws InterruptedException
/**
* 头部插入一个节点(内容)
* 操作方向:队列头部
* 策略:阻塞超时等待
*/
public boolean offerFirst(E e, long timeout, TimeUnit unit)throws InterruptedException
/**
* 尾部插入一个节点(内容)
* 操作方向:队列尾部
* 策略:阻塞超时等待
*/
public boolean offerLast(E e, long timeout, TimeUnit unit)throws InterruptedException
/**
* 头部移除一个节点(内容)
* 操作方向:队列头部
* 策略:失败抛出异常
*/
public E removeFirst()
/**
* 尾部移除一个节点(内容)
* 操作方向:队列尾部
* 策略:失败抛出异常
*/
public E removeLast()
/**
* 头部移除一个节点(内容)
* 操作方向:队列头部
* 策略:返回true或者false
*/
public E pollFirst()
/**
* 尾部移除一个节点(内容)
* 操作方向:队列尾部
* 策略:返回true或者false
*/
public E pollLast()
/**
* 头部移除一个节点(内容)
* 操作方向:队列头部
* 策略:阻塞等待
*/
public E takeFirst() throws InterruptedException
/**
* 尾部移除一个节点(内容)
* 操作方向:队列尾部
* 策略:阻塞等待
*/
public E takeLast() throws InterruptedException
/**
* 头部移除一个节点(内容)
* 操作方向:队列头部
* 策略:阻塞超时等待
*/
public E pollFirst(long timeout, TimeUnit unit)throws InterruptedException
/**
* 尾部移除一个节点(内容)
* 操作方向:队列尾部
* 策略:阻塞超时等待
*/
public E pollLast(long timeout, TimeUnit unit)throws InterruptedException
/**
* 获取首节点(内容),非移除
* 操作方向:队列头部
* 策略:失败抛出异常
*/
public E getFirst()
/**
* 获取尾节点(内容),非移除
* 操作方向:队列尾部
* 策略:失败抛出异常
*/
public E getLast()
/**
* 获取首节点(内容),非移除
* 操作方向:队列头部
* 策略:失败返回null
*/
public E peekFirst()
/**
* 获取尾节点(内容),非移除
* 操作方向:队列尾部
* 策略:失败返回null
*/
public E peekLast()
/**
* 从头开始遍历,移除指定节点
* 策略:返回true或者false
*/
public boolean removeFirstOccurrence(Object o)
/**
* 从尾开始遍历,移除指定节点
* 策略:返回true或者false
*/
public boolean removeLastOccurrence(Object o)
/**
* 默认首部添加
* 策略:失败异常
*/
public void push(E e)
/**
* 默认首部移除
* 策略:失败异常
*/
public E pop()
/** ========================================================== */
/** 剩下的方法基本都是从父类继承,与其他结构类似,此处不做详细介绍了,如: */
/**
* 继承至AbstractQueue,默认尾部添加
* 策略:失败异常
*/
public boolean add(E e)
/**
* 继承至Queue,默认尾部添加
* 策略:返回true或者false
*/
public boolean offer(E e)
/** ========================================================== */
/** 拆分迭代器 */
static final class LBDSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final LinkedBlockingDeque<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est; // size estimate
public long estimateSize() { return est; }
/** 此处拆分以步进的方式进行,每次多1,直到整个queue元素全部拆完,下面以代码演示比较直观 */
public Spliterator<E> trySplit()
public void forEachRemaining(Consumer<? super E> action)
public boolean tryAdvance(Consumer<? super E> action)
public int characteristics()
}
}
分割器测试效果
@Test
public void testSplit() throws InterruptedException {
BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1000);
Spliterator s = deque.spliterator();
for(int i =0; i<100; i++){
deque.put(i);
}
while(true){
System.out.println();
s.trySplit().forEachRemaining(k -> System.out.print(k+","));
}
}
## 输出
0,
1,2,
3,4,5,
6,7,8,9,
10,11,12,13,14,
15,16,17,18,19,20,
21,22,23,24,25,26,27,
28,29,30,31,32,33,34,35,
36,37,38,39,40,41,42,43,44,
45,46,47,48,49,50,51,52,53,54,
55,56,57,58,59,60,61,62,63,64,65,
66,67,68,69,70,71,72,73,74,75,76,77,
78,79,80,81,82,83,84,85,86,87,88,89,90,
91,92,93,94,95,96,97,98,99,
6、LinkedBlockingQueue
<u>LinkedBlockingQueue</u>是FIFO队列,基于链表实现的线程安全的先进先出阻塞队列,即只可以从头部获取元素,尾部插入元素。默认构造长度为Integer.MAX_VALUE,当然长度可以在构造时指定。
明白了上面介绍的LinkedBlockingDeque,理解LinkedBlockingQueue是非常简单的,其实这里LinkedBlockingDeque功能覆盖了LinkedBlockingDeque功能,其putLast(E e) 和takeFirst()就可以模拟其FIFO效果。
当然主要差异是LinkedBlockingDeque是双向链表,而LinkedBlockingQueue是单向链表。
![](https://img.haomeiwen.com/i12071549/d10ad1ff51c37df1.jpg)
其内部维护了一个全局独占锁,和两个Condition对象,用来阻塞和唤醒线程,以保证线程安全。
/** 取元素锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 非空条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 存元素锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 非满条件 */
private final Condition notFull = putLock.newCondition();