ConcurrentHashMap源码分析

2019-05-20  本文已影响0人  奔跑地蜗牛

前言

本文中的ConcurrentHashMap源码分析版本是jdk1.8;

问题

在对ConcurrentHashMap进行源码分析之前,首先需要确定几个问题,也就是ConcurrentHashMap相对HashMap来说需要解决的问题:

ConcurrentHashMap常量说明

要理解ConcurrentHashMap,首先需要对其常量和变量有一定认识,必须知道这是用来干什么的,什么场景下使用;

bin:箱子或者柜子,可以理解为装node的箱子,每个箱子里面挂着一个node的链表或者红黑树;

为什么是64呢?因为避免在扩容和转换成红黑树时的冲突,因为哈希表容量较小时是比较容易进行扩容操作的,如果一个bin中nodes比较多就转换成红黑树,那么发生扩容之后,又得转换成链表,会造成很多不必要的计算,所以为了平衡两者的冲突,需要确定一个数组容量阈值;

ConcurrentHashMap变量说明

ConcurrentHashMap方法解析

putVal()解析

putVal有三个参数K key,V value,boolean onlyIfAbsent,onlyIfAbsent默认为false,当putIfAbsent方法调用时则为ture;

  final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //将key的hashcode进行平铺,也就是为了避免hash值低位与数组长度n-1相与可能会有更多的hash collide,因此将hash值高16位和低16位先进行或运算
        int hash = spread(key.hashCode());
        int binCount = 0;//用来计算bin链表长度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)//初始化
                tab = initTable();
            //将hash和数组长度n-1进行与运算找到bin的下标i,然后对该bin第一个Node进行判断
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果该node为空,则cas添加一个新node,成功则退出循环,否则继续;
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果node的hash为moved,那么表示该bin正在扩容中,则协助扩容
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)//判断bin头node如果不能替换且old key和new key一致,则直接退出
                return fv;
            else {//如果bin header node可以替换或者不等于key不一致
                V oldVal = null;
                synchronized (f) {//利用synchronized方法将header node加锁,避免竞争造成一致性
                    if (tabAt(tab, i) == f) {//如果header node尚未没有发生变化,避免另有线程更新该header node
                        if (fh >= 0) {//hash值大于零表示,该node是普通node
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//从header node开始迭代找到key,hash值一致的node进行替换,找不到则在tail添加新node
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//如果header node是红黑树TreeBin
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)//保留节点一般情况不太可能出现
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {//如果binCount大于零即bin链表长度大于零
                    if (binCount >= TREEIFY_THRESHOLD)//判断是否大于红黑树转化阈值
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//添加到高效计数器,另外根据binCount判断是否需要协助扩容或者红黑树转换
        return null;
    }

transfer方法说明

transfer方法其实划分为3个阶段:

  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
         //计算transfer任务步长stride,最小为16,多线程情况下为n/(8*NCPU)        
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // nextTab初始化
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;//长度增加两倍后复制给nextTable
            transferIndex = n;//一开始transferIndex等于原数组末尾index+1处
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//转移节点
        boolean advance = true;//表示是否还能继续执行申请transfer任务
        boolean finishing = false; // 在提交nextTable之前用来保证所有transfer任务都已经完成
        for (int i = 0, bound = 0;;) {//i表示当前transfer任务初始化开始bin位置,bound表示当前transfer任务初始化结束bin的下标;
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;//定义下一个transfer任务开始bin的下标和结束bin的下标;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//如果nextIndex已经不大于零,那么说明transfer任务已经结束;
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSetInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {//尝试申请下一个transfer任务
                    //申请成功,则将bound赋值开始和结束bin的下标位置(i,bound)
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //i<0表示transfer任务已经结束,i大于n表示存在新的扩容任务,
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//执行本轮扩容的收尾工作
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);//将原数组长度扩大两倍-减去一半即是新数组长度的0.75;
                    return;
                }
                if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//尝试将sizeCtl减1,表明需要减少一个活跃扩容线程
                   //sc-2如果除了扩容戳以外的低位都是零的话,那么表示当前线程是否是最后一个扩容线程,如果是直接返回
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit//重新检查先是否任务分配的hash bins都已经转移完毕
                }
            }
            else if ((f = tabAt(tab, i)) == null)//如果当前bin为null,则直接放置一个forwardnode表明是扩容转移
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)//如果当前bin是forwardnode,那么表示出现了扩容重叠,需要重新申请transfer任务
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            //n=2^k,表示第k位为1,而对于数组长度为n,hash&n时第k位为零说明是新数组低位bin,而第k为1则说明是新数组高位bin;
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {//找到最后一个node
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {//根据lastRun生成node链
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {//按照红黑树的方式转移bin
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

resizeStamp(n)解析

resizeStamp方法是用来生成扩容戳的,对于不同数组长度n,则扩容戳不一样,同时每个扩容戳需要容纳最大为MAX_RESIZERS=(1 << (32 - RESIZE_STAMP_BITS)) - 1的活跃扩容线程,那么sizeCtl-2=resizeStamp(n)<<RESIZE_STAMP_BITS,就应该要能表示这三组信息:负值,表示不同数组长度的扩容戳和活跃线程数;
先看下源码:

  static final int resizeStamp(int n) {
       //获取数组长度n前面零的个数一般为32-log(n),那么RESIZE_STAMP_BITS为扩容戳需要的位数
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

因为RESIZE_STAMP_BITS需要能展示出当前数组扩容次数,而数组可扩容次数最大值为32-log(n),所以RESIZE_STAMP_BITS>32>2^5,所以RESIZE_STAMP_BITS至少为6位;
则假设n=2^k,那么经过resizeStamp之后得到的rs如下图:


rs.png

;

经过左移RESIZE_STAMP_SHIFT后如下图:


sizeCtl.png

如上图所示最高位为1表示负数,扩容戳的后五位表示当前扩容次数,32-扩容戳所占位数即可表示活跃扩容线程数,活跃扩容线程数为1时表示初始化,为2时表示当前扩容线程数为1;

未完待续.....

上一篇下一篇

猜你喜欢

热点阅读