Java入门基础Java 杂谈

ConcurrentHashMap源码阅读笔记

2017-12-05  本文已影响16人  凯凯雄雄

HashMap是我们用的比较多的数据结构,但是它在高并发下面进行put操作时,很有可能会引起死循环,这主要是在它扩容的情况下,导致链表头尾可能存在重复节点,而这时候解决的办法有很多,如Hashtable和Collections.synchronizedMap(hashMap),但是这俩货的性能是存在缺陷的,因为都是锁整个对象。
这时候ConcurrentHashMap出现了,他很好的弥补了HashMap的并发缺陷,也兼顾了上两个方案的高性能读写。

Question :

相关概念介绍

// 数组节点 , 初始化是16 
transient volatile Node<K,V>[] table;

// 默认为null,扩容时新生成的数组,其大小为原数组的两倍。可以理解为为扩容所做的临时变量,临时用来做数据交换的,扩容完毕则设置为null
private transient volatile Node<K,V>[] nextTable;
  // 一个基础计数器,用于统计ConcurrentHashMap的计算次数
  private transient volatile long baseCount;
  /* 默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
      -1 代表table正在初始化
      -N 表示有N-1个线程正在进行扩容操作
      其余情况:
      1、如果table未初始化,表示table需要初始化的大小。
      2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。 
  */
 private transient volatile int sizeCtl;

// 扩容时候需要用到的下标计数值,需要通过cas去设置的下标值
 private transient volatile int transferIndex;

put 方法

 /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 通过Hash算法得到要存入Key的HashCode码
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化表格,初始化完成之后赋给tab,让下一次循环继续
                tab = initTable();
            // 判断内存中的对象是否为null,如果为空则新创建一个链表,把该对象作为首节点插入
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 通过原子性的修改查看值是否能够被插入成功,成功则结束循环
                //但是!!!! 如果不成功,不成功的可能性就是该节点的值发生了改变,一旦发生了改变,则需要重新比较。可能下一次就不是进入这个判断了,因为这个判断刚刚执行失败了,已经被初始化了
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 表示正在扩容的情况下,这里出现的场景是,正在扩容,将老的table数据迁移到新的table数据,而同时有线程在获取老的数据里面的值
            else if ((fh = f.hash) == MOVED)
                // 这里据说是为了未完成扩容的情况下,这里会帮助另一个线程加速扩容
                /**
                  这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已
                  经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的
                  transfer方法      可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。
                /*
                tab = helpTransfer(tab, f);
            else {
                // 能够进入到这里的情况有以下几种:
                // 1 它Hash到的下标链表已经有值了,有值了,也可能存在两个条件 
                //           1.存在重复的Hash值,需要覆盖,2.不存在重复的值,则需要将它添加到尾节点
                V oldVal = null;

                // 注意了,这里使用了synchronized , 
                //猜想是因为f是node链表,这里是为了防止这个链表在更新时出现数据不一致的问题.... 
                //这里也就是在插入的时候会进行链表的锁定,这时候就可以放心的对链表做操作了
                synchronized (f) {
                    // 通过CAS去获取内存中的node节点对象
                    if (tabAt(tab, i) == f) {
                        // fh是当前key的hashCode
                        if (fh >= 0) {
                            // 表示计数
                            binCount = 1;
                            // 下面是循环这个链表节点,取出链表中的hash码与当前key做比较
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 判断链表中的Hash码是否存在,存在则替换,不存在则添加到尾节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    // 如果存在,则将老的值取出来,作为返回出去的结果
                                    oldVal = e.val;
                                    // 在这个值为false的情况下,进行替换,表示是否覆盖
                                    if (!onlyIfAbsent)
                                        // 将新的值赋给这个链表
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //如果链表中不存在这个key相关的节点,则默认插入这个链表的尾部
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 这里会判断当前节点是否是Tree节点,
                        // 这一种情况会出现在链表大小达到8个的时候,会将node转化成TreeBin。
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }


// 初始化table的方法
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 
        while ((tab = table) == null || tab.length == 0) {
            // 如果当前sizeCtl标识小于0(-1表示正在初始化)时,则线程
            if ((sc = sizeCtl) < 0)
                // 表示让出CPU,处于就绪状态。
                Thread.yield(); // lost initialization race; just spin
             // compareAndSwapInt -> CAS 原子性操作,通过原子操作将当前表格设置为初始化
            //这个方法有四个参数,其中第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的valueOffset的值),
            //第三个参数为期待的值(这里默认为0),第四个为更新后的值(-1上面概念中提到-1表示table正在初始化)
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 这里会先判断table是否为null,因为害怕其他线程先一步已经创建好了.
                    if ((tab = table) == null || tab.length == 0) {
                        // 默认初始化table大小,DEFAULT_CAPACITY = 16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        // 构建一个上面指定的数组大小
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        // 将这个变量赋给一个全局变量,就是为了避免上面  if ((tab = table) == null)的情况
                        table = tab = nt;
                        // 这里会设定一个阀值,就是当前的0.75,可以这么理解                    
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 初始化完成之后.将这个阀值赋给全局变量
                    sizeCtl = sc;
                }
                break;
            }
        }
        // 返回创建的table
        return tab;
    }

    // 下面 4 个原子性操作
    // 获取内存中的地址
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
         //getObjectVolatile 获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义。
        // 第一个参数是读取节点对象
        // 第二个参数是内存中的偏移量,也就是说位置
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
      //compareAndSwapObject 
     
    /**
     * 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
     * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
     * 
     *  @param obj the object containing the field to modify.
     *    包含要修改field的对象 
     * @param offset the offset of the object field within <code>obj</code>.
     *         <code>obj</code>中object型field的偏移量
     * @param expect the expected value of the field.
     *               希望field中存在的值
     * @param update the new value of the field if it equals <code>expect</code>.
     *               如果期望值expect与field的当前值相同,设置filed的值为这个新值
     * @return true if the field was changed.
     *              如果field的值被更改
     */
 // public native boolean compareAndSwapObject(Object obj, long offset,Object expect,     Object update);

      static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        // 这里传入的第一个参数 是数组table
        // 第二个参数传入的是数组的位置下标
        // 第三个参数是节点本身对象
        // 第四个是期望更新后的节点对象
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

扩容方法的实现 :

 private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // 这里是否需要检测扩容,因为上面增加了一个值
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // s 表示当前数组大小.sizeCtl 表示达到阀值大小也就是初次的值 12 ,一旦满足扩容条件
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
               // 计算一个机器码
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 通过cas设置SIZECTL的值,一旦设置成功,则满足下列方法
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // 数据迁移
                    transfer(tab, null);
                // 计算总数
                s = sumCount();
            }
        }
    }



     // 数据迁移方法 , 也包括扩容
      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        // 获得当前数组长度
        int n = tab.length, stride; 
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 表示扩容操作
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // n << 1 可以理解为当前数组长度的两倍递增
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  
                // 将新的数组传递
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                // 这里能够触发的情况是在下两个条件执行完成之后,会为i赋值一个默认的
                if (--i >= bound || finishing)
                    advance = false; // while不需要再循环了,已经得到了下标值了
                // 这里是表示已经到最后一个的标志
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 通过CAS去为这个TRANSFERINDEX变量赋值
                // TRANSFERINDEX 扩容后的大小值
                // nextBound 
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                     // 将下一个坐标值赋i,然外面的for循环根据这个下标去table中迁移数据
                    i = nextIndex - 1;
                    // 停止while的循环
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 这里有点绕.何时会满足这个条件?
                // 1. 当老数组全部数据迁移完毕之后,这时候会将finishing设置为true
                // 2.会执行一次数据检查,就是说再遍历一次.看是否还有没有迁移的值,直到检查完毕之后,则会满足这个条件,
                // 通俗一点来说,这个标记位表示所有迁移工作全部完成..
                if (finishing) {
                    // 将这个临时变量设置为null,下一次扩容再用
                    nextTable = null;
                    // 将新的数组赋值给老的
                    table = nextTab;
                    // 这里是设置新数组大小的阀值,比如扩容到32了,他的阀值是32 * 75% 则是扩容条件
                    // (n >>> 1) 理解为 0.75 ,总的理解就是上面的,实际上是32 - 8 ;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 当执行到最后一个节点完成之后,将SIZECTL设置为-1 表示正在初始化
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                   // 这里将i重新设置为老数组的长度,是为了检查是否还有没有需要提交的数据(PS:我也不是特别理解这一步的意义.. 重复检查 ? )
                    i = n; // recheck before commit
                }
            }
            // 获取table中的[i]下标链表,如果该链表为空,则给他赋予默认值
            else if ((f = tabAt(tab, i)) == null)
                // 如果获取到的节点链表为空的情况,那就好办了,直接赋值为null,
                //新的数组也不用迁移 , 需要注意的是赋值的null对象是一个自定义的ForwardingNode节点
                // 他使用这个节点的意义应该是能够快速标识出目前正处于扩容阶段
                // 其他线程如果也在执行扩容的话,如果标识出该链表为fwd类型的表示该链表已经迁移完成
                advance = casTabAt(tab, i, null, fwd);
            // 如果上面获取到的链表的Hash码为-1,表示已经处理过
            // 这里就表示取出来的链表节点为ForwardingNode节点,表示迁移完成
            else if ((fh = f.hash) == MOVED)
                // 这里是为了重读检查设置的,为null的节点,不做任何处理,只是为了检查一下
                advance = true; // already processed
            else {
                // 这里开始迁移数据了.用的还是同步,防止链表出现更改的情况
                synchronized (f) {
                    // 这里还是获取i的下标节点
                    if (tabAt(tab, i) == f) {
                        // 这俩变量是用来做数据迁移的
                        // ln表示不迁移的数据链表,hn表示迁移的数据链表
                        Node<K,V> ln, hn;
                        // hash码不为0的时候
                        if (fh >= 0) {
                            // 这里会将你的hashcode与老的数组大小做一次运算
                            // 这里的运算决定了你的数据是需要迁移
                            // 如果运算出来得到的值为0表示不迁移,如果不等于0 则默认迁移到新的数组那边去
                            // 举例 : 运算得到 16 这时候 i 是 15 ,因为不为0表示迁移到 16+15 = 31 的数组下标中去
                            int runBit = fh & n;
                            // 获取当前节点
                            Node<K,V> lastRun = f;
                            // 循环遍历当前节点的下一级节点
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                           // 这里就是为0的表示不迁移,还是重新放入到当前下标i中
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            // 不为0的时候
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // 这里类似于一个递归,循环获取下级节点,并且将这些节点进行分类(需要迁移的节点,不需要迁移的节点)
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // 这里是分类的依据
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 这里开始重新设置值
                            // 设置不迁移的数据,还是在原来的数组下标中
                            setTabAt(nextTab, i, ln);
                            // 需要迁移的数据通过当前下标+原来数组大小得到最终存放的下标
                            setTabAt(nextTab, i + n, hn);
                            // 设置原来的节点数据为空
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 下面是红黑树结构的扩容
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

      // 计算当前数组中的总数
      final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

从看代码中衍生的问题

  1. 在写入的时候,我们会发现它最外层就是一个循环, 为什么就插入一个值也要用到一个循环呢?

     这就是为了防止多线程写入,在通过CAS插入值的时候,遇到失败的情况下,通过自旋的方式,一直尝试插入,直到成功为止。
    
  2. get方法里为什么需要用tabAt方法去读取table[i],而不是直接用table[i]?

     虽然table是用volatile方式修饰的,在多线程的环境之下都能保持可见,但table是一个数组。
     不能确保数组里面的节点内容也是最新的,也可能出现CPU缓存或者副本的情况,
     所以每次更新也是通过CAS去内存里面直接更新,获取也是直接从内存中直接获取..
    
  3. 为什么扩容一定要按照2倍的方式?

     这样做的好处就是方便数据迁移,也就是说在该下标值中的链表只要划分出一半的数据出去
     (其实就是说通过Key的hashCode运算为0的放入原来的位置,不等于0的划分到当前下标+老的数组长度的位置),
     不用做过多的复杂计算就能够完成扩容。
    
  4. 高并发下扩容是如何实现的?

     1. 在扩容的时候,会将当前链表进行锁定,这样可以避免HashMap中一旦满足扩容条件,多个线程都会出现扩容竞争的情况,
     而ConcurrentHashMap则是会让另一个线程帮助加速扩容这方面来,
     2. 为了保证链表的一致性,采用了cas和synchronized进行加锁的操作,保证每个链表都是原子性的操作.
     3.在进行老的table复制到新的table的时候,老的table会将已经清空链表设置为
     ForwardingNode对象,很巧妙的实现了节点的并发移动。当多个线程同时扩容的时候,
     只要发现有节点中有ForwardingNode对象表示正在扩容,
     则会加入到帮助扩容里面,而不是重新扩容,在已经扩容的基础上,再去帮助未复制的节点进行扩容.
    

解答

  1. 如何做到高性能写入?

     1. 借助使用CAS来实现非阻塞无锁的特点来实现线程安全的高效插入
     2. 基于链表的操作还是用了synchronized来保证线程安全,不过目前1.8的synchronized已经效率很高了.
     3. 其实也就是引入分段的概念.高并发下不会锁住整个table数组,而是单个链表的头节点,来保证安全,
    
  2. 如何避免HashMap的扩容引发的血案?

     1. 采用synchronized加锁来保证了链表节点的线程安全操作
     2. 并发下扩容,多个线程扩容,并不会重复的扩容。只会帮助它继续未完成扩容的节点,例如helpTransfer()方法。
        它利用ForwardingNode节点来标识当前链表是否已经迁移完毕,其他线程可以根据这个节点来帮助加速扩容。
    
上一篇下一篇

猜你喜欢

热点阅读