java并发编程(6):BlockingQueue相关源码详解
BlockingQueue的类继承结构如下,其主要实现类有:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue。
BlockingQueue的类继承结.png1、ArrayBlockingQueue
ArrayBlockingQueue是由数组组成的有界阻塞队列。队列中元素按照FIFO的方式进行排序。默认情况下线程以非公平的方式获取队列中的数据。
1.1、主要属性
//队列元素的容器,以数组作为队列的实现
final Object[] items;
//下一个获取(take、poll、peek、remove)元素数据的数组索引
int takeIndex;
//下一个存储(put、offer、add)元素的数组索引
int putIndex;
//队列中元素的个数
int count;
//锁
final ReentrantLock lock;
//队列不为空的信号量,即当插入数据到队列的时候,
//会通过notEmpty信号量通知其他等待获取队列数据的线程
private final Condition notEmpty;
//队列不满的信号量,即当线程从队列中取出数据后,
//会通过notFull信号量通知其他线程可以继续向队列中存储元素
private final Condition notFull;
1.2、构造函数
//初始化队列的容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//初始队列的容量,及是否为公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//初始化队列容量,是否为公平锁、及初始队列中的元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
//获取锁
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
//将初始化的元素放入队列中
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
1.3、入队操作
//offer:非阻塞入队操作
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列中元素的数量已经达到队列容量的上线,则直接返回false,否则将元素enqueue()
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//put:阻塞入队操作
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若队列空间达到容量上线,则调用notFull.await()等待其他线程取走只是一个队列中的数据
while (count == items.length)
notFull.await();
//当队列不满的时候,直接将数据入队
enqueue(e);
} finally {
lock.unlock();
}
}
//offer:超时时间内入队数据
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若队列已满
while (count == items.length) {
//无需等待超时,则直接返回false
if (nanos <= 0)
return false;
//等待超时时间nanos后再检查队列是否还是满的
nanos = notFull.awaitNanos(nanos);
}
//若队列不满则做入队操作
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//将数据插入队尾
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知其他等待获取数据的线程
notEmpty.signal();
}
1.4、出队操作
//poll:非阻塞获取队列中的数据
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列为空,则返回null;否则返回队头数据
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//take:阻塞获取队列中数据
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空,则在notEmpty信号量上等待其他线程将数据放入队列
while (count == 0)
notEmpty.await();
//获取队列中数据
return dequeue();
} finally {
lock.unlock();
}
}
//poll:在超时时间内获取数据
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空,如无需等待,则直接返回null;否则等待nanos超时时间
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//直接获取队头的数据,但不改变takeIndex值,即可重复peek()获取队头的数据
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
//获取队头数据
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知其他在notFull上等待的线程
notFull.signal();
return x;
}
1.5、移除数据
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//移除的是队头的数据?则直接移除队头数据,并修改takeIndex值
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
//将队列中的数据前移
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
//队尾数据
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
2、LinkedBlockingQueue
LinkedBlockingQueue是基于单向链表的阻塞队列。
2.1、基本属性
//链表中的节点
static class Node<E> {
E item;
//后继节点
Node<E> next;
Node(E x) { item = x; }
}
//队列的容量
private final int capacity;
//当前队列中元素的个数
private final AtomicInteger count = new AtomicInteger();
//队列的头结点
transient Node<E> head;
//队列的尾节点
private transient Node<E> last;
//出队(take,poll等)的锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队等待的信号量
private final Condition notEmpty = takeLock.newCondition();
//入队锁
private final ReentrantLock putLock = new ReentrantLock();
//入队等待的信号量
private final Condition notFull = putLock.newCondition();
2.2、入队方法
//阻塞入队数据
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//当前队列已满?则在notFull信号量上等待,直到有其他线程从队列中获取元素
while (count.get() == capacity) {
notFull.await();
}
//当队列未满时,将元素入队,并增加队列元素计数count
enqueue(node);
c = count.getAndIncrement();
//若此时队列还未满,则在notFull中发信号通知下个在notFull等待的线程进行数据的入队操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//当队列已满时,在notEmpty发信号,通知其他等待此信号的线程
if (c == 0)
signalNotEmpty();
}
//超时时间内入队
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列已满则且无需等待,则直接返回false,否则在notFull上等待超时
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//数据入队及元素计数
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
//非阻塞入队操作,入队成功返回true,否则为false,不进行阻塞操作
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
2.3、出队方法
//阻塞等待出队
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//超时等待出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//非阻塞等待出队
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3、PriorityBlockingQueue
PriorityBlockingQueue为有优先级的阻塞队列实现。
3.1、基本属性
//默认初始化的队列数组容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//数组最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//队列数组,实际存储的是树型的数据,不过用数组来存储树
//实际结构为二叉树,而树的父节点一定比子节点大,但左右子节点没有大小的要求
private transient Object[] queue;
//队列中元素的个数
private transient int size;
//元素优先级比较器
private transient Comparator<? super E> comparator;
//对外操作的锁
private final ReentrantLock lock;
//notEmpty等待队列
private final Condition notEmpty;
//用户空间分配的自旋锁
private transient volatile int allocationSpinLock;
//用于序列化和反序列化的队列
private PriorityQueue<E> q;
3.2、入队操作
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//容量已满,需要扩容?
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//将数据插入树中
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
//将元素向数的上层移动
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
//获取父节点下标
int parent = (k - 1) >>> 1;
Object e = array[parent];
//若当前数据比父节点大,则无需上移,否则将当前节点上移
if (key.compareTo((T) e) >= 0)
break;
//将父节点下移
array[k] = e;
k = parent;
}
array[k] = key;
}
3.3、出队操作
//阻塞出队操作,在此只分析take,其他如poll,peek类似
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
//取出独立头部位置,即去除树中的根节点数据
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//调整树结构
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
4、SynchronousQueue
SynchronousQueue为无缓冲的阻塞队列,其主要用来在两个线程之间交换数据。因其数据时无缓冲的。即当生产者生产数据的速度大于消费者消费数据的速度,会导致生产者线程长时间阻塞,导致服务不可用。
4.1、Transferer接口
SynchronousQueue中通过Transferer对数据进行交换。SynchronousQueue的出队及入队方法基本都是调用Transferer#transfer()方法进行处理的。而Transferer实现有两种,一个是公平交换的,一个是非公平交换的。
abstract static class Transferer<E> {
//e:交换的数据,当为null时,表示要获取数据;当不为null时,表示插入数据
//timed:true:运行超时;false:不允许超时
//nanos:超时时间
abstract E transfer(E e, boolean timed, long nanos);
}
4.2、TransferStack交换堆
TransferStack是非公平的交换堆,其不是某个消费者或生产者等待的时间越久就越先获取交换权。
TransferStack主要实现源码:
static final class TransferStack<E> extends Transferer<E> {
//模式,表示当前操作是消费数据还是生产数据
//REQUEST:需要消费数据
//DATA:需要生产数据
//FULFILLING:表示一对生产者和消费者已经匹配上了
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
//判断节点状态是否为:FULFILLING
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
//TransferStacks的节点数据结构
static final class SNode {
//堆中下个节点
volatile SNode next; // next node in stack
//和当前节点匹配的节点
volatile SNode match; // the node matched to this
//当前在节点等待的线程
volatile Thread waiter; // to control park/unpark
//节点数据:null表示为消费者节点;否则为生产者节点
Object item; // data; or null for REQUESTs
//当前节点模式:REQUEST或DATA或FULFILLING
int mode;
SNode(Object item) {
this.item = item;
}
//cas设置打当前节点的next值
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//尝试将当前节点与s节点配对
boolean tryMatch(SNode s) {
//cas设置当前节点的math节点
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
//唤醒与当前节点配对的math节点线程
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
}
//交换堆的头结点
volatile SNode head;
//cas设置头结点
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
//数据交换处理,循环以下三个处理,直到数据交换成功或超时
//1、若头结点为空或头结点的模式与当前节点一样,即都是生产者或消费者,
//则直接将当前节点插入堆的头结点,并等待其他和当前节点能匹配的节点,
//匹配上就返回匹配的节点,若节点取消则返回null
//2、若头结点的模式与当前节点模式互相补足,即能配对,即一个生产者一个消费者,
//则向堆中头部插入当前节点,并设置当前节点状态为fulfilling,设置匹配节点的math节点,
//然后同时弹出当前节点与其math的节点,还会唤醒与其math的节点。
//3、若堆的头结点已经与其他节点配对,
//则进行配对处理并将配对的两个节点弹出
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
//头结点为空或头节点与当前节点模式一致
if (h == null || h.mode == mode) { // empty or same-mode
//若不需要超时,则直接将当前节点放入堆的头节点,或直接返回null
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
//需要超时,则将节点插入头部并等待超时
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
//匹配成功,将两个节点弹出
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//头结点未匹配上?
} else if (!isFulfilling(h.mode)) { // try to fulfill
//头结点已取消?则更新头节点为下个节点
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
//更新头结点为当前节点,并设置当前节点模式为已配对模式
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
//节点匹配
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//在超时时间内等待匹配上节点
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//等待的截止时间点
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋一定次数后线程阻塞自己等待配对线程唤醒
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
4.3、TransferQueue交换堆
TransferQueue是公平的交换堆,其等待越久的会先对其进行匹配操作,从而达到公平交换。
TransferQueue主要实现源码:
static final class TransferQueue<E> extends Transferer<E> {
//交换器中节点数据结构
static final class QNode {
//下个节点
volatile QNode next; // next node in queue
//节点数据
volatile Object item; // CAS'ed to or from null
//在节点等待的线程
volatile Thread waiter; // to control park/unpark
//当前节点是否为数据节点
final boolean isData;
}
//头节点
transient volatile QNode head;
//尾节点
transient volatile QNode tail;
//已经取消的节点引用,可能这个节点还未从队列中移除
transient volatile QNode cleanMe;
//cas将nh设置为新的头节点
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
//cas将nt设置为新的尾节点
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
//数据交换,处理流程同TransferStack大同小异
//1、
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
}
5、LinkedTransferQueue
LinkedTransferQueue是基于单链表实现的阻塞队列,节点数据入队时可以有几种模式,如AYNC、SYNC、TIMED、NOW等。不同模式应用于不同的场景。LinkedTransferQueue相较于LinkedBlockingQueue,可以直接将数据交给等待的节点,而无需入队,性能更高;LinkedTransferQueue相较于SynchronousQueue则多了存储元素的缓冲队列,可实现非阻塞的数据入队操作,避免了SynchronousQueue因消费者慢于生产者而导致的生产者阻塞问题。
5.1、主要属性
//cpu数是否大于1
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
//在阻塞等待之前自旋的次数
private static final int FRONT_SPINS = 1 << 7;
//节点被其他节点自旋操作的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
//sweepVotes的阀值
static final int SWEEP_THRESHOLD = 32;
//头结点
transient volatile Node head;
//尾节点
private transient volatile Node tail;
//累计到一定次数再清除无效node
private transient volatile int sweepVotes;
//节点状态
//实时模式,立即进行数据交换
private static final int NOW = 0; // for untimed poll, tryTransfer
//异步模式,若数据交换不成功,则将数据入队
private static final int ASYNC = 1; // for offer, put, add
//同步模式,会一直等到交换成功
private static final int SYNC = 2; // for transfer, take
//超时模式,等待一定的超时时间后才失败
private static final int TIMED = 3; // for timed poll, tryTransfer
//等待的节点数据结构
static final class Node {
//是否为数据节点
final boolean isData; // false if this is a request node
//节点数据
volatile Object item; // initially non-null if isData; CASed to match
//下个等待的节点
volatile Node next;
//在当前节点等待的线程
volatile Thread waiter; // null until waiting
//cas设置下个节点
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//cas设置节点数据
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
5.2、数据交换操作
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//p节点未被匹配到?
if (item != p && (item != null) == isData) { // unmatched
//节点模式不一样,无法匹配,直接跳出循环
if (isData == haveData) // can't match
break;
//模式不一样,则cas尝试匹配
if (p.casItem(item, e)) { // match
//匹配成功,则cas将下个节点从队列中移除
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
//无下个节点了?
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
//唤醒配对的节点
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//模式为非立即匹配?
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
//尝试入队
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
//同步操作?则等待匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
//将节点入队,放入队尾
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
//队列为空,直接入队
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
//当前节点与队尾几点模式不同,无法入队?
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
//非队尾?
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
//cas插入队尾
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
//将数据添加到队尾
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
//等待数据节点匹配,直到超时
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e,说明它被匹配到了
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
// 如果当前线程中断了,或者有超时的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
// 如果自旋次数小于0,就计算自旋次数
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
//自旋处理
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
//需要超时等待,则阻塞当前线程等待
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
5.3、入队及出队操作
入队出队(put、offer、add,take、poll等)操作,基本都是调用xfer进行数据交换操作,只是不同操作模式不同。
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}