Java 杂谈并发编程

JAVA并发容器-ConcurrentLinkedQueue 源

2019-07-30  本文已影响6人  xiaolyuh

在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

ConcurrentLinkedQueue是一个非阻塞的基于链表节点的无界线程安全队列,它遵循先进先出的原则,并采用了“wait-free”算法(即CAS算法)来实现。

类关系图

ConcurrentLinkedQueue.png

核心属性

// 头结点指针(不一定指向头,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> head;

// 尾节点指针(不一定指向尾,缓更新,非阻塞的,所以没法保证)
private volatile Node<E> tail;

// 链表节点
private static class Node<E> {
    // 节点元素(存数据)
    volatile E item;
    // 下一个节点
    volatile Node<E> next;
...
}

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和 指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一 张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

注意:

  • 核心属性都是用volatile来修饰来保证数据的可见性。
  • 默认情况下头结点和尾节点是相等的对应节点的itemnull
  • 头和尾节点都是缓更新,所以他们只能代表一个逻辑上的位置,不是实际的头尾节点

offer() 入队方法

public boolean offer(E e) {
    // 检查元素是否为NULL,这里不允许存NULL值
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    // 自旋,t是tail节点的引用,p表示尾节点,默认情况下p=t的
    for (Node<E> t = tail, p = t; ; ) {
        // q是p的下一个节点
        Node<E> q = p.next;
        // q==null表示p是尾节点,否则需要更新p进入下一次循环
        if (q == null) {
            // 使用CAS将新节点设置为p的next节点
            if (p.casNext(null, newNode)) {
                // 不是每次添加新节点都会更新尾节点指针,每新增两个节点才会更新尾节点指针。
                // 这样做的好处是减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。
                if (p != t) // hop two nodes at a time
                    // 即使设置失败也没有问题,说明其他线程已经更新了尾节点的值
                    casTail(t, newNode);

                // 永远返回true
                return true;
            }
        }
        // 定位尾节点
        // 删除节点的时候,会把删除的节点指向自己(自引用),就会发生p==q. 当添加第一个节点后tail也会发生自引用
        else if (p == q)
            // t != tail表示尾节点指针已经被移动过,这时p直接指向新的尾节点指针tail,否则p直接指向头结点指针,
            // 因为这时p已经被移出了链表,所以不能将p指向p.next
            p = (t != (t = tail)) ? t : head;
        else
            // t != tail表示尾节点指针已经被移动过,所以p直接指向新的尾节点指针(tail),否则p指向p.next
            p = (p  != t && t != (t = tail)) ? t : q;
    }
}

入队过程:

  1. 检查元素是否为NULL,如果是NULL直接抛出异常
  2. 初始化新节点newNode
  3. 找到尾节点指针```t``
  4. 根据尾节点指针t找到真正的尾节点p
  5. CAS将pnext指向newNode
  6. 判断是否更新尾节点指针的位置

注意:

  • ConcurrentLinkedQueue 是允许存NULL值的,NULL有特殊含义,表示已经从链表中移除(出队了)
  • 入队过程大致分为三步,第一步找到真正的尾节点p;第二步CAS更新p.next;第三步CAS更新尾节点指针的位置casTail(t, newNode);
  • t != tail表示尾节点指针已经被移动过
  • p == q表示p节点已经从链表中移除,并发生了自引用
  • 每新增两个节点才会更新尾节点指针, 这样能减少了更新尾节点的竞争,但是增加了寻找真正尾节点的代价。从本质上来 看它通过增加对volatile变量的读操作来减少对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

入队快照图

ConcurrentLinkedQueue队列添加元素的快照图.jpg

入队debug图

ConcurrentLinkedQueue队列添加元素debug图.jpg

poll()出队方法

public E poll() {
    //  外循环标记
    restartFromHead:
    for (; ; ) {
        // h是头结点指针,p头结点,q是p的下一个节点
        for (Node<E> h = head, p = h, q; ; ) {
            // 保存头节点的值
            E item = p.item;
            // CAS替换头结点的item值为null,如果item为null表示该节点已经从链表中移除了
            if (item != null && p.casItem(item, null)) {
                // 每移除两个元素,头节点指针才开始移动
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 当前队列为NULL
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // 发生了自引用,重头再来
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

出队过程:

  1. 根据头结点指针找到头结点
  2. 暂存头结点的值
  3. 判断节点内容item是否为NULL,如果为NULL表示该节点已经出队了
  4. 如果item不为NULL则使用CAS替换为NULL,如果成功判断是否需要更新头结点指针
  5. 如果失败表示该节点已经被其他线程出队了,寻找新的头结点,继续第二步

注意:

  • 每删除两个节点才会更新头点指针, 这样能减少了更新头点的竞争。
  • 当节点的item==null时表示节点已经出队了

测试代码

public class ConcurrentLinkedQueueTset {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> linkedQueue = new ConcurrentLinkedQueue<>();

        linkedQueue.offer(1);
        linkedQueue.offer(2);
        linkedQueue.poll();
        linkedQueue.offer(3);
        linkedQueue.offer(4);
        linkedQueue.poll();
        linkedQueue.poll();
    }
}

参考

《java并发编程的艺术》

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

上一篇 下一篇

猜你喜欢

热点阅读