Java Concurrency面试精选Java技术升华

Java并发编程之并发容器 CopyOnWrite,Concur

2021-07-16  本文已影响0人  干天慈雨

前言

JUC 高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程(Lock Free)算法主要通过 CAS(Compare And Swap)+volatile 组合实现,通过 CAS 保障操作的原子性,通过volatile 保障变量的内存的可见性。无锁编程(Lock Free)算法的主要优点:
(1)开销较小:不需要在内核态和用户态之间切换进程。
(2)读写不互斥:只有写操作需要使用基于 CAS 机制的乐观锁,读读操作之间可以不用互斥。

1.高并发容器分类

JUC 包中提供了 List、Set、Queue、Map 各种类型的高并发容器,如 ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。
在性能上,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步
的 TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList 优于同步的ArrayList。

1.1 List

JUC 包中高并发 List 主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。
CopyOnWriteArrayList 相当于线程安全的 ArrayList,它实现了 List 接口。在读多写少的场景中,其性能远远高于 ArrayList 的同步包装容器。

1.2 Set

JUC 包中 Set 主要有 CopyOnWriteArraySet、ConcurrentSkipListSet。

1.3 Map

JUC 包中 Map 主要有 ConcurrentHashMap、ConcurrentSkipListMap。

1.4 Queue

JUC 包中 Queue 的实现类包括三类:单向队列、双向队列、阻塞队列。

2. CopyOnWriteArrayList分析

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此如果每次读取都进行加锁操作,其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读操作是线程安全的。
写时复制(CopyOnWrite,简称 COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个 Accessor(访问器)访问一个资源(如内存或者是磁盘上的数据存储)时,他们会共同获取相同的指针指向相同的资源,只要有一个(修改器)需要修改该资源,系统会复制一份专用 Private Copy(副本)给该 Mutator,而其他 Accessor 所见到的最初的资源仍然保持不变,修改的过程对其他的 Accessor 都是透明的(transparently)。COW 主要的优点是如果没有修改器(mutator)去修改资源,就不会有副本被创建,因此多个 Accessor 可以共享同一份资源。

2.1 CopyOnWriteArrayList 的使用

在不使用CopyOnWriteArrayList 的情况下代码如下:

public class WithoutCopyOnWriteArrayListTest {

    public static class ConcurrentTarget implements Runnable {
        //并发操作的目标队列
        List<String> targetList = null;
        public ConcurrentTarget(List<String> targetList) {
            this.targetList = targetList;
        }

        @Override
        public void run() {
            Iterator<String> iterator = targetList.iterator();
            //迭代操作
            while (iterator.hasNext()) {
                // 在迭代操作时,进行列表的修改
                String threadName = Thread.currentThread().getName();
                System.out.println("开始往同步队列加入线程名称:" + threadName);
                targetList.add(threadName);
            }
        }

        //测试同步队列:在迭代操作时,进行列表的修改
        public static void main(String[] args) {
            List<String> notSafeList = Arrays.asList("a", "b", "c");
            List<String> synList = Collections.synchronizedList(notSafeList);
            //创建一个执行目标
            ConcurrentTarget synchronizedListListDemo =
                    new ConcurrentTarget(synList);
            //10 个线程并发
            for (int i = 0; i < 10; i++) {
                new Thread(synchronizedListListDemo , "线程" + i).start();
            }
            //主线程等待
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行代码会报如下错误:

java.lang.UnsupportedOperationException
    at java.util.AbstractList.add(AbstractList.java:148)
    at java.util.AbstractList.add(AbstractList.java:108)
    at java.util.Collections$SynchronizedCollection.add(Collections.java:2035)
    at com.ymj.study.code10_juc_container.CopyOnWriteArrayListTest$ConcurrentTarget.run(CopyOnWriteArrayListTest.java:33)
    at java.lang.Thread.run(Thread.java:748)

这个时候可使用 CopyOnWriteArrayList 替代 Collections.synchronizedList同步包装实例,具体的代码如下:

public class CopyOnWriteArrayListTest {
    public static class ConcurrentTarget implements Runnable {
        //并发操作的目标队列
        List<String> targetList = null;

        public ConcurrentTarget(List<String> targetList) {
            this.targetList = targetList;
        }

        @Override
        public void run() {
            Iterator<String> iterator = targetList.iterator();
            //迭代操作
            while (iterator.hasNext()) {
                // 在迭代操作时,进行列表的修改
                String threadName = Thread.currentThread().getName();
                System.out.println("开始往同步队列加入线程名称:" + threadName);
                targetList.add(threadName);
            }
        }
    }

    public static void main(String[] args) {
        List<String> notSafeList = Arrays.asList("a", "b", "c");
        //创建一个 CopyOnWriteArrayList 队列
        List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();
        copyOnWriteArrayList.addAll(notSafeList);

        //并发执行目标
        ConcurrentTarget copyOnWriteArrayListDemo =
                new ConcurrentTarget(copyOnWriteArrayList);
        for (int i = 0; i < 10; i++) {
            new Thread(copyOnWriteArrayListDemo, "线程" + i).start();
        }
        //主线程等待
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行之后发现UnsupportedOperationException 异常没有了。也就是说,使用CopyOnWriteArrayList 容器,可以在进行元素迭代的同时,又要进行元素添加操作。

2.2 CopyOnWriteArrayList 原理

所谓 CopyOnWrite(写时复制):就是在修改器(mutator)对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。
CopyOnWriteArrayList 是写时复制思想的一种典型实现: 其含有一个指向操作内存的内部指针 array,而可变操作(add、set 等)是在 array 数组的副本上进行的。当元素需要被修改或者增加的时候,并不直接在 array 指向的原有数组上操作,而是首先对 array 进行一次拷贝,将修改的内容写入拷贝副本中。写完之后,再将内部指针 array 指向新的副本,这样就可以确保修改操作不会影响访问器(accessor)的读取操作了。CopyOnWriteArrayList 的原理,如图所示:


CopyOnWriteArrayList 的原理

CopyOnWriteArrayList 核心成员如下:

public class CopyOnWriteArrayList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    /**
     * 对所有的修改器(mutator)方法进行保护,访问器(accessor)方法并不需要保护
     */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    /**
     * 内部对象数组,通过 getArray/setArray 方法去访问
     */
    private transient volatile Object[] array;

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    /**
     * 获取内部对象数组
     */
    final Object[] getArray() {
        return array;
    }

    /**
     * Sets the array.
     */
    /**
     * 设置内部对象数组
     */
    final void setArray(Object[] a) {
        array = a;
    }
}

2.3 CopyOnWriteArrayList 读取操作

访问器(accessor)的读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

   /** 操作内存的引用*/
    private transient volatile Object[] array;
    public E get(int index) {
        return get(getArray(), index);
    }
    //获取元素
    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }
    //返回操作内存
    final Object[] getArray() {
        return array;
    }

2.4 CopyOnWriteArrayList 写入操作

CopyOnWriteArrayList 的写入操作 add( )方法在执行的时候加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会 copy 出多个副本。

   /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 拷贝新数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

从 add 操作可以看出,在每次进行添加操作的时候,CopyOnWriteArrayList 底层都是重新 copy了一份数组,再往新的数组中添加数组,待添加完了,再将新的 array 引用指向新的数组。当 add操作完成后,array 的引用就已经指向另一个存储空间了。
那么既然每次添加元素的时候,都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList 并不适合做添加操作。但是如果在并发场景下,迭代操作比较频繁,那CopyOnWriteArrayList 是个不错的选择。

2.5 CopyOnWriteArrayList 的迭代器实现

CopyOnWriteArray 有自己的迭代器,该迭代器不会检查修改状态,也无需检查状态。为什么呢?因为被迭代的 array 数组是可以说是只读的,不会有其他线程能够修改它。

   static final class COWIterator<E> implements ListIterator<E> {
        /** Snapshot of the array */
        /**对象数组的快照(snapshot)*/
        private final Object[] snapshot;
        /** Index of element to be returned by subsequent call to next.  */
        private int cursor;

        private COWIterator(Object[] elements, int initialCursor) {
            cursor = initialCursor;
            snapshot = elements;
        }

        public boolean hasNext() {
            return cursor < snapshot.length;
        }

        public boolean hasPrevious() {
            return cursor > 0;
        }

        @SuppressWarnings("unchecked")
        //下一个元素
        public E next() {
            if (! hasNext())
                throw new NoSuchElementException();
            return (E) snapshot[cursor++];
        }

        @SuppressWarnings("unchecked")
        public E previous() {
            if (! hasPrevious())
                throw new NoSuchElementException();
            return (E) snapshot[--cursor];
        }

        public int nextIndex() {
            return cursor;
        }

        public int previousIndex() {
            return cursor-1;
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code remove}
         *         is not supported by this iterator.
         */
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code set}
         *         is not supported by this iterator.
         */
        public void set(E e) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code add}
         *         is not supported by this iterator.
         */
        public void add(E e) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            Object[] elements = snapshot;
            final int size = elements.length;
            for (int i = cursor; i < size; i++) {
                @SuppressWarnings("unchecked") E e = (E) elements[i];
                action.accept(e);
            }
            cursor = size;
        }
    }

迭代器的 snapshot(快照)成员,会在构造迭代器的时候,使用 CopyOnWriteArrayList 的 array成员去初始化,具体如下:

//获取迭代器
 public Iterator<E> iterator() {
 return new COWIterator<E>(getArray(), 0);
 }

 //返回操作内存
 final Object[] getArray() {
      return array;
 }

2.6 CopyOnWriteArrayList总结

CopyOnWriteArrayList 的优点
CopyOnWriteArrayList 有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非常快。所以,CopyOnWriteArrayList 适用于读操作多、写操作相对较少的场景("读多写少"),比如可以在进行“黑名单”拦截时使用 CopyOnWriteArrayList。
CopyOnWriteArrayList 和 ReentrantReadWriteLock 的比较
CopyOnWriteArrayList 和 ReentrantReadWriteLock 读写锁的思想非常类似,读写锁的思想是读读共享、写写互斥、读写互斥、写读互斥。但是 CopyOnWriteArrayList 相比读写锁的又更进一步:为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升。

3 BlockingQueue分析

在 Java8 中,提供了 7 个阻塞队列

阻塞队列 介绍
ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序
DelayQueue 优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列

3.1 阻塞队列的操作方法

在阻塞队列中,提供了四种处理方式:
1. 插入操作

3.2 ArrayBlockingQueue 原理分析

3.2.1 构造方法

ArrayBlockingQueue 提供了三个构造方法,分别如下:
capacity: 表示数组的长度,也就是队列的长度。
fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

  public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //重入锁,出队和入队持有这一把锁
        lock = new ReentrantLock(fair);
        //初始化非空等待队列
        notEmpty = lock.newCondition();
        //初始化非满等待队列
        notFull =  lock.newCondition();
    }

   public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

3.2.2 Add方法

以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.如果看源码看得比较多的话,一般这种写法都是调用父类的模版方法来解决通用性问题

public boolean add(E e) {
 return super.add(e);
}
   // 从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
3.2.2.1 offer 方法

add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈。
这段代码做了几个事情:

  1. 判断添加的数据是否为空
  2. 添加重入锁
  3. 判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false
  4. 否则,直接调用 enqueue 将元素添加到队列中
    public boolean offer(E e) {
        //对请求数据做判断
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
3.2.2.2 enqueue方法

这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items

   /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //通过 putIndex 对数据赋值
        items[putIndex] = x;
        // 当putIndex 等于数组长度时,将 putIndex 重置为 0
        if (++putIndex == items.length)
            putIndex = 0;
        count++; //记录队列元素的个数
        //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
        notEmpty.signal();
    }

putIndex 为什么会在等于数组长度的时候重新设置为 0?
因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了。
下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。同时还需要思考两个问题:

  1. 当元素满了以后是无法继续添加的,因为会报错
  2. 其次,队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除


    image.png

3.2.3 put方法

put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下
它的实现逻辑:

   public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        /**
         * 这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调
         *  用等待线程的 interrupt 方法来中断等待直接返回。而 lock
         *  方法是尝试获得锁成功后才响应中断
         */
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                //队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
put

3.2.4 take方法

take 方法是一种阻塞获取队列中元素的方法它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //如果队列为空的情况下,直接通过 await 方法阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
take

如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素


image.png
3.2.4.1 dequeue 方法

这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端,takeIndex,是用来记录拿数据的索引值

  /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //默认获取 0 位置的元素
        E x = (E) items[takeIndex];
        //将该位置的元素设置为空
        items[takeIndex] = null;
        //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据
        if (++takeIndex == items.length)
            takeIndex = 0;
        //记录 元素个数递减
        count--;
        if (itrs != null)
            //同时更新迭代器中的元素数据
            itrs.elementDequeued();
        //触发 因为队列满了以后导致的被阻塞的线程
        notFull.signal();
        return x;
    }

3.2.4.2 itrs.elementDequeued();

ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素

      /**
         * Called whenever an element has been dequeued (at takeIndex).
         */
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                queueIsEmpty();
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

itrs.elementDequeued() 是用来更新迭代器中的元素数据的

3.2.4 remove方法

remove 方法是移除一个指定元素。看看它的实现代码

   public boolean remove(Object o) {
        if (o == null) return false;
        //获取数组元素
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        //获得锁
        lock.lock();
        try {
            //如果队列不为空
            if (count > 0) {
                //获取下一个要添加元素时的索引
                final int putIndex = this.putIndex;
                //获取当前要被移除的元素的索引
                int i = takeIndex;
                do {
                    //从takeIndex 下标开始,找到要被删除的元素
                    if (o.equals(items[i])) {
                        //移除指定元素
                        removeAt(i);
                        //返回执行结果
                        return true;
                    }
                    //当前删除索引执行加 1 后判断是否与数组长度相等
                    //若为 true,说明索引已到数组尽头,将 i 设置为 0
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex); //继续查找,直到找到最后一个元素
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

4 BlockingDeque分析

BlockingDeque定义了一个阻塞的双端队列接口,如下所示

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { 
    void putFirst(E e) throws InterruptedException; 
    void putLast(E e) throws InterruptedException; 
    E takeFirst() throws InterruptedException; 
    E takeLast() throws InterruptedException; 
    // ... 
}

该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。
其核心数据结构如下所示,是一个双向链表。

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements
        BlockingDeque<E>, java.io.Serializable {
    static final class Node<E> {
        E item; Node<E> prev; // 双向链表的Node Node<E> next;
        Node(E x) {
            item = x;
        }
    }
    transient Node<E> first; // 队列的头和尾
    transient Node<E> last;
    private transient int count; // 元素个数
    private final int capacity; // 容量
    // 一把锁+两个条件
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.netCondition();
    private final Condition notFull = lock.newCondition();
    // ...
}

对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

5 ConcurrentLinkedQueue/Deque

AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
首先,它是一个单向链表,定义如下:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements
        Queue<E>, java.io.Serializable {
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        //...
    }
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    //...
}

其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部。但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。下面进行详细分析:

5.1 初始化

初始的时候, head 和 tail 都指向一个 null 节点。对应的代码如下。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
image.png

5.2 入队列

代码如下所示:

   public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // 对tail的next指针而不是对tail指针执行CAS操作
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        // 每入队两个节点后移一次tail指针
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                // 已经到达队列尾部
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 后移p指针
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点。如下图所示:
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
step1: p=tail,q=p.next=NULL.
step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail方法不会执
行,直接返回。此时tail指针没有变化。


image.png

之后,假设线程2要入队item3节点,如下图所示:
step3: p=tail,q=p.next.
step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
step5:q=NULL,对p的next执行CAS操作,入队item3节点。
step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。


image.png
总结出以下关键点:
  1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
  2. 只有当 p != tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。

5.3 出队列

上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?

 public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 出队列的时候,并没有移动head指针,而是把item设置为null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        // 每产生2个null节点,才把head指针后移两位
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3 三个节点。
step1:p=head,q=p.next.p!=q.
step2:后移p指针,使得p=q。
step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作。


image.png

总结:

  1. 出队列的判断并非观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件。
  2. 只要对节点的item执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。

5.4 队列判空

因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示:

   public boolean isEmpty() {
        // 寻找第一个不是null的节点
        return first() == null;
    }
    Node<E> first() {
        restartFromHead:
        for (;;) {
            // 从head指针开始遍历,查找第一个不是null的节点
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

6. ConcurrentHashMap解析

Java并发编程之并发容器ConcurrentHashMap详解

7. ConcurrentSkipListMap/Set

ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。

7.1 ConcurrentSkipListMap

7.1.1 为什么要使用SkipList实现Map?

在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
借用Doug Lea的原话:

The reason is that there are no known efficient lock0free insertion and deletion algorithms for search trees.

也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。
那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。

7.1.2 无锁链表

之前讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
操作1:在节点10后面插入节点20。如下图所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。

操作1
操作2:删除节点10。如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可。
操作2
但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如下图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。
image.png
为什么会出现这个问题呢
原因: 在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
针对这个问题,在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
第一步,把节点10的next指针,mark成删除,即软删除;
第二步,找机会,物理删除。
做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1 个CAS操作里面完成!
image.png
具体的实现有两个办法:
办法一:AtomicMarkableReference
保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
办法2:Mark节点
我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成。

7.1.3 跳查表

解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的
下面先看一下跳查表的数据结构:

static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K, V> next;

        /**
         * Creates a new regular node.
         */
        Node(K key, Object value, Node<K, V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
        //...
    }

上图中的Node就是跳查表底层节点类型。所有的<K, V>对都是由这个单向链表串起来的。
上面的Index层的节点:

   static class Index<K,V> {
        final Node<K, V> node;
        final Index<K, V> down;
        volatile Index<K, V> right;

        /**
         * Creates index node with given values.
         */
        Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
            this.node = node;
            this.down = down;
            this.right = right;
        }
        //...
    }

上图中的node属性不存储实际数据,指向Node节点。
down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点。
right属性:Index也组成单向链表。
整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:

public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
    // ... 
    private transient Index<K,V> head; 
    // ... 
}
image.png

下面详细分析如何从跳查表上查找、插入和删除元素。

7.1.4 put实现分析

  private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    if ((v = n.value) == null) {   // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                z = new Node<K,V>(key, value, n);
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

在底层,节点按照从小到大的顺序排列,上面的index层间隔地串在一起,因为从小到大排列。查找的时候,从顶层index开始,自左往右、自上往下,形成图示的遍历曲线。假设要查找的元素是32,遍历过程如下:
先遍历第2层Index,发现在21的后面;
从21下降到第1层Index,从21往后遍历,发现在21和35之间;
从21下降到底层,从21往后遍历,最终发现在29和35之间。
在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间。


image.png

在put代码中,通过findPredecessor找到了待插入的元素在[b,n]之间之后,并不能马上插入。因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的检查逻辑,而这也正是无锁链表的复杂之处。

7.1.5 remove实现分析

   // 若找到了(key, value)就删除,并返回value;找不到就返回null
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

上面的删除方法和插入方法的逻辑非常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定位到元素所在的区间[b,n]。在定位之后,执行下面几个步骤:

  1. 如果发现b、n已经被删除了,则执行对应的删除清理逻辑;
  2. 否则,如果没有找到待删除的(k, v),返回null;
  3. 如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后面加上Marker节点,同时检查是否需要降低Index的层次。

7.1.6 get实现分析

private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }

无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被删除,如果是,则需要执行相应的删除清理逻辑。这也正是无锁链表复杂的地方。

7.2 ConcurrentSkipListSet

如下面代码所示,ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。

public class ConcurrentSkipListSet<E>
        extends AbstractSet<E>
        implements NavigableSet<E>, Cloneable, java.io.Serializable {
    // 封装了一个ConcurrentSkipListMap
    private final ConcurrentNavigableMap<E,Object> m;
    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }
    public boolean add(E e) {
        return m.putIfAbsent(e, Boolean.TRUE) == null;
    }
    // ... 
}

以上内容就是对Java并发编程中并发容器的一些介绍,其中阻塞队列中还有很多并没有一一赘述了。

上一篇 下一篇

猜你喜欢

热点阅读