并发编程专题

(十一) 深度分析ConcurrentHashMap中的并发扩容

2021-09-27  本文已影响0人  跟着Mic学架构

内容目录

深度分析ConcurrentHashMap中的并发扩容机制

说到扩容,相比各位读者都不陌生,无非就是创建一个扩容目标大小的数组,把原来老数组中的数据迁移到新数组中来即可,这种方式比较适合在没有多线程并发的场景中完成,但是在ConcurrentHashMap中并没有那么简单,因为在多线程环境下进行扩容时,会存在其他线程同时往集合中添加元素。

可能有些读者会想,这个很简单,把整个扩容过程加一把同步锁,保证扩容过程中不存在其他线程对数据进行操作。很显然,这种方式对性能的损耗非常大,特别是如果涉及到数据量比较多的扩容时,会导致非常多的线程被阻塞。

ConcurrentHashMap中扩容部分的设计非常巧妙,它通过使用CAS机制实现无锁的并发同步策略,同时对于同步锁synchronized,也只把粒度控制到了单个数据节点做数据迁移的这个范围,并且利用多个线程来进行并行扩容,大大提高了数据迁移的效率。

多线程并发扩容原理图解

首先,如下图所示,通过一个简略图来整体了解一下并发扩容是怎么一回事,当存在多个线程并行进行扩容以及数据迁移时,默认情况下会给每个线程分配一个区间,这个区间默认值是16。每个线程负责自己区间的数据迁移工作。需要注意的是,在下图所示中有一个transferIndex的属性,这个是一个转移索引,如果当前只有两个线程要对64位长度的数组做数据迁移,意味着每个线程需要做多次迁移,而这个过程就依赖于transferIndex来更新每个线程迁移的数据区间。

在这里插入图片描述

transfer数据迁移

transfer这个方法的代码非常多,代码如下。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

为了更清晰的理解transfer方法的代码,我们把它分成五个部分去解读。

第一个部分,创建扩容后的数组

这部分代码主要做两个事情。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
}
第二个部分,数据迁移区间计算

这部分代码,通过while(advance)循环计算每个线程需要进行数据迁移的数组区间。笔者在前面提到过,如果根据数组长度计算出来的每个CPU处理的区间数小于16的情况下,会设置默认的区间是16,假设数组长度是64,但是只有两个线程在并行做数据迁移时,那这两个线程就需要执行多次区间迁移。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    //省略部分代码....
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //省略部分代码....
    }
}

上面这段代码,有一些关键的东西需要简单分析一下。

第三个部分,更新扩容标记

这部分主要是判断逻辑,有两个点。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    //省略部分代码....
    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed
    //省略部分代码....
}
第四个部分,开始数据迁移和扩容

这部分内容就是真正实现数据迁移的逻辑,代码比较长,从大的层面来说就两块。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    //省略部分代码....
    for (int i = 0, bound = 0;;) {
        //省略部分代码....
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                Node<K,V> ln, hn;
                if (fh >= 0) {
                    int runBit = fh & n;
                    Node<K,V> lastRun = f;
                    for (Node<K,V> p = f.next; p != null; p = p.next) {
                        int b = p.hash & n;
                        if (b != runBit) {
                            runBit = b;
                            lastRun = p;
                        }
                    }
                    if (runBit == 0) {
                        ln = lastRun;
                        hn = null;
                    }
                    else {
                        hn = lastRun;
                        ln = null;
                    }
                    for (Node<K,V> p = f; p != lastRun; p = p.next) {
                        int ph = p.hash; K pk = p.key; V pv = p.val;
                        if ((ph & n) == 0)
                            ln = new Node<K,V>(ph, pk, pv, ln);
                        else
                            hn = new Node<K,V>(ph, pk, pv, hn);
                    }
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    setTabAt(tab, i, fwd);
                    advance = true;
                }
                else if (f instanceof TreeBin) {
                    //如果当前节点是红黑树,则按照红黑树的处理逻辑进行迁移。
                }
            }
        }
    }
}

上述代码其实也包含一个比较有意思的设计,就是用到了高低位整体迁移的方式,来提升迁移效率,在分析上述代码之前,先来了解一下什么是高低位迁移。

假设存在这样一个数据存储的结构,如下图所示,在数组下标为4的位置,存在一条由链表组成的节点,其中节点上这些数字表示的是key对应的hash码。

在这里插入图片描述

上述这些hash值4、20、52、68、84、100,他们是怎么计算并且放在数组下标4的位置呢?我们回到putVal方法上可以看到,当前key是通过这个方法tabAt(tab, i = (n - 1) & hash)去数组中查找的,关键的逻辑是(n-1)&hash

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //省略代码....
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //查找逻辑
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //省略代码....
    }
    //省略代码....
}

我们仔细观察(n-1)&hash这个逻辑,它有一个动态变化的因素n(数组长度),也就是说,随着n的值的变化,原本存储在数组下标4位置的key,在扩容之后计算的下标位置也会变化。

举例来说,在如上图所示的链表中,4、20、52、68、84、100这些hash值,在数组长度为16位的情况下,通过(n-1)&hash得到的下标位置都是4。但是当数组长度扩容到32位时,再通过(n-1)&hash来计算,发现20、52、84这三个hash值对应的下标位置都变成了20,其他值4、68、100计算得到的数组下标位置仍然是4。这就意味着,由一个链表组成的节点中,有可能存在一部分节点在扩容后不需要迁移,一部分节点在扩容后需要迁移的情况。

因此,所谓的高低位迁移,表示的就是上述这种情况,而所谓的低位就是指不需要迁移的元素、高位是表示需要迁移的元素。

继续回到transfer代码的高低位迁移逻辑中来,这里有一个比较有意思的设计,就是通过一定的规则计算出两条链ln(低位链)hn(高位链),然后把这两条链表一次性迁移到新的数组中,这样的方式减少了数据迁移次数。

if (tabAt(tab, i) == f) {
    Node<K,V> ln, hn;
    if (fh >= 0) {
        int runBit = fh & n;
        Node<K,V> lastRun = f;
        for (Node<K,V> p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        if (runBit == 0) {
            ln = lastRun;
            hn = null;
        }
        else {
            hn = lastRun;
            ln = null;
        }
        for (Node<K,V> p = f; p != lastRun; p = p.next) {
            int ph = p.hash; K pk = p.key; V pv = p.val;
            if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
            else
                hn = new Node<K,V>(ph, pk, pv, hn);
        }
        setTabAt(nextTab, i, ln);
        setTabAt(nextTab, i + n, hn);
        setTabAt(tab, i, fwd);
        advance = true;
    }

上述代码中,主要分析一下高低位链路的计算方法:

在这里插入图片描述

至此,就完成了扩容以及基于链表结构下的数据迁移工作,整体原理如下图所示。

在这里插入图片描述
第五部分,完成迁移后的判断

在transfer方法中还有一部分代码,就是用来判断是否完成扩容,以及扩容完成之后的后置处理,代码如下。

if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

这部分代码有两个逻辑。

上一篇 下一篇

猜你喜欢

热点阅读