DelayQueue源码学习总结
为了加深印象,边看源码边写总结吧,我写的不好,如果你万一搜到这篇,可以马上关闭了。去看其他的文章哈。
1 DelayQueue的类信息及内部属性:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();
1.通过类信息我们发现,DelayQueue继承了AbstractQueue和实现了BlockingQueue,也就是说具备阻塞队列的所有属性。
2.类定义上家了泛型<E extends Delayed>,也就说他只接收实现Delayed接口的元素。
3.lock 锁应用的是ReentrantLock,可重入锁,它所有的入队出队等等操作用的都是用这个锁。
4.PriorityQueue<E> q:优先队列,用这个队列存储元素,优先队列就是基于最小二叉堆的实现,底层存储方式是数组,可以看看PriorityQueue的源码,注意这句,children of queue[n] are queue[2n+1] and queue[2(n+1)].意思是数组n上的左子树和右子树的位置分别是2*n+1和2*n+2,也就是这个树是通过数组实现的。你要写堆排序或者top k的的时候,可以借鉴他的源码。
想了解PriorityQueue,请参考PriorityQueue介绍,想了解堆排序,请参考堆排序
好了,用优先队列的目的我们就知道了,就是想用他的每次get出来的都是最小元素的功能白。
5.Thread leader 这个在注释里面着重介绍了是Leader-Follower模式,其实目的就是解决线程争用的问题,这个后面分析具体代码的时候再说吧。
6.Condition available:大家应该都知道,就是等待某个条件被唤醒白,ReentrantLock+Condition的用法如果不明白,建议找个源码解读,对着源码一起理解理解,这里放个经典的图:
2 代码解读
2.1 offer方法
反正不管是add、put、offer(E e, long timeout, TimeUnit unit)调用的都是offer(e)这个方法。就看这个就行了,下面贴下源代码:
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //调用PriorityQueue的offer方法
if (q.peek() == e) {
leader = null;
available.signal(); //唤醒condition上的线程
}
return true;
} finally {
lock.unlock();
}
}
--------下面是PriorityQueue的代码---------------
/**
* Inserts the specified element into this priority queue.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be
* compared with elements currently in this priority queue
* according to the priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1); //扩容
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e); //调整二叉小顶堆结构
return true;
}
/**
* Inserts item x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root.
*
* To simplify and speed up coercions and comparisons. the
* Comparable and Comparator versions are separated into different
* methods that are otherwise identical. (Similarly for siftDown.)
*
* @param k the position to fill
* @param x the item to insert
*/
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x); //如果设置了comparator 就用设置的比较方法
else
siftUpComparable(k, x); //如果没有设置就x定义的comparator 方法
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; //为啥这么一个技术公式就能计算出来父节点
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
@SuppressWarnings("unchecked")
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
代码很简单,就是先要抢锁(抢不到会怎么办?需要你去看ReentrantLock源码哦)执行,然后调用PriorityQueue的offer方法去调整堆结构了。
好了,我们分析下这个调整方法siftUp,非常有意思,
1.1 最小堆首先是一个二叉树,父节点的值要小于他的左子节点和右子节点,用数组表示,假设节点在数组的下标是n,那它的左右子节点再数组的位置分别为2n+1和2n+2。
1.2 每次增加元素时,都是放到数组的最后一个位置,那这个位置肯定是某个节点左子节点或者右子节点。
1.3 现在我们已知子节点的位置了,我们要找出来父节点的位置,完了比较下大小,调整堆结构,把每个值都放到他合理的位置上去。
完了咱们看代码,咦,这代码怎么看也不像是把元素放到最后一个位置,完了交换啊。反正我比较笨,看了好久才看懂,尤其是这句: int parent = (k - 1) >>> 1; 大家知道>>>相当于无符号右移,相当于除2,k传入的是数组当前的长度size,就是要插入元素的位置,可是这个位置既有可能是左子节点,又有可能是右子节点,怎么可能这一个计算公式就算出来他的父节点呢?其实这个跟int型的除法机制有关系,我们知道,子节点的位置要不是在偶数节点上,要不再奇数几点上,举个例子:假设节点位置是3,那他的左子节点就是2*3+1=7,右子节点是2*3+2=8,现在开始根据代码里面的算法反推,(7-1)/2=3,(8-1)/2=3.5,注意我们是int型的,那后面这个得到的结果也应该是3.也就说通过这一个公式就能获得父节点了,哈哈哈。
剩下的代码就简单了,就是比较当前父节点和子节点的大小,把要插入的元素调整到合适的位置上去。
2.2 take方法
取出的看take方法吧,其他的poll、poll(long timeout, TimeUnit unit)实现逻辑上都差不多,贴源码:
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可中断的锁
try {
for (;;) {
E first = q.peek(); //取root
if (first == null)
available.await(); //没有当前线程wait
else {
long delay = first.getDelay(NANOSECONDS); //取得时间差值 ---getDelay是插入的对象实现的方法
if (delay <= 0)
return q.poll(); //符合条件 从优先队列里面取出root节点 返回 代码在下面贴出来了
first = null; // don't retain ref while waiting
if (leader != null) //当前线程不是leader,wait住,保证只有一个线程在搞事情
available.await();
else { //队列中数据既不为空 root节点还不符合条件 leader ==null
Thread thisThread = Thread.currentThread();
leader = thisThread; //设置当前线程为leader
try {
available.awaitNanos(delay); //当前线程休眠指定的时间
} finally { //return前执行的方法
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); //当前线程处理结束 并且队列的数据不空 唤醒condition上等待的一个线程
lock.unlock();
}
}
--------下面是PriorityQueue的代码---------------
@SuppressWarnings("unchecked")
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0]; //取root节点
E x = (E) queue[s]; //取最后一个子节点
queue[s] = null;
if (s != 0)
siftDown(0, x); //重新排序
return result;
}
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x); //如果设置了comparator 就用设置的比较方法
else
siftDownComparable(k, x); //如果没有设置就x定义的comparator 方法
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; //计算非叶子节点数 // loop while a non-leaf
while (k < half) { //k=0 开始的循环
int child = (k << 1) + 1; // assume left child is least 找到他的左子树
Object c = queue[child];
int right = child + 1; //找到右子树
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) //左子树比右子树大
c = queue[child = right]; //定位到右子树
if (key.compareTo((E) c) <= 0) //要找到位置的值跟左右子树中小的那个比较,如果小 就退出
break;
queue[k] = c;//把小的那个节点向上层移动
k = child;//移动的位置空了哈,作为下一次循环的起点
}
queue[k] = key;
}
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
我把源码统一注释了一遍,总体来说,就是你要先抢到锁,才能有机会去取最小堆的root,没有满足条件数据的话,就要看你是不是leader了,如果是,你就休眠有限的时间,再去被唤醒取得资源,如果你不是leader,不好意思,你只能进入condition队列,等待被唤醒了。
那什么情况下,非leader线程会抢到锁呢?这个还是需要你先理解 await()方法,在condition的await方法里面,是先要释放锁,完了才通过LockSupport.park方法去wait住当前线程。既然释放了锁,那其他线程都有机会抢锁了。也就是说,正是由于await的机制,我们会造成线程的争用,所以作者搞了个Leader-Follower机制来避免过多的线程频繁抢锁争用资源。
对于PriorityQueue的siftDown,注释里面也注释了一遍,我在看的过程中恍惚记起了大学是老师讲二叉树的时候的各种简便的计算方式,比如size/2就是非叶子节点数。。。可以看我引用的那篇文章,自己画一下二叉树和数组,对着代码自己琢磨琢磨相信你会有收获的。
1 总结一下
1.1 用了ReentrantLock和Condition,所以你要先了解这两个东西,代码才好分析。并发包的的东西都是一环套一环的啊,下面有个图,顺着这个图撸撸代码,会有不小的收获。
image.png
1.2 用了PriorityQueue,优先队列,二叉小顶堆,堆顶是最小的数据,通过这个队列,我们对入队的元素进行排序,root元素出队后重排序。
1.3 入队的对象必须是实现Delay接口,也就是说必须实现long getDelay(TimeUnit unit);int compareTo(T o);两个方法,即延时方法和比较方法需要子类实现。
一句话总结:延时队列就是基于优先队列、ReentrantLock、Condition实现延时效果的队列。==等于没说