2019-02-04 本文已影响0人
一 SynchronousQueue
- 不存储消息,线程生产消息后休眠等待其他线程消费,被消费后生产者线程才继续往后处理。
1.1 实例化
- 实例化,公平则是按先进先出排序消费,非公平则不确定顺序消费
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
1.2 入队
- 入队函数,实际都是调用
transferer.transfer(e, true, unit.toNanos(timeout))
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
throw new InterruptedException();
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
1.3 出队
- 出队函数,实际都是调用
transferer.transfer(null, true, unit.toNanos(timeout))
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
throw new InterruptedException();
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
public E poll() {
return transferer.transfer(null, true, 0);
1.4 TransferQueue
- 公平锁,按请求先后顺序进行处理
1.4.1 队列存储节点结构
- 单链表
static final class QNode {
volatile QNode next; // next node in queue
// 数据存储,null表示需要数据,非null表示传输数据。
// null->非null表示获取到数据。
// 非null->null表示数据传输完成。
// item=QNode,表示节点被cancel
volatile Object item;
volatile Thread waiter; // 休眠等待的线程
final boolean isData;//true 生产者,false 消费者
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
boolean isCancelled() {
return item == this;
boolean isOffList() {
return next == this;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
nextOffset = UNSAFE.objectFieldOffset
} catch (Exception e) {
throw new Error(e);
1.4.2 实例化
- 初始化链表头尾节点
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
1.4.3 基本操作
- 修改head节点
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
- 修改尾节点
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
- 修改cleanMe节点
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
1.4.4 入队/出队函数transfer()
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//e为null出队,e不为null入队
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 还未实例化
continue; // 自旋等待
if (h == t || t.isData == isData) { //队列为空,或队列节点模式相同
QNode tn = t.next;
if (t != tail) // 尾节点有并发修改,则重新获取
if (tn != null) { //tail不是真正的尾节点,则先更新tail节点再重新获取
advanceTail(t, tn);
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)//若并发导致插入失败,则不为null
s = new QNode(e, isData);//首次执行则初始化节点,
if (!t.casNext(null, s)) //插入队尾,若有并发修改,则重头开始重新处理
advanceTail(t, s); // 更新tail,不关心结果,入队之前会保证tail更新成真正的尾节点
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // 节点被cancel
clean(t, s);
return null;
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // 释放原head节点,更新s为head
if (x != null) // and forget fields
s.item = s;//cancel
s.waiter = null;//清空等待线程
return (x != null) ? (E)x : e;//返回传递的数据
} else { //模式不同,则节点消费
QNode m = h.next; // 从头节点开始
if (t != tail || m == null || h != head)
continue; // 有并发处理,则重头开始重新处理
Object x = m.item;
if (isData == (x != null) || // m已经被消费
x == m || // 节点被cancel
!m.casItem(x, e)) { // 有并发,消费失败
advanceHead(h, m); // m出队,处理下一节点
advanceHead(h, m); // successfully fulfilled
return (x != null) ? (E)x : e;//返回传输的数据
1.4.5 awaitFulfill()
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 非头节点不自旋。
// 头节点则自旋等待对应的生产者或消费者
// 配置超时时间的,maxTimedSpins = cpu核数小于2则不自旋,否则自旋32次
// 未配置超时时间的,自旋 maxTimedSpins * 16
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
Object x = s.item;//获取节点数据
if (x != e)
return x;
if (timed) {//指定超时时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//超时到达,cancel节点
if (spins > 0)
else if (s.waiter == null)
s.waiter = w;//配置等待线程
else if (!timed)//无超时时间,则一直休眠等待
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);//限时休眠,等待唤醒
1.4.6 clean()
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
QNode t = tail; // Ensure consistent read for tail
if (t == h)//队列已空,则返回
QNode tn = t.next;
if (t != tail)
if (tn != null) {//更新tail为真实的尾节点
advanceTail(t, tn);
if (s != t) { // 非尾节点,断开链表连接
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
// 否则放入cleanMe延迟处理删除
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;//待清理节点
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);//已清理完成
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
1.5 TransferStack
- 非公平消费,新请求不入队,直接先消费模式不一样的head节点。
1.5.1 transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 链表为空,或和head模式相同,则入队
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled()) //清理cancelled节点
casHead(h, h.next); // pop cancelled node
return null;//直接返回null
} else if (casHead(h, s = snode(s, e, h, mode))) {//插入头节点
SNode m = awaitFulfill(s, timed, nanos);//休眠等待目标节点。request等待data,data等待request类型
if (m == s) { // wait was cancelled
return null;
if ((h = head) != null && h.next == s)
casHead(h, s.next); // 修改next节点为头节点
return (E) ((mode == REQUEST) ? m.item : s.item);
} else if (!isFulfilling(h.mode)) { //模式不同,且head节点未被消费,则待插入节点尝试消费head
if (h.isCancelled()) //cancelled节点则清理
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//插入节点,尝试消费head
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // 链表为空,清理i
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
SNode mn = m.next;
if (m.tryMatch(s)) {//尝试消费,消费成功,删除两个节点
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 并发其他已消费,则只删除被消费的节点
s.casNext(m, mn); // help unlink
} else { // head已被消费,则删除head和消费head的next节点
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // 删除消费head的节点和head
else // lost match
h.casNext(m, mn); // 下一节点不是消费head的,则删除
二 LinkedTransferQueue
- 数据处理
private E xfer(E e, boolean haveData, int how, long nanos)
,所有入队,出队都调用这个函数 - e有值生产数据,haveData=true。e为null表示消费数据,haveData=false。
- how表示xfter的模式,有四种。
// for untimed poll, tryTransfer,节点成功消费或消费失败
private static final int NOW = 0;
// for offer, put, add。节点存储到队列中,可以后续处理
private static final int ASYNC = 1;
// for transfer, take。无不同模式则休眠或自旋等待。
private static final int SYNC = 2;
// for timed poll, tryTransfer。指定限时时间休眠等待,超时在处理失败。
private static final int TIMED = 3;
- xfer函数实现
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))//参数冲突错误
throw new NullPointerException();
Node s = null; // the node to append, if needed
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//(item != null) == isData 数据节点未消费
//item != p 数据节点未释放
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData)
break;// 和新节点模式一样,都是生产或消费数据
if (p.casItem(item, e)) { // 修改item值,消费节点,
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
// 节点数<=1,或q未被消费则不再继续处理
return LinkedTransferQueue.<E>cast(item);//返回数据
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);//节点入队列尾
if (pred == null)//有并发竞争失败,或有相反模式节点可消费
continue retry; // lost race vs opposite mode
if (how != ASYNC)//同步或限时则线程休眠等待
return awaitMatch(s, pred, e, (how == TIMED), nanos);
return e; // NOW或异步类型立即返回
- tryAppend
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {
if (casHead(null, s))//空链表
return s; // initialize
else if (p.cannotPrecede(haveData))
return null; // 模式相反且未被消费,返回null重新处理
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))//遍历直到尾节点,插入尾节点
p = p.next; // 插入失败则重新获取nexit节点,准备插入
else {//插入成功
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&//更新tail节点
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
return p;
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
- awaitMatch
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
if (item != e) { // 节点已被消费
// assert item != s;
s.forgetContents(); // 释放数据引用
return LinkedTransferQueue.<E>cast(item);//返回数据
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // 中断或超时则节点cancel
unsplice(pred, s);
return e;
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
else if (spins > 0) { // spin
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
else {