Java 并发

【Java 并发笔记】ConcurrentLinkedQueue

2019-01-09  本文已影响0人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. ConcurrentLinkedQueue

1.1 ConcurrentLinkedQueue 的结构

ConcurrentLinkedQueue 的类图
        //Node代码中使用了UNSAFE提供的CAS方法保证操作的原子性,
        //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
        //第一个参数表示要更新的对象,第二个参数nextOffset是Field的偏移量,第三个参数表示期望值,最后一个参数更新后的值。若next域的值等于cmp,则把next域更新为val并返回true;否则不更新并返回false。
        private static class Node<E> {
            volatile E item;    //Node值,volatile保证可见性
            volatile Node<E> next;    //Node的下一个元素,volatile保证可见性

            /**
             * Constructs a new node.  Uses relaxed write because item can
             * only be seen after publication via casNext.
             */
            Node(E item) {
                UNSAFE.putObject(this, itemOffset, item);
            }

            boolean casItem(E cmp, E val) {
                return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            void lazySetNext(Node<E> val) {
                UNSAFE.putOrderedObject(this, nextOffset, val);
            }

            boolean casNext(Node<E> cmp, Node<E> val) {
                return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            // Unsafe mechanics

            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;

            static {
                //初始化UNSAFE和各个域在类中的偏移量
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE
                    Class k = Node.class;
                    //itemOffset是指类中item字段在Node类中的偏移量,先通过反射获取类的item域,然后通过UNSAFE获取item域在内存中相对于Node类首地址的偏移量。
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    //nextOffset是指类中next字段在Node类中的偏移量
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
}
public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
}

结点类型

1.2 入队操作

public boolean offer(E e) {
    // 如果e为null,则直接抛出NullPointerException异常
    checkNotNull(e);
    // 创建入队结点
    final Node<E> newNode = new Node<E>(e);
 
    // 循环CAS直到入队成功
    // 1、根据tail结点定位出尾结点(last node);2、将新结点置为尾结点的下一个结点;3、casTail更新尾结点
    for (Node<E> t = tail, p = t;;) {
        // p用来表示队列的尾结点,初始情况下等于tail结点
        // q是p的next结点
        Node<E> q = p.next;
        // 判断p是不是尾结点,tail结点不一定是尾结点,判断是不是尾结点的依据是该结点的next是不是null
        // 如果p是尾结点
        if (q == null) {
            // p is last node
            // 设置p结点的下一个结点为新结点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾结点
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                // 如果p != t,则将入队结点设置成tail结点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail结点
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
        // 所以这里需要重新找新的head,因为新的head后面的结点才是激活的结点
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        // 寻找尾结点
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

1.2.1 初始化状态

初始化状态

1.2.2 入队一个结点

1.2.2.1 casNext 操作成功的情况

casNext 设置成功状态

1.2.2.2 casNext 操作失败的情况

casNext 设置失败的情况 失败后根据条件设置

1.2.3 将入队结点设置为尾结点

1.2.3.1 casTail 操作成功的情况

casTail 操作成功的情况

1.2.3.2 casTail 操作失败的情况

casTail 操作失败的情况 失败后根据条件设置

1.2.3.3 p 等于 q 的情况处理

1.3.2.3 中的状态 p 等于 q 的处理 入队新结点

1.2.3.4 tail 结点不一定为尾结点的设计意图

1.3 出队操作

public E poll() {
    restartFromHead:
    for (;;) {
        // p结点表示首结点,即需要出队的结点
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
 
            // 如果p结点的元素不为null,则通过CAS来设置p结点引用的元素为null,如果成功则返回p结点的元素
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                // 如果p != h,则更新head
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 如果头结点的元素为空或头结点发生了变化,这说明头结点已经被另外一个线程修改了。
            // 那么获取p结点的下一个结点,如果p结点的下一结点为null,则表明队列已经空了
            else if ((q = p.next) == null) {
                // 更新头结点
                updateHead(h, p);
                return null;
            }
            // p == q,则使用新的head重新开始
            else if (p == q)
                continue restartFromHead;
            // 如果下一个元素不为空,则将头结点的下一个结点设置成头结点
            else
                p = q;
        }
    }
}

1.3.1 队列为空情况

1.3.1.1 执行过程中没有其他的线程入队结点

队列为空情况
final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
}

1.3.1.2 执行过程中其他线程入队了结点

执行过程中其他线程入队了结点 执行过程中其他线程入队了结点

1.3.2 移除结点

1.3.2.1 casItem 操作成功至 updateHead 操作期间无其他操作情况

casItem 操作成功至 updateHead 操作期间无其他操作情况

1.3.2.2 casItem 操作失败的情况

casItem 操作失败的情况

1.3.2.3 casItem 操作成功至 updateHead 操作期间其他线程进行了出队

casItem 操作成功至 updateHead 操作期间有其他线程进行了出队操作 重新查找 head 结点

1.4 其他相关方法

1.4.1 peek 方法

// 获取链表的首部元素(只读取而不移除)
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

1.4.2 size 方法

public int size() {
    int count = 0;
    // first() 获取第一个具有非空元素的结点,若不存在,返回 null
    // succ(p) 方法获取p的后继结点,若 p 等于 p.next,则返回 head
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            // 最大返回Integer.MAX_VALUE
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

//获取第一个具有非空元素的结点,没有则为 null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
 
//获取当前结点的 next 元素,如果是自引入结点则返回真正头结点
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

1.4.3 remove 方法

public boolean remove(Object o) {
    // 删除的元素不能为null
    if (o != null) {
        Node<E> next, pred = null;
 
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;

            // 结点元素不为null
            if (item != null) {
                // 若不匹配,则获取next结点继续匹配
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }

                // 若匹配,则通过CAS操作将对应结点元素置为null
                removed = p.casItem(item, null);
            }

            // 获取删除结点的后继结点
            next = succ(p);
            // 将被删除的结点移除队列
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

1.4.4 contains 方法

public boolean contains(Object o) {
    if (o == null) return false;

    // 遍历队列
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        // 若找到匹配结点,则返回true
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

1.4.5 add 方法

public boolean add(E e) {
    return offer(e);
}

1.5 总结


基本不变式

head 的不变式和可变式

tail 的不变式和可变式



向后推进


参考资料

https://blog.csdn.net/qq_38293564/article/details/80798310
http://www.importnew.com/25668.html
https://www.jianshu.com/p/7816c1361439

上一篇 下一篇

猜你喜欢

热点阅读