并发编程专题

(十)深度剖析ConcurrentHashMap原理及源码

2021-09-26  本文已影响0人  跟着Mic学架构

ConcurrentHashMap

ConcurrentHashMap是JDK1.5引入的一个并发安全且高效的HashMap,简单来说,我们可以认为它在HashMap的基础上增加了线程安全性的保障。实际上,关于HashMap的线程安全问题,各位读者应该有一些了解,在JDK1.7的版本中,HashMap采用的是数组+链表的数据结构来存储数据,在多个线程并发执行扩容时,可能造成环形链导致死循环和数据丢失的情况;在JDK1.8中,HashMap采用数组+链表+红黑树的数据结构来存储数据,优化了1.7版本中数据扩容的方案解决了死循环和数据丢失的问题,但是在并发场景下调用put方法时,有可能会存在数据覆盖的问题。

为了解决线程安全问题带来的影响,我们可以采用一些具备线程安全性的集合,比如HashTable,它使用了synchronized关键字来保证线程安全性;还有Collections.synchronizedMap,它可以把一个线程不安全的Map,通过synchronized互斥锁的方式来达到安全性。但是这些方法都有一个问题,就是线程竞争比较激烈的情况下,效率都非常低。原因是他们都是以及方法层面使用synchronized实现的锁的机制,导致所有线程在操作数据时,不管是put还是get都需要去竞争同一把锁。

各位读者看过前面章节中分析的synchronized就应该明白,性能和安全性这两者只能做好平衡,无法两者都满足,而ConcurrentHashMap在性能和安全性方面的设计和实现非常的巧妙,它既能保证线程安全性,在性能方面也远远优于HashTable等集合。

读者需要注意,笔者是基于JDK1.8版本来分析ConcurrentHashMap,之所以要强调是因为不同的JDK版本在实现上有差异。

正确理解ConcurrentHashMap的线程安全性

ConcurrentHashMap本身就是一个HashMap,因此在实际应用上,只需要考虑到当前场景是否存在多线程并发访问同一个Map实例,如果存在,则采用ConcurrentHashMap。但是,各位读者需要注意的是,ConcurrentHashMap的线程安全特性,只是保证多线程并发执行操作时,容器中的数据不会被破坏,但是对于涉及到多个线程的复合操作,ConcurrentHashMap无法保证业务行为的正确性。

举个例子,假设我们需要通过一个ConcurrentHashMap来记录每个用户的访问次数,如果针对指定用户已经有访问次数的记录,则进行递增,否则,则添加一个新的访问记录,代码如下。

private static final ConcurrentMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
public static void main(String[] args) throws InterruptedException {
    Long accessCount=USER_ACCESS_COUNT.get("mic");
    if(accessCount==null){
        USER_ACCESS_COUNT.put("mic",1L);
    }else{
        USER_ACCESS_COUNT.put("mic",accessCount+1);
    }
}

上述代码在多线程并发调用时,会存在线程安全问题,虽然ConcurrentHashMap对于数据操作本身是安全的,但是在上述代码中是一个复合操作,也就是读-修改-写,而这个三个操作不是原子的,所以当多个线程访问同一个用户mic时,很可能会覆盖相互操作的结果,造成记录的次数少于实际记录。

因此笔者想在这里说明的一个点是,虽然ConcurrentHashMap是线程安全的,但是对于ConcurrentHashMap的复合操作行为,需要我们去关注。当然,上述问题其实解决的方案有很多,比如我们针对这个复合操作进行加锁,但是ConcurrentHashMap提供了另外一个解决办法,就是使用ConcurrentMap接口定义的方法来做。

ConcurrentMap是一个可以支持并发访问的Map集合,相当于在原本的Map集合上新增了一些方法来扩展原有Map的功能,而ConcurrentHashMap实现了ConcurrentMap接口。

public interface ConcurrentMap<K, V> extends Map<K, V> {

    V putIfAbsent(K key, V value); 
    boolean remove(Object key, Object value);   
    boolean replace(K key, V oldValue, V newValue); 
    V replace(K key, V value);   
    //此处省略JDK1.8中的default方法
}

ConcurrentMap接口定义的四个方法都满足原子性,可以用在对ConcurrentHashMap的复合操作场景中,方法说明如下:

通过ConcurrentMap提供的这些方法,我们可以针对前面的代码进行线程安全性改造如下。

private static final ConcurrentMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
public static void main(String[] args) throws InterruptedException {
    while(true) {
        Long accessCount = USER_ACCESS_COUNT.get("mic");
        if (accessCount == null) {
            if(USER_ACCESS_COUNT.putIfAbsent("mic", 1L)==null){
                break;
            }
        } else {
            if(USER_ACCESS_COUNT.replace("mic", accessCount, accessCount + 1)){
                break;
            }
        }
    }
}

代码量看起来多了一些,主要是改造了原本的put方法,针对于第一次添加使用putIfAbsent,对于已经存在的数据的修改使用replace方法,由于这两个方法都能保证原子性,所以能够避免多线程并发的影响。同时,增加了一个while(true)实现一个类似自旋的操作,保证本次操作的成功执行。

另外,在JDK1.8中,ConcurrentMap引入了一些支持lambda表达式的原子操作,源码如下。

public interface ConcurrentMap<K, V> extends Map<K, V> {
    default V computeIfAbsent(K key,Function<? super K, ? extends V> mappingFunction) 
    default V computeIfPresent(K key,BiFunction<? super K, ? super V, ? extends V> remappingFunction)
    default V compute(K key,BiFunction<? super K, ? super V, ? extends V> remappingFunction)
    default V merge(K key, V value,BiFunction<? super V, ? super V, ? extends V> remappingFunction)
}

上述几个方法都是JDK1.8引入的default方法,这些方法的作用说明如下:

computeIfAbsent

判断传入的key是否存在来对ConcurrentMap集合进行数据初始化操作,如果存在,则不作任何处理。如果不存在,则调用mappingFunction计算出value值,然后把key=value存入到ConcurrentHashMap中。由于mappingFunction是一个函数式接口,所以它的返回值也会影响到存储结果。

如果mic这个用户不存在,则通过下面这段代码会初始化mic这个用户的值为10

USER_ACCESS_COUNT.computeIfAbsent("mic",k->10L);

computeIfPresent

computeIfAbsent方法的作用相反,对已经存在的key对应的value值进行修改,如果key不存在,则返回null。如果key存在,则调用remappingFunction进行运算,根据返回value的情况作出不同的处理。

如果我们想针对mic这个已经存在的用户的value进行修改,可以这样使用。

USER_ACCESS_COUNT.computeIfPresent("mic",(k,v)->v+1);

compute

compute相当于computeIfAbsentcomputeIfPresent的结合体,它不管key是否存在,都会调用remappingFunction进行计算。如果key存在,则调用remappingFunction对value进行修改;如果key不存在,同样调用remappingFunction方法进行初始化。

通过compute方法,我们可以把前面演示的那段很长的代码,通过一行代码就可以解决。

USER_ACCESS_COUNT.compute("mic",(k,v)->(v==null)?1L:v+1);

这段代码的含义是,如果mic这个key存在,则通过后面的lambda表达式对value进行v+1的修改,否则,初始化为1L

merge

这个方法翻译过来的意思是合并,对ConcurrentHashMap相同key的value值可以选择进行合并。它包含三个参数keyvalueremappingFunction函数式接口。它的作用是:

针对merge,举一个比较简单的demo,针对一个集合中相同元素的key,进行累计合并,代码如下。

public static void main(String[] args) {
    ConcurrentMap<Integer,Integer> cm=new ConcurrentHashMap<>();
    Stream.of(1,2,8,2,5,6,5,8,3,8).forEach(v->{
        cm.merge(v,2,Integer::sum);
    });
    System.out.println(cm);
}

ConcurrentHashMap的数据结构

在JDK1.8中,ConcurrentHashMap采用数组+链表+红黑树的方式来实现数据存储,数据结构如下图所示。

image-20210419122647623.png

相比较于JDK1.7,它做了如下改进:

在前文提到过,ConcurrentHashMap为了在性能和安全性方面做好平衡,使用了一些比较巧妙的设计,主要体现在以下几个方面。

那么接下来,我们针对ConcurrentHashMap的核心源码做一个全面分析,帮助大家更好的了解ConcurrentHashMap的底层实现。

关于数据存储相关定义

ConcurrentHashMap采用Node数组来存储数据,该数组默认长度为16,代码如下。

private static final int DEFAULT_CAPACITY = 16;
transient volatile Node<K,V>[] table;

Node表示数组中的一个具体的数据节点,其定义如下。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
}

Node实现了Map.Entry接口的对象,并且声明了几个成员属性:

前面说过,当链表长度大于等于8且Node数组长度大于64时,链表会转化为红黑树,红黑树的存储是采用TreeNode来实现,定义如下。

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    //省略....
}

Node数组初始化过程分析

Node数组的初始化过程是被动的,也就是当我们调用put方法或者Java8中ConcurrentMap提供的default方法时,如果发现Node[]没有被初始化,则会调用initTable方法完成初始化过程。

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //省略....
        }
    //省略....
}

put方法实际调用的是putVal来做数据存储,putVal的上述代码中,逻辑说明如下:

initTable方法的代码如下。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

initTable方法和一般初始化方法不同,因为它需要考虑到多线程并发的安全问题,实现逻辑如下:

完成初始化之后代码的分析之后,继续回到putVal方法如下部分。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //省略....
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin
    }
    //省略....
}

通过(n - 1) & hash)来计算当前key在table数组中对应的下标位置,如果该位置还没有任何值,则把当前的key/value封装成Node,使用casTabAt方法修改到指定数组下标位置。这里用casTabAt是一种线程安全的更新机制,如果更新成功,则返回true,否则返回false继续下一次循环重试。

至此,对于数组初始化的过程就分析完成了,为了更好的理解,笔者基于图形的方式整理了整个初始化过程,如下图所示。

[图片上传中...(image-20210419152158311.png-ead3a0-1632652316180-0)]

注: 扩容因子为什么设置为0.75呢?其实是一种时间和空间成本的折中考虑,在ConcurrentHashMap中有一段注释如下。

/** However, statistically, under
  * random hash codes, this is not a common problem.  Ideally, the
  * frequency of nodes in bins follows a Poisson distribution
  * (http://en.wikipedia.org/wiki/Poisson_distribution) with a
  * parameter of about 0.5 on average, given the resizing threshold
  * of 0.75, although with a large variance because of resizing
  * granularity. Ignoring variance, the expected occurrences of
  * list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The
  * first values are:
  *
  * 0:    0.60653066
  * 1:    0.30326533
  * 2:    0.07581633
  * 3:    0.01263606
  * 4:    0.00157952
  * 5:    0.00015795
  * 6:    0.00001316
  * 7:    0.00000094
  * 8:    0.00000006
  * more: less than 1 in ten million
**/

理想情况下,bin中的节点频率遵循泊松分布(http://en.wikipedia.org/wiki/Poisson_distribution),使用0.75作为负载因子,哈希膨胀的概率遵循约为0.5的泊松分布,也就是说可以降低节点在某一个特定桶中出现的概率。

另外,在注释中可以看到,当链表长度达到8的时候,也就是说哈希冲突出现8次的概率为0.00000006,几乎是不可能的事情,这也从另外一个层面去尽量避免链表转红黑树的出现。

单节点到链表的转化过程分析

使用put方法向ConcurrentHashMap存入数据时,是基于Key使用哈希函数计算后得到一个指定的数组下标进行数据存储,这种存储结构我们也称为哈希表。哈希表本身是一个有限大小的数据结构,所以对于任何hash函数,都可能会出现不同元素的key得到一个相同的哈希值从而映射到同一个位置的情况,这种情况我们成为hash冲突。而解决hash冲突有比较多成熟的方法,常见的方法是:

而ConcurrentHashMap中,解决hash冲突的方法就是基于链式寻址法,putVal方法中,解决hash冲突的代码如下。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //省略部分代码....
    else {
        V oldVal = null;
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key,
                                                      value, null);
                            break;
                        }
                    }
                }
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                          value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
            }
        }
       //省略部分代码....
}

这段代码比较长,但是整体逻辑并不算复杂,简单分析一下核心代码。

综上所述,便是ConcurrentHashMap中基于链式寻址解决hash冲突的方法,通过图形方式表达如下图所示,调用put方法存入一对键值对,如果当前key计算得到的数组下标位置已经存在一个Node,并且该Node是链表类型,则添加到该链表的尾部。

image-20210419152158311.png

扩容还是转化为红黑树?

当链表长度大于或者等于8的时候,ConcurrentHashMap认为链表已经有点长了,需要考虑去优化,而优化方式有两种。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //省略....
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }
    //省略....
}

在上述代码中,binCount表示的是链表的个数,如果binCount>=TREEIFY_THRESHOLD(TREEIFY_THRESHOLD默认值为8),则调用treeifyBin方法进行后续处理。

treeifyBin

treeifyBin方法的主要作用是根据相关阈值来决定扩容还是把链表转化为红黑树。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

上面这段代码的逻辑说明如下:

tryPresize

tryPresize是用来实现扩容的方法,代码如下。

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

我们把tryPresize这个方法的代码分四个部分来看。

第一个部分

对扩容的大小size进行判断。

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    //省略部分代码...
}

其中,(size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY是用来判断当前要扩容的目标大小size的值,如果大小为MAXIMUM_CAPACITY的一半,则直接设置扩容大小为MAXIMUM_CAPACITY,否则通过tableSizeFor来计算当前size的最小的幂次方,也就是说如果当前传入的size不等于2的n次幂,那么通过tableSizeFor就可以整形成离size最近的一个幂次方的值。

第二个部分

判断table是否初始化,这部分代码和前面分析的initTable方法一样。至于这里为什么会有这样一个判断,原因是在ConcurrentHashMap中的putAll方法中,有调用tryPresize进行初始化功能。

private final void tryPresize(int size) {
    //省略....
    if (tab == null || (n = tab.length) == 0) {
        n = (sc > c) ? sc : c;
        if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if (table == tab) {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
        }
    }
    //省略....
}

第三个部分

通过(c <= sc || n >= MAXIMUM_CAPACITY)进行扩容判断,判断的逻辑有两个。

private final void tryPresize(int size) {
    //省略部分代码...
    else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
    //省略部分代码...
}

第四个部分

正式开始执行扩容操作,这部分代码也有两个比较核心的逻辑。

private final void tryPresize(int size) {
    //省略部分代码....
    else if (tab == table) {
        int rs = resizeStamp(n);
        if (sc < 0) {
            Node<K,V>[] nt;
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                transfer(tab, nt);
        }
        else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                     (rs << RESIZE_STAMP_SHIFT) + 2))
            transfer(tab, null);
    }
    //省略部分代码....
}

上述代码中有一些比较有意思的设计,笔者简单说明一下。

至此,ConcurrentHashMap扩容前置的一些基本操作就分析完成了,接下来跟着笔者一起分析并发扩容的核心代码。

上一篇 下一篇

猜你喜欢

热点阅读