多线程并发

非阻塞同步队列ConcurrentLinkedQueue

2021-12-01  本文已影响0人  肥兔子爱豆畜子

关于ConcurrentLinkedQueue网上一些文章的说法上笔者认为存在误区,这些文章上把ConcurrentLinkedQueue归类为无锁队列,然后ArrayBlockingQueue、LinkedBlockingQueue之类的归为有锁队列,但其实ArrayBlockingQueue内部的锁用的是ReentrantLock,底层也是CAS,所以只用无锁和有锁来区分这者,笔者认为不太准确,容易产生误会。

无界的、非阻塞的、线程安全的同步链表队列

ConcurrentLinkedQueue本质是个由Node组成的链表,所以是无界的。

提供出来的方法都是非阻塞方法(不阻塞当前线程),所以是个非阻塞队列。无界链表非阻塞队列

ArrayBlockingQueueLinkedBlockingQueue这些阻塞队列的区别在于ConcurrentLinkedQueue内部没有使用ReentrantLock之类的轻量级锁(CAS + LockSupport)来实现队列的操作的线程安全,而是直接使用不断循环CAS操作的方式来保证入队和出队的线程安全。所以笔者认为,两者区别从功能上来说,一个提供阻塞、一个提供非阻塞方法,但从本质的机制上来说,两类队列最终其实都是依靠CAS来保证线程安全。

所不同的是,阻塞队列是多线程先要去征用一个轻量锁,然后获得锁的线程可以操作队列、其他线程阻塞,通过锁来确保线程一个一个的操作队列。本质上线程在上述过程中有阻塞和让出CPU的现象、也就是发生了线程间上下文切换。

细读LinkedBlockingQueueReentrantLockAQS的源码可以发现,没抢到锁(也就是没CAS到state)的线程会先进入阻塞状态等待持有锁的线程进行唤醒,阻塞和唤醒用的是LockSupport.parkLockSupport.unpark,底层是用一个标志位来让CPU识别进行线程调度与否,这样多线程的情况下,也就出现了让出CPU和线程切换了。

ConcurrentLinkedQueue则是多个线程一起操作队列,线程在这个过程中是可以不阻塞的,减少了线程切换。

看到这可能有的同学眼睛亮了,线程切换少了、性能损耗少了,ConcurrentLinkedQueue亚克西,阻塞队列渣渣。我们不能武断的下结论,线程不挂起-唤醒,一直并发运行,真正并行多少是要看CPU核数的,笔者这里猜想非阻塞的ConcurrentLinkedQueue的CPU的使用效率会高一些(少了切换带来的损耗、能更多的用于实际线程的执行),但是CPU面对一堆可执行状态的线程的能力也是有上限的,这会不会带来更高的负载?且ConcurrentLinkedQueue本身的无界特性对于任务积压场景来说也是个大问题。

入队、出队方法的源码

ConcurrentLinkedQueue的源码分析,在认清了其链表为基础的大致结构之后可以从offerpoll两个入队和出队方法入手:

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

可以看到,offer和poll两个方法内部都是for死循环的结构,入队或出队不成功则反复CAS进行重试,没有基于AQS做的锁机制,做入队和出队操作的线程是一直处于可运行状态的(包括运行和就绪两种状态)

具体哪个线程是Running状态,则靠操作系统进行线程调度。

由此可知,使用ConcurrentLinkedQueue可以充分的利用CPU,对时间片的使用比较充分、因为少了线程阻塞-唤醒这类切换成本(CPU要花费上千个时间片在一次阻塞-唤醒这样的操作上),也就是延迟比较低、性能会比较高,但是我们也应该看到,当并发线程数大大多于CPU核数时,会带来CPU高负载的风险,CPU使用率也会随之升高。

参考:

聊聊并发(六)ConcurrentLinkedQueue的实现原理分析 | 并发编程网 – ifeve.com

上一篇 下一篇

猜你喜欢

热点阅读