ConcurrentLinkedQueue全解析

2018-09-25  本文已影响71人  longhuihu

使用java的concurrent包写程序已经快两三年了,至今对其实现原理不怎么了解,突然感到非常惭愧。下定决心从这个包里面找几个核心类,从源码级别做一个全面的分析,搞懂它们的实现原理,以及它们为啥这么快,这一篇挑选ConcurrentLinkedQueue。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个无锁化、非阻塞、线程安全的单向队列,JDK1.5提供,由大名鼎鼎的Doug Lea编写。不过要是你认为这是大神灵感爆发的杰作,那就错了,仔细读读java源文件的注释就知道,这个类的实现原理来自Michael & Scott设计的算法(请参考论文),而Michael & Scott也是在很多前人研究的基础上加以综合完善得到该算法的,成为很多具体平台并发FIFO队列实现的蓝本。如果不读读这篇论文,直接阅读java源码,光凭那点注释,会有点摸不着头脑。

我们要研究的ConcurrentLinkedQueue可以说是这个算法的java版本,当然针对java语言做了很多修改。到这里,你要是认为Doug Lea就干了点将原始算法翻译成java的活,那又错了,从一个理论算法到具体某个语言的实现还有相当长的距离;而且java语言本来就比较慢,所以为了挖掘性能,作者做了大量精细而巧妙的设计。

ConcurrentLinkedQueue实现的依赖cas操作和java的valotile语义,所以不理解这两点是无法看懂代码的。关于cas操作和valotile关键字,不是本文要讲的内容,有很多的资料可以查询,比如:
JAVA中的CAS
volatile关键字解析

说明:

  1. java里面并没有指针的概念,但是算法原理结构理论讲队列,有“头指针”、“尾指针”的说法,为了方便,下面在讲解ConcurrentLinkedQueue内部结构时也会使用这两个术语;
  2. 采用的源码是Java 1.8版本;
  3. 强烈建议先阅读cas和java内存模型相关资料。

一、队列的内部链表结构

下面是摘录自源码,经过简化的ConcurrentLinkedQueue定义片段。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
     
    private transient volatile Node<E> head;
     private transient volatile Node<E> tail;

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
 }

一个空的ConcurrentLinkedQueue包含一个空节点(数据字段=null),队列的HEAD和TAIL指针都指向这个节点。当经过一些出入队列的操作以后,队列的结构可能类似下面:

【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
HEAD                                                                    TAIL

但是,也有可能类似这样:
【N1】->【N2】->【N3】->【N4】->【N5】->【N6】
                  HEAD                                  TAIL

上边这个结构,表示N1节点已经出了队列,此时的HEAD是N2节点,但是N1节点的next指针仍然保持着。而TAIL节点指向的N5却不是真正的队尾,队尾是N6。这是因为在入队列的过程中,由于并发,导致TAIL更新不及时。

同样还是上面的结构,如果我把里面的数据字段标识出来,有可能是这样的:
【N1(null)】->【N2(null)】->【N3(data)】->【N4(data)】->【N5(data)】->【N6(data)】
                          HEAD                                                                 TAIL
HEAD节点的数据字段是NULL,说明这是一个空节点,这个队列语义上第一个节点应该是N3才对。

再来看一个状态:
【N1】<-| 【N2】->【N3】->【N4】->【N5】->【N6】
                  HEAD                                  TAIL
N1节点的next字段指向自身,这表示N1节点彻底断开和队列的关系,为什么不直接将N1的next置为null呢,因为这样就不能直接断定N1到底是不是尾节点了。

上面这些情形是ConcurrentLinkedQueue在执行一些操作之后可能处于的状态,之所于允许这些看起来不够严谨的状态,是为了在并发过程中提高效率。但是不管如何,在整个生命周期内,算法保持以下属性:

  1. 链表里面至少会有一个节点,数据字段为null的节点是空节点;
  2. 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
  3. TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
  4. 和队列断开的节点,next字段指向自身;
  5. 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。

在ConcurrentLinkedQueue内部链表上,可能有一个或多个数据字段为null的空节点,空节点虽然没有数据,但是对高效地保持链表的连接状态至关重要。另一方面,节点的数据字段和next字段值的变化有很强的规律性:数据字段在节点入队列是不为null,出队列时变为null;next字段入队列时是null,出队列时先保持不变,再指向自身。这种规律性可以有效规避掉cas操作的ABA问题。

上面这些也可以认为是ConcurrentLinkedQueue实现算法的不变式,有些在源码注释里面就有说明,有些是我总结的。带着这些原则才能理解每个方法的实现为什么是这样的。

二、内部节点类

直接上源码吧,由于Node提供一个比较简单的节点数据结构,逻辑不多,但是我在读的过程中还是有一些疑惑,直接将解释写在下面的源码块里面了。

private static class Node<E> {
       volatile E item;
       volatile Node<E> next;

       /**
        * Constructs a new node.  Uses relaxed write because item can
        * only be seen after publication via casNext.
        */
       Node(E item) {
           UNSAFE.putObject(this, itemOffset, item);
       }
       为什么要使用UNSAFE.putObject而不直接赋值呢?刚看到“relaxed write”这个注释时一脸懵逼,
       后来才搞明白,因为item是volatile修饰的,如果直接赋值,会触发volatile的内存同步语义,
       在初始化阶段不需要如此,所以使用UNSAFE.putObject能提高一些性能。

       boolean casItem(E cmp, E val) {
           return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
       }
       cas操作设置数据字段,这个操作在出队列操作时,保证并发安全。

       void lazySetNext(Node<E> val) {
           UNSAFE.putOrderedObject(this, nextOffset, val);
       }
       这个操作相比UNSAFE.putObject,会一定程度禁止指令重排,
       相比volatile赋值,不保证全局可见性,性能也稍好一些,用于节点出队列后断开链接,

       boolean casNext(Node<E> cmp, Node<E> val) {
           return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
       }
       给节点设置next字段,入队列的关键操作,cas操作保证并发安全
       
       // Unsafe mechanics
       private static final sun.misc.Unsafe UNSAFE;
       private static final long itemOffset;
       private static final long nextOffset;
 } 
  

三、出队列:poll方法

poll的目标是安全地取出第一个有效节点的数据,如果没有返回null。
源码如下,为了方便分析,对关键行,我加上了L1~Ln这样的行标记 。

    public E poll() {
        restartFromHead:
        for (;;) {
        L1 for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

        L2      if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
        L3          if (p != h) // hop two nodes at a time
        L4              updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
        L5      else if ((q = p.next) == null) {
        L6          updateHead(h, p);
                    return null;
                }
        L7      else if (p == q)
                    continue restartFromHead;
                else
        L8          p = q;
            }
        }
    }

L2行如果执行成功,那么出队列可以算是完成了,此时的head指向一个空节点,这种情况是允许的,但是队列里面也不能总是留很多空节点,所以L3L4行,就是把head指针后移。

上面L2~L4行,每个线程在成功poll一个节点之后,就会尝试把头结点移到p的下一个节点,这样保证了head后面最多只有一个空节点。L3行的那个判断如果没有,也不会影响程序的正确性,但是性能会差一些。

再介绍一下更新头指针的updateHead方法:

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

将头指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集,这个操作也维护了第一节总结的节点属性。值得注意的是h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到L5行时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么L7行的判断也可能失败,最终执行到L8行,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为,作者经过测量,认为这种情况造成的性能损失低于volatile造成的性能损失)。

还有一个有意思的现象是,在poll的过程中,并不会去修正TAIL指针,所以在链表的TAIL指针是有可能落在HEAD之后的,甚至暂时指向一个已经出队列的节点。

四、入队列:offer方法

offer的目标是将新节点插入到尾节点的后面,即使在并发情况下也不丢失数据。

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

     L1 for (Node<E> t = tail, p = t;;) {
     L2     Node<E> q = p.next;
     L3     if (q == null) {
                // p is last node
     L4         if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
     L5            if (p != t) // hop two nodes at a time
     L6                 casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
     L7     else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
     L8        p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
     L9         p = (p != t && t != (t = tail)) ? t : q;
        }
    }

移除操作:remove

再来看移除操作,通过数据字段的比较,来移除数据相等的节点。

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
        L1   if (item != null) {
        L2      if (!o.equals(item)) {
        L3          next = succ(p);
        L4          continue;
                }
        L5      removed = p.casItem(item, null);
            }

        L6  next = succ(p);
        L7  if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

上面再取p的下一个节点的时候,用了调用了succ这个方法,这里也看一下:

   final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

很简单,如果p的next指向自己,说明p已经脱离链表,此时返回head指向的节点。

源码可以看出,如果队列里面有多个相同数据的节点,一次remove调用最多删除一个。

迭代器

private class Itr implements Iterator<E> {
     L1 private Node<E> nextNode;
     L2 private E nextItem;
     L3 private Node<E> lastRet;

        Itr() {
     L4     advance();
        }
        private E advance() {
     L5     lastRet = nextNode;
     L6     E x = nextItem;

            Node<E> pred, p;
     L7       if (nextNode == null) {
     L8           p = first();
                pred = null;
            } else {
                pred = nextNode;
     L9         p = succ(nextNode);
            }

            for (;;) {
     L10        if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
     L11        if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls
     L12            Node<E> next = succ(p);
     L13            if (pred != null && next != null)
     L14                pred.casNext(p, next);
     L15            p = next;
                }
            }
        }

        public boolean hasNext() {
     L16    return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
     L17    return advance();
        }

        public void remove() {
     L18    Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
     L19    l.item = null;
     L20    lastRet = null;
        }
    }

源代码稍微长点,我删掉了注释。

从上面的实现我们可以知道,在迭代过程中,如果有数据并发入队列,这些数据是可以被迭代到的;如果有值被并发删除或出队列,那么这些数据有可能也被迭代到。尤其是迭代器的remove方法,删除的有可能是一个不存在的数据。

队列长度

ConcurrentLinkedQueue的size方法是很没效率的的,实际是把队列遍历了一遍来计算长度。所以推荐大家使用isEmpty来判断队列是否为空,而不要使用size()==0。

public boolean isEmpty() {
    return first() == null;
}

 public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

为什么ConcurrentLinkedQueue内部不维护一个size变量来跟踪队列的长度呢?不是不想,是做不到,由于ConcurrentLinkedQueue使用无锁化设计,通过cas操作来保证并发安全。而cas操作只能保证单个变量的并发安全性,无法在出入队列操作的同时,维护size变量。

小结

上面基本讲解了ConcurrentLinkedQueue源代码的大部分核心内容。在开篇的时候也讲过,高效的队列算法不是一朝一夕的出来的,看似短短一两百行代码,是经过前人大量的研究才得到的;因此我们读起来有些吃力是理所当然的,如果不去读一读相关的论文及其他资料,那就更困难了。ConcurrentLinkedQueue的代码在深入挖掘jvm特性的基础上,做了不少性能优化,比如offer方法的L8L9行;当然,上文的解释是我的推测,不一定正确,如果大家发现谬误之处,欢迎指正。

上一篇下一篇

猜你喜欢

热点阅读