并发编程专题

(十二) ConcurrentHashMap分段锁设计原理

2021-09-27  本文已影响0人  跟着Mic学架构

内容目录

分段锁设计提高统计元素数量的性能

最后一个部分,也是ConcurrentHashMap中设计比较巧妙的地方。我们知道,当调用完put方法后,ConcurrentHashMap必须会增加当前元素的个数,方便在size()方法中获得存储的数据大小。代码的实现如下。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //省略部分代码....
    addCount(1L, binCount);
    return null;
}

很多读者可能会问,这块代码有什么好值得分析的呢?请大家思考一个问题,在常规的集合中,我们只需要一个全局int类型的字段去保存元素个数即可,每次添加一个元素,就对这个size变量+1。但是在ConcurrentHashMap集合中,需要保证对于该变量修改的线程安全性,那怎么设计呢?难道又用锁来实现?很显然,通过前面的分析,使用同步锁带来的性能开销太大,所以不适合。因此,在ConcurrentHashMap中,采用的是自旋锁和分段锁的设计。

size计数的基本原理分析

在ConcurrentHashMap中,采用两种方式来保存元素的个数。

private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;

元素个数累加的整体流程如下图所示。

在这里插入图片描述

addCount方法分析

addCount方法的完整代码如下,从整体结构来看,包含两个部分的逻辑。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

我们重点来分析下面这段增加元素个数的代码。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
}

上述代码的处理逻辑如下:

细心的读者不难发现,在if判断中,首先是执行(as = counterCells) != null,然后再尝试对baseCount做累加。这里有一个概率性的问题,就是当一个集合发生过并发时,后续发生并发的可能性会越高,这种思想在并发编程以及很多应用场景中都有体现。

fullAddCount方法分析

fullAddCount的主要功能有几个。

接下来,笔者分别针对这三个部分进行分析。

第一个部分,初始化CounterCell

CounterCell数组初始化处理不难理解。

private final void fullAddCount(long x, boolean wasUncontended) {
    //省略部分代码....
   for (;;) {
       CounterCell[] as; CounterCell a; int n; long v;
       if ((as = counterCells) != null && (n = as.length) > 0) {
           //省略部分代码....
       }
       else if (cellsBusy == 0 && counterCells == as &&
                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
           boolean init = false;
           try {                           // Initialize table
               if (counterCells == as) {
                   CounterCell[] rs = new CounterCell[2];
                   rs[h & 1] = new CounterCell(x);
                   counterCells = rs;
                   init = true;
               }
           } finally {
               cellsBusy = 0;
           }
           if (init)
               break;
       }
   }
}

第二个部分,增加元素个数

如果CounterCell数组已经初始化了,也存在两种情况。

第一种情况是通过(a = as[(n - 1) & h]) == null计算到某个数组下标存在null的对象时,那么直接把当前要增加的元素个数x保存到数组中的某一个对象中即可。

private final void fullAddCount(long x, boolean wasUncontended) {
    //省略部分代码....
   for (;;) {
       CounterCell[] as; CounterCell a; int n; long v;
       if ((as = counterCells) != null && (n = as.length) > 0) {
           if ((a = as[(n - 1) & h]) == null) {
               if (cellsBusy == 0) {            // Try to attach new Cell
                   CounterCell r = new CounterCell(x); // Optimistic create
                   if (cellsBusy == 0 &&
                       U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                       boolean created = false;
                       try {               //Recheck under lock
                           CounterCell[] rs; int m, j;
                           if ((rs = counterCells) != null &&
                               (m = rs.length) > 0 &&
                               rs[j = (m - 1) & h] == null) {
                               rs[j] = r;
                               created = true;
                           }
                       } finally {
                           cellsBusy = 0;
                       }
                       if (created)
                           break;
                       continue;           // Slot is now non-empty
                   }
               }
               collide = false;
           }
           //省略部分代码....
       }
       //省略部分代码....
   }
}

第二种情况,就是直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)操作,对CounterCell指定位置的元素进行累加。

private final void fullAddCount(long x, boolean wasUncontended) {
    //省略部分代码....
   for (;;) {
       //由于指定下标位置的cell值不为空,则直接通过cas进行原子累加,如果成功,则直接退出
       else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//
           break;
   }
  //省略部分代码....
}

第三个部分,CounterCell数组扩容

如果竞争比较激烈,也就是通过多次自旋(也就是线程无法满足上述的判断条件时),就会触发CounterCell的扩容。

private final void fullAddCount(long x, boolean wasUncontended) {
    //省略代码....
    else if (cellsBusy == 0 &&
             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                //扩容一倍 2变成4
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                counterCells = rs;
            }
        } finally {
            cellsBusy = 0;//恢复标识
        }
        collide = false;
        continue;//继续下一次自旋
    }
    //省略部分代码....
}

size()方法结果汇总

基于前面的分析,基本上我们也能猜测出它的整体实现,无非就是把baseCountCounterCell保存的value进行累加即可。接下来我们看一下size()方法的定义如下。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

主要看一下sumCount这个方法,实现原理和我们猜想的一致。

我们发现,size()方法在计算总的元素个数时,并没有加锁,所以size()方法返回的元素个数不一定代表总的容量。

上一篇 下一篇

猜你喜欢

热点阅读