ConcurrentHashMap 1.8 计算 size 的方

2020-06-07  本文已影响0人  M_lear

一、认识相关字段

相关字段,

    // 两种情况
    // 1. counterCells 数组未初始化,在没有线程争用时,将 size 的变化写入此字段
    // 2. 初始化 counterCells 数组时,没有获取到 cellsBusy 锁,会再次尝试将 size 的变化写入此字段
    private transient volatile long baseCount;
    
    // 用于同步 counterCells 数组结构修改的乐观锁资源
    private transient volatile int cellsBusy;

    // counterCells 数组一旦初始化,size 的变化将不再尝试写入 baseCount
    // 可以将 size 的变化写入数组中的任意元素
    // 可扩容,长度保持为 2 的幂
    private transient volatile CounterCell[] counterCells;

其中,CounterCell 是 ConcurrentHashMap 的一个静态内部类。

    /* ---------------- Counter support -------------- */

    /**
     * A padded cell for distributing counts.  Adapted from LongAdder
     * and Striped64.  See their internal docs for explanation.
     */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

二、计算 size 的源码分析

计算 size 的方法调用链:size() -> sumCount(),

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n); // 将 n 裁剪到 [0, Integer.MAX_VALUE] 内
    }
    
    // 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

可以看到,map 中键值对的个数通过求 baseCount 与 counterCells 非空元素的和得到。

那么现在的问题就是 baseCount 和 counterCells 里的值都是什么时候计算的呢?

ConcurrentHashMap 内部所有改变键值对个数的方法都会调用 addCount 方法更新键值对的计数。
addCount 方法源码,

    // 参数 x 表示键值对个数的变化值,如果为正,表示新增了元素,如果为负,表示删除了元素
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 如果 counterCells 为空,则直接尝试通过 CAS 将 x 累加到 baseCount 中
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            // counterCells 非空
            // 或 counterCells 为空,但 CAS baseCount 失败都会来到这里
            CounterCell a; long v; int m;
            boolean uncontended = true; // CAS 数组元素时,有没有发生线程争用的标志
            // 如果当前线程探针哈希到的数组元素非空,则尝试将 x 累加到对应数组元素
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // counterCells 为空,或其长度小于1
                // 或当前线程探针哈希到的数组元素为空
                // 或当前线程探针哈希到的数组元素非空,但 CAS 数组元素失败
                // 都会调用 fullAddCount 方法来完成 x 的写入
                fullAddCount(x, uncontended);
                return; // 如果调用过 fullAddCount,则当前线程一定不会协助扩容
            }
            // 走到这说明,CAS 数组元素成功
            // 此时如果 check <= 1,也不协助可能会发生的扩容
            if (check <= 1)
                return;
            // 如果 check 大于 1,则计算当前 map 的 size,为判断是否需要扩容做准备
            s = sumCount();
        }
        // size 的变化已经写入完成
        // 后面如果 check >= 0,则判断当前的 size 是否会触发扩容
        if (check >= 0) {
            // 扩容相关的逻辑
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

如果不管其中与扩容有关的逻辑,addCount 方法记录 size 变化的过程可以分为两类情况,

  1. counterCells 数组未初始化
    a. CAS 一次 baseCount
    b. 如果 CAS 失败,则调用 fullAddCount 方法

  2. counterCells 数组已初始化
    a. CAS 一次当前线程探针哈希到的数组元素
    b. 如果 CAS 失败,则调用 fullAddCount 方法

fullAddCount 方法的源码,

    // 只被 addCount 方法调用
    // 如果 counterCells 数组未初始化
    // 或者线程哈希到的 counterCells 数组元素未初始化
    // 或者 CAS 数组元素失败,都会调用此方法
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        // 判断线程探针哈希值是否初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true; // 重新假设未发生争用
        }
        boolean collide = false;                // 是否要给 counterCells 扩容的标志
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // 数组不为空且长度大于 0
                if ((a = as[(n - 1) & h]) == null) {
                    // 尝试初始化线程探针哈希到的数组元素
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        // 注意,这里已经把 x 放入对象
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 && // 准备初始化数组元素,要求 cellsBusy 为 0,并尝试将其置 1
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            // 获得 cellsBusy 锁
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                // 判断有没有被其它线程初始化
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; // 释放 cellsBusy 锁
                            }
                            if (created) // 初始化元素成功,直接退出循环
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash(指的是更改当前线程的探针哈希值)
                // wasUncontended 为 true 执行到这
                // 尝试将 x 累加进数组元素
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // CAS 失败
                // 判断 counterCells 是否正在扩容,或数组长度是否大于等于处理器数
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                // 如果数组没有在扩容,且数组长度小于处理器数
                // 此时,如果 collide 为 false,则把它变成 true
                // 在下一轮循环中,如果 CAS 数组元素继续失败,就会触发 counterCells 扩容
                else if (!collide)
                    collide = true;
                // 如果 collide 为 true,则尝试给 counterCells 数组扩容
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h); // 更改当前线程的探针哈希值
            }
            // counterCells 数组为空或长度为 0
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                // 获取 cellsBusy 锁
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2]; // 初始长度为 2
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // counterCells 数组为空或长度为 0,并且获取 cellsBusy 锁失败
            // 则会再次尝试将 x 累加到 baseCount
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        } // end for
    }

这个方法细节较多,比较复杂。
细节方面请参考上面的源码和注释。
下面,我们只从整体上看它实现了哪些功能,

  1. 线程探针哈希值的初始化。
  2. counterCells 数组的初始化和扩容。
  3. counterCells 元素的初始化。
  4. 将 size 的变化,写入 counterCells 中的某一个元素。(如果 counterCells 初始化时,获取锁失败,则还会尝试将 size 的变化,写入 baseCount。)

三、小小总结

代码虽然看起来很复杂,但 Doug Lea 计算 size 的思想很明确,也很巧妙。

指导思想: 尽量降低线程冲突,以最快的速度写入 size 的变化。

如何降低冲突?
如果没有冲突发生,只将 size 的变化写入 baseCount。
一旦发生冲突,就用一个数组(counterCells)来存储后续所有 size 的变化。这样,线程只要对任意一个数组元素写入 size 变化成功即可,数组长度越长,线程发生冲突的可能性就越小。

关于 counterCells 扩容:
如果 CAS 数组元素连续失败两次,就会进行 counterCells 数组的扩容,直到达到机器的处理器数为止。
比如我的机器是双核四线程,真正能并行的线程数是 4,所以在我机器上 counterCells 初始化后,最多扩容一次。

关于线程的探针哈希值是如何初始化和更改的,可以参考:关于 ConcurrentHashMap 1.8 中的线程探针哈希

上一篇下一篇

猜你喜欢

热点阅读