JUC源码分析-集合篇(八):SynchronousQueue
SynchronousQueue 是一个同步阻塞队列,它的每个插入操作都要等待其他线程相应的移除操作,反之亦然。SynchronousQueue 像是生产者和消费者的会合通道,它比较适合“切换”或“传递”这种场景:一个线程必须同步等待另外一个线程把相关信息/时间/任务传递给它。在之后的线程池源码分析中我们也会见到它,所以理解本章对我们之后的线程池讲解也会有很大帮助。
概述
SynchronousQueue(后面称SQ)内部没有容量,所以不能通过
peek
方法获取头部元素;也不能单独插入元素,可以简单理解为它的插入和移除是“一对”对称的操作。为了兼容 Collection 的某些操作(例如contains
),SQ 扮演了一个空集合的角色。
SQ 的一个典型应用场景是在线程池中,Executors.newCachedThreadPool() 就使用了它,这个构造使线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
SQ 为等待过程中的生产者或消费者线程提供可选的公平策略(默认非公平模式)。非公平模式通过栈(LIFO)实现,公平模式通过队列(FIFO)实现。使用的数据结构是双重队列(Dual queue)和双重栈(Dual stack)(后面详细讲解)。FIFO通常用于支持更高的吞吐量,LIFO则支持更高的线程局部存储(TLS)。
SQ 的阻塞算法可以归结为以下几点:
- 使用了双重队列(Dual queue)和双重栈(Dual stack)存储数据,队列中的每个节点都可以是一个生产者或是消费者。(有关双重队列,请参考笔者另外一篇文章:JUC源码分析-集合篇(四):LinkedTransferQueue,双重栈与它原理一致)
- 已取消节点引用指向自身,避免垃圾保留和内存损耗
- 通过自旋和 LockSupport 的 park/unpark 实现阻塞,在高争用环境下,自旋可以显著提高吞吐量。
数据结构
SynchronousQueue 继承关系SQ 有三个内部类:
-
Transferer:内部抽象类,只有一个
transfer
方法。SQ的put
和take
被统一为一个方法(就是这个transfer
方法),因为在双重队列/栈数据结构中,put
和take
操作是对称的,所以几乎所有代码都可以合并。 -
TransferStack:继承了内部抽象类 Transferer,实现了
transfer
方法,用于非公平模式下的队列操作,数据按照LIFO的顺序。内部通过单向链表 SNode 实现的双重栈。 -
TransferQueue:继承了内部抽象类 Transferer,实现了
transfer
方法,用于公平模式下的队列操作,数据按照FIFO的顺序。内部通过单向链表 QNode 实现的双重队列。
SNode & QNode
SNode 是双重栈的实现,内部除了基础的链表指针和数据外,还维护了一个int
型变量mode
,它是实现双重栈的关键字段,有三个取值:0代表消费者节点(take),1代表生产者节点(put),2 | mode
(mode为当前操作者模式:put or take)代表节点已被匹配。此外还有一个match
引用,用于匹配时标识匹配的节点,节点取消等待后match
引用指向自身。
QNode 是双重队列的实现,通过isData
实现双重队列。这个在JUC源码分析-集合篇(四):LinkedTransferQueue 一篇中有讲解,在此就不再赘述。
源码解析
SQ 的 put/take 操作完全是由transfer
方法实现,以put
方法为例,
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
可以看到调用了内部变量 transferer 的transfer
的方法。其它例如offer、take、poll
都与之类似,所以接下来我们主要针对transfer
方法,来分析 SQ 公平模式和非公平模式的不同实现。
TransferStack.transfer()
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//根据所传元素判断为生产or消费
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())//head已经被匹配,修改head继续循环
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//构建新的节点s,放到栈顶
//等待s节点被匹配,返回s.match节点m
SNode m = awaitFulfill(s, timed, nanos);
//s.match==s(等待被取消)
if (m == s) { // wait was cancelled
clean(s);//清除s节点
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { //head节点还没有被匹配,尝试匹配 try to fulfill
if (h.isCancelled()) // already cancelled
//head已经被匹配,修改head继续循环
casHead(h, h.next); // pop and retry
//构建新节点,放到栈顶
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
//cas成功后s的match节点就是s.next,即m
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {//尝试匹配,唤醒m节点的线程
casHead(s, mn); //弹出匹配成功的两个节点,替换head pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); //匹配失败,删除m节点,重新循环 help unlink
}
}
} else { //头节点正在匹配 help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {//帮助头节点匹配
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
说明:基本算法是循环尝试以下三种行为之一:
-
如果栈为空或者已经包含了一个相同的 mode,此时分两种情况:如果是非计时操作(
offer、poll
)或者已经超时,直接返回null;其他情况下就把当前节点压进栈顶等待匹配(通过awaitFulfill
方法),匹配成功后返回匹配节点的 item,如果节点取消等待就调用clean
方法(后面单独讲解)清除取消等待的节点,并返回 null。 -
如果栈顶节点(
head
)还没有被匹配(通过isFulfilling
方法判断),则把当前节点压入栈顶,并尝试与head
节点进行匹配,匹配成功后从栈中弹出这两个节点,并返回匹配节点的数据。isFulfilling
源码如下:
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
- 如果栈顶节点(
head
)已经持有另外一个数据节点,说明栈顶节点正在匹配,则帮助此节点进行匹配操作,然后继续从第一步开始循环。
TransferStack. awaitFulfill()
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//计算截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//计算自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())//当前线程被中断
//取消对给定节点s的匹配节点的等待
s.tryCancel();
SNode m = s.match;//获取给定节点s的match节点
if (m != null)//已经匹配到,返回匹配节点
return m;
if (timed) {
//超时处理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();//超时,取消s节点的匹配,match指向自身
continue;
}
}
if (spins > 0)
//spins-1
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
//设置给定节点s的waiter为当前线程
s.waiter = w; // establish waiter so can park next iter
else if (!timed)//没有设定超时,直接阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)//阻塞指定超时时间
LockSupport.parkNanos(this, nanos);
}
}
说明:如果当前操作是一个不计时操作,或者是一个还未到超时时间的操作,就构建新的节点压入栈顶。然后调用此方法自旋/阻塞等待给定节点s
被匹配。
当调用此方法时,所传参数节点s
一定是在栈顶,节点真正阻塞前会先自旋,以防生产者和消费者到达的时间点非常接近时也被 park。
当节点/线程需要阻塞时,首先设置waiter
字段为当前线程,然后在真正阻塞之前重新检查一下waiter
的状态,因为在线程竞争中,需要确认waiter
没有被其他线程占用。
从主循环返回的检查顺序可以反映出中断优先于正常返回。除了不计时操作(poll/offer
)不会检查中断,而是直接在transfer
方法中入栈等待匹配。
TransferQueue.transfer()
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//判断put or take
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { //尾节点滞后,更新尾节点 lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
//为当前操作构造新节点,并放到队尾
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
//推进tail
advanceTail(t, s); // swing tail and wait
//等待匹配,并返回匹配节点的item,如果取消等待则返回该节点s
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s); //等待被取消,清除s节点
return null;
}
if (!s.isOffList()) { // s节点尚未出列 not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;//item指向自身
s.waiter = null;
}
return (x != null) ? (E)x : e;
//take
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || //m.item=m, m cancelled
!m.casItem(x, e)) { // 匹配,CAS修改item为给定元素e lost CAS
advanceHead(h, m); // 推进head,继续向后查找 dequeue and retry
continue;
}
advanceHead(h, m); //匹配成功,head出列 successfully fulfilled
LockSupport.unpark(m.waiter); //唤醒被匹配节点m的线程
return (x != null) ? (E)x : e;
}
}
}
说明:基本算法是循环尝试以下两个动作中的其中一个:
-
若队列为空或者队列中的尾节点(
tail
)和自己的模式相同,则把当前节点添加到队列尾,调用awaitFulfill
等待节点被匹配。匹配成功后返回匹配节点的 item,如果等待节点被中断或等待超时返回null
。在此期间会不断检查tail
节点,如果tail
节点被其他线程修改,则向后推进tail
继续循环尝试。
注:TransferQueue 的awaitFulfill
方法与TransferStack.awaitFulfill
算法一致,后面就不再讲解了。 -
如果当前操作模式与尾节点(
tail
)不同,说明可以进行匹配,则从队列头节点head
开始向后查找一个互补节点进行匹配,尝试通过CAS
修改互补节点的item
字段为给定元素e
,匹配成功后向后推进head
,并唤醒被匹配节点的waiter
线程,最后返回匹配节点的item
。
栈/队列节点清除的对比(clean方法)
在队列和栈中进行清理的方式不同:
对于队列来说,如果节点被取消,我们几乎总是可以以 O1 的时间复杂度移除节点。但是如果节点在队尾,它必须等待后面节点的取消。
对于栈来说,我们可能需要 O(n) 的时间复杂度去遍历整个栈,然后确定节点可被移除,但这可以与访问栈的其他线程并行运行。
下面我们来看一下 TransferStack 和 TransferQueue 对节点清除方法的优化:
TransferStack.clean(SNode s)
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
//找到有效head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
//移除head到past中已取消节点的链接
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
说明:在最坏的情况下可能需要遍历整个栈来解除给定节点s
的链接(例如给定节点在栈底)。在并发情况下,如果有其他线程已经移除给定节点s
,当前线程可能无法看到,但是我们可以使用这样一种算法:
使用s.next
作为past
节点,如果past
节点已经取消,则使用past.next
节点,然后依次解除从head
到past
中已取消节点的链接。在这里不会做更深的检查,因为为了找到失效节点而进行两次遍历是不值得的。
TransferQueue.clean(QNode pred, QNode s)
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
//找到有效head节点
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)//队列为空,直接返回
return;
QNode tn = t.next;
if (t != tail)//tail节点被其他线程修改,重新循环
continue;
//找到tail节点
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))//cas解除s的链接
return;
}
//s是队列尾节点,此时无法删除s,只能去清除cleanMe节点
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))//原cleanMe为空,标记pred为cleanMe,延迟清除s节点
return; // Postpone cleaning s
}
}
说明:方法参数中s
为已经取消的节点,pred
为s
的前继节点。
任何时候在队列中都存在一个不能删除的节点,也就是最后被插入的那个节点(tail
节点)。为了满足这一点,在 TransferQueue 中维护了一个cleanMe
节点引用。当给定s
节点为tail
节点时,首先删除cleanMe
节点引用;然后保存s
的前继节点作为cleanMe
节点,在下次清除操作时再清除节点。这样保证了在s
节点和cleanMe
节点中至少有一个是可以删除的。
小结
本章重点:理解 SynchronousQueue 中双重栈和双重队列的实现;理解 SynchronousQueue 的阻塞算法。