非阻塞同步队列ConcurrentLinkedQueue
关于ConcurrentLinkedQueue网上一些文章的说法上笔者认为存在误区,这些文章上把ConcurrentLinkedQueue归类为无锁队列,然后ArrayBlockingQueue、LinkedBlockingQueue之类的归为有锁队列,但其实ArrayBlockingQueue内部的锁用的是ReentrantLock,底层也是CAS,所以只用无锁和有锁来区分这者,笔者认为不太准确,容易产生误会。
无界的、非阻塞的、线程安全的同步链表队列
ConcurrentLinkedQueue本质是个由Node组成的链表,所以是无界的。
提供出来的方法都是非阻塞方法(不阻塞当前线程),所以是个非阻塞队列。无界链表非阻塞队列。
与ArrayBlockingQueue
、LinkedBlockingQueue
这些阻塞队列的区别在于ConcurrentLinkedQueue
内部没有使用ReentrantLock
之类的轻量级锁(CAS + LockSupport)来实现队列的操作的线程安全,而是直接使用不断循环CAS
操作的方式来保证入队和出队的线程安全。所以笔者认为,两者区别从功能上来说,一个提供阻塞、一个提供非阻塞方法,但从本质的机制上来说,两类队列最终其实都是依靠CAS
来保证线程安全。
所不同的是,阻塞队列是多线程先要去征用一个轻量锁,然后获得锁的线程可以操作队列、其他线程阻塞,通过锁来确保线程一个一个的操作队列。本质上线程在上述过程中有阻塞和让出CPU的现象、也就是发生了线程间上下文切换。
细读
LinkedBlockingQueue
、ReentrantLock
、AQS
的源码可以发现,没抢到锁(也就是没CAS到state)的线程会先进入阻塞状态等待持有锁的线程进行唤醒,阻塞和唤醒用的是LockSupport.park
,LockSupport.unpark
,底层是用一个标志位来让CPU识别进行线程调度与否,这样多线程的情况下,也就出现了让出CPU和线程切换了。
而ConcurrentLinkedQueue
则是多个线程一起操作队列,线程在这个过程中是可以不阻塞的,减少了线程切换。
看到这可能有的同学眼睛亮了,线程切换少了、性能损耗少了,ConcurrentLinkedQueue亚克西,阻塞队列渣渣。我们不能武断的下结论,线程不挂起-唤醒,一直并发运行,真正并行多少是要看CPU核数的,笔者这里猜想非阻塞的ConcurrentLinkedQueue的CPU的使用效率会高一些(少了切换带来的损耗、能更多的用于实际线程的执行),但是CPU面对一堆可执行状态的线程的能力也是有上限的,这会不会带来更高的负载?且ConcurrentLinkedQueue本身的无界特性对于任务积压场景来说也是个大问题。
入队、出队方法的源码
ConcurrentLinkedQueue的源码分析,在认清了其链表为基础的大致结构之后可以从offer
和poll
两个入队和出队方法入手:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
可以看到,offer和poll两个方法内部都是for死循环的结构,入队或出队不成功则反复CAS进行重试,没有基于AQS做的锁机制,做入队和出队操作的线程是一直处于可运行状态的(包括运行和就绪两种状态)。
具体哪个线程是Running状态,则靠操作系统进行线程调度。
由此可知,使用ConcurrentLinkedQueue可以充分的利用CPU,对时间片的使用比较充分、因为少了线程阻塞-唤醒这类切换成本(CPU要花费上千个时间片在一次阻塞-唤醒这样的操作上),也就是延迟比较低、性能会比较高,但是我们也应该看到,当并发线程数大大多于CPU核数时,会带来CPU高负载的风险,CPU使用率也会随之升高。