ConcurrentHashMap解析三(transfer方法解

2018-09-26  本文已影响0人  代码potty

在之前计数方法addCount()方法中,它有两部分内容,一个是计数另一个是扩容,在扩容语句中有这样一句:

    transfer(tab, null);

这句话表示,当第一个线程执行扩容操作的时候,会向transfer()传入现在的表和一个空对象,好,我们一步步来看下这个方法的代码。

第一部分代码

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            //stride为每个cpu所需要处理的桶个数
            int n = tab.length, stride;
            //将 n / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
            //这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
                //新的 table 尚未初始化
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    //扩容两倍
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                //如果扩容出现错误,则sizeCtl赋值为int最大值返回
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                //transferIndex代表的是旧数组的尾节点
                transferIndex = n;
            }

首先将旧表长度赋值给n,然后另外申请了一个变量stride,这个变量表示的是每个CPU所需要处理的桶的个数,代码中的第一个判断语句可以看出,stride最小值为16,也就是说,当桶的个数小于16的时候,默认一个线程来处理所有的桶。
第二条判断语句是对nextTab的判断,一开始的时候,我讲到,在addCount()方法中为该参数传入了一个Null值,说明table尚未初始化,需要进行初始化,容量直接申请为旧表的两倍,如果在扩容中出现错误,那么sizeCtl赋值为int最大值返回,然后需要注意的是transferIndex表示的迁移数据的下标,一开始的下标指向旧表的尾节点。

第二部分代码

        int nextn = nextTab.length;
            //创建一个标识类用于占位,当其他线程扫描到这个类的时候会跳过
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            //数组一层层推进的标识符
            boolean advance = true;
            //扩容结束的标识符
            boolean finishing = false; // to ensure sweep before committing nextTab
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                //每个线程进入这里,先获取自己需要处理桶的区间,第一次进入因为--i,会直接跳到else if中,对nextIndex进行赋值操作
                //这里设置了一个i = -1,
              // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing)
                    //防止在没有成功处理一个桶的情况下却进行了推进
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        //防止在没有成功处理一个桶的情况下却进行了推进
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;//线程负责桶区间当前最小下标
                        i = nextIndex - 1;//线程负责桶区间当前最大下标
                        advance = false;
                    }
                }
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {//如果完成扩容
                        nextTable = null;// 删除成员变量
                        table = nextTab;// 更新 table
                        sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
                        return;// 结束方法。
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;//扩容结束
                        i = n; // 再次循环检查一次表
                    }
                }
                //获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
                else if ((f = tabAt(tab, i)) == null)
                    //如果写进fwd,则推进
                    advance = casTabAt(tab, i, null, fwd);
                    //如果当前位置不是Null,且hash值为-1,说明其他线程已经处理过这个桶,继续推进
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed

第二部分代码主要是每个CPU负责的桶区间进行规划,然后进行不同情况的判定工作,介绍下关键的几个量
1、fwd:这个类是个标识类,用于指向新表用的,其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。
2、advance:这个变量是用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识
3、finishing:这个变量用于提示扩容是否结束用的

首先使用一个while(advance)循环出每个进入该循环的线程所要负责的桶的区间,再判断扩容是否结束,如果扩容结束,清空临死变量,更新 table 变量,更新库容阈值。如果没完成,但已经无法领取区间(没了),该线程退出该方法,并将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
接着判断当前位置是否为空,为空则插入cas方法插入fwd,如果插入fwd成功继续推进
如果当前位置不为空的情况下,hash值等于-1,表示当前桶已经完成扩容和数据迁移操作

第三部分代码

          else {
                //锁住首节点
                    synchronized (f) {
                    //二次判断地址偏移量所指向位置是否与f对象相等
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            //fh>0为链表数据转移
                            if (fh >= 0) {
                                //首节点的hash值
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;//最后一个节点
                                //这个地方跟hashmap不同,hashmap是直接推进到链表尾
                                //这个地方的处理在于想保留链表后所有hash值计算相同的点,这些点可以重复利用,不需要重新new
                                //所以下边需要获取哪个节点后的hash值全部相同
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                //如果runBit==0,说明低位重用
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                //高位重用
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                    //注意创建node节点的最后一个参数ln指代的是next
                                    //也就是说,我们不再是从头到尾节点,而是从为节点开始向头结点走
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                //将遍历好的链表一放入i中
                                setTabAt(nextTab, i, ln);
                                //将遍历好的链表二放入i+n中
                                setTabAt(nextTab, i + n, hn);
                                //将fwd占位放入旧表中
                                setTabAt(tab, i, fwd);
                                //向前推进
                                advance = true;
                            }
                            //如果是红黑树
                            else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                //遍历,将链表转成双端链表
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                //如果长度小于等于6,则将红黑树转换成链表
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }

第三部分代码跟hashmap中的类似,也是使用synchronized来锁定头结点,然后根据链表和红黑树分开进行判断数据迁移,如果是链表,ConcurrentHashmap比hashmap多了一个步骤,使用runBit来记录头结点hash值,然后遍历链表一个个跟runBit进行比较,runBit只有两种可能,一种是为0,另一种是为1,也就是说,如果为0,则低位上可以有很多重用的节点,如果为1则表示高位上有很多重用的节点,用lastRun来记录某个节点后runBit不变,也就是lastRun节点后的节点都是重用的,然后遍历lastRun之前的节点,这个地方也不同,hashmap是从头通过node的next属性进行从头往后添加节点,而ConcurrentHashmap是从尾开始往前添加节点,直到添加到首节点,然后通过setTabAt()方法添加到新表的桶中,最后在旧表放入一个fwd占位符。
红黑树跟链表也差不多,分出两个红黑树,一个在高位一个在低位,分出来的红黑树再进行长度判断,长度小于等于6的会通过方法untreeify()转换成链表结构存储。

参考链接:
https://www.jianshu.com/p/2829fe36a8dd
https://blog.csdn.net/u011392897/article/details/60479937
https://www.cnblogs.com/stateis0/p/9062095.html

上一篇 下一篇

猜你喜欢

热点阅读