中北软院创新实验室Java面试题总结

ConCurrentHashMap了解一下

2018-05-18  本文已影响75人  HikariCP

前言

前言

本人使用的是jdk1.8

ConcurrentHashMap 1.7

首先我们来回顾一下在jdk1.7当中ConcurrentHashMap是如何实现的。

还在jdk1.7的时候ConcurrentHashMap的底层数据结构其实是由Segment数组和多个HashEntry组成。如图:

image

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分段技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的散列表+链表的数据存储结构是一样的。这里的每个table就像我们之前所说的HashTable一样。

不同于HashTable为了并发情况下的安全性锁整个table那么暴力不同,ConcurrentHashMap的处理方式则是对其进行了优化,是把一个大table分割成小table来分别锁住。也就是我们常说的锁分段。

同时从源码中我们可以看出,Segment继承了ReentrantLock(独占锁)类。使大table中的每个小table都有了一个锁。

static final class Segment<K,V> extends ReentrantLock implements Serializable 

ConcurrentHashMap 1.8

在jdk1.8中,ConCurrentHashMap的数据结构底层是:散列表+链表+红黑树,其变化与HashMap在1.8的变化是一样的。

ConcurrentHashMap头注释信息

/**
 * A hash table supporting full concurrency of retrievals and
 * high expected concurrency for updates. This class obeys the
 * same functional specification as {@link java.util.Hashtable}, and
 * includes versions of methods corresponding to each method of
 * {@code Hashtable}. However, even though all operations are
 * thread-safe, retrieval operations do <em>not</em> entail locking,
 * and there is <em>not</em> any support for locking the entire table
 * in a way that prevents all access.  This class is fully
 * interoperable with {@code Hashtable} in programs that rely on its
 * thread safety but not on its synchronization details.
 *
 * <p>Retrieval operations (including {@code get}) generally do not
 * block, so may overlap with update operations (including {@code put}
 * and {@code remove}). Retrievals reflect the results of the most
 * recently <em>completed</em> update operations holding upon their
 * onset. (More formally, an update operation for a given key bears a
 * <em>happens-before</em> relation with any (non-null) retrieval for
 * that key reporting the updated value.)  For aggregate operations
 * such as {@code putAll} and {@code clear}, concurrent retrievals may
 * reflect insertion or removal of only some entries.  Similarly,
 * Iterators, Spliterators and Enumerations return elements reflecting the
 * state of the hash table at some point at or since the creation of the
 * iterator/enumeration.  They do <em>not</em> throw {@link
 * java.util.ConcurrentModificationException ConcurrentModificationException}.
 * However, iterators are designed to be used by only one thread at a time.
 * Bear in mind that the results of aggregate status methods including
 * {@code size}, {@code isEmpty}, and {@code containsValue} are typically
 * useful only when a map is not undergoing concurrent updates in other threads.
 * Otherwise the results of these methods reflect transient states
 * that may be adequate for monitoring or estimation purposes, but not
 * for program control.
 *
 * ······
 * ······
 */

从头注释中我们可以知道:

ConcurrrentHashMap关键属性

由于ConcurrentHashMap的属性众多,我们挑几个典型关键的来进行分析。

CAS操作依赖于现代处理器指令集,通过底层CMPXCHG指令实现。CAS(V,O,N)核心思想为:若当前变量实际值V与期望的旧值O相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值N赋值给变量;若当前变量实际值V与期望的旧值O不相同,则表明该变量已经被其他线程做了处理,此时将新值N赋给变量操作就是不安全的,再进行重试。

ConcurrentHashMap为何存在

我们知道由于HashMap在多线程情况下是线程不安全的。而和其对应的HashTable虽然是线程安全的,但是却是十分极端的在所有涉及多线程的操作上都加上了synchronized关键字来锁住整个table。这就意味着在多线程情况下,所有的线程都在竞争同一把锁。虽然是线程安全的,但是却无疑是效率低下的。

其实HashTable有很多的优化空间,锁住整个table这么粗暴的方法可以变相的柔和点,比如在多线程的环境下,对不同的数据集进行操作时其实根本就不需要去竞争一个锁,因为他们不同hash值,不会因为rehash造成线程不安全,所以互不影响,这就是锁分离技术,将锁的粒度降低,利用多个锁来控制多个小的table,这就是ConcurrentHashMap的核心思想

总而言之:

CAS算法

上面我们提到ConcurrentHashMap是通过CAS算法来实现同步的。接下来我们就简单了解一下什么是CAS算法

什么是CAS

CAS: 全称Compare and swap,字面意思:”比较并交换“,一个 CAS 涉及到以下操作:

我们假设内存中的原数据V,旧的预期值A,需要修改的新值B。

比较 A 与 V 是否相等。(比较)
如果比较相等,将 B 写入 V。否则什么都不做(交换)
返回操作是否成功。

我们知道采用锁对共享数据进行处理的话,当多个线程竞争的时候,都需要进行加锁,没有拿到锁的线程会被阻塞,以及唤醒,这些都需要用户态到核心态的转换,这个代价对阻塞线程来说代价还是蛮高的,那cas是采用无锁乐观方式进行竞争,性能上要比锁更高些才是,为何不对锁竞争方式进行替换?

在高度竞争的情况下,锁的性能将超过cas的性能,但在中低程度的竞争情况下,cas性能将超过锁的性能。多数情况下,资源竞争一般都不会那么激烈。

参考:java多线程——CAS

ConcurrentHashMap构造函数

// 构造一个空map映射,初始容量16
public ConcurrentHashMap() {
}

// 初始化时明确给定一个初始容量,减少resize次数
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    // tableSizeFor 函数返回一个最接近入参 initialCapacity 容量的2进制整数
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

// 创建一个与给定的 Map 映射具有相同元素的 ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    // 我们前面所述 sizeCtl 两个含义,构造和扩容。
    // 此处必然是构造容量为 DEFAULT_CAPACITY = 16 的 ConcurrentHashMap
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

public void putAll(Map<? extends K, ? extends V> m) {
    // 初始化数组容量,防止直接迭代insert导致频繁扩容
    tryPresize(m.size());
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}

// 构造一个空的 Map 映射,并给定其初始容量与加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

// 构造一个空的 Map 映射,并给定其初始容量,加载因子与预估的并发更新的线程数
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        // 该情况下,一个更新线程负责一个HashEntry
        initialCapacity = concurrencyLevel;   // as estimated threads
    // 确定 table 的初始容量 = 初始容量 initialCapacity / 元素密度 loadFactor
    // 比如你有30个元素,构造Map的时候传入30和0.75,那么table真实容量就应该是 30/0.75。保证你元素数量是table容器的0.75倍
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

需要注意的就是最后一个构造函数中引用的tableSizeFor函数:

/**
 * Returns a power of two table size for the given desired capacity.
 * See Hackers Delight, sec 3.2
 */
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

通过注释就很清楚了,该方法会将调用构造器方法时指定的大小转换成一个2的幂次方数,也就是说ConcurrentHashMap的大小一定是2的幂次方,比如,当指定大小为18时,为了满足2的幂次方特性,实际上concurrentHashMapd的大小为2的5次方(32)。

另外,需要注意的是,调用构造器方法的时候并未构造出table数组(可以理解为ConcurrentHashMap的数据容器),只是算出table数组的长度,当第一次向ConcurrentHashMap插入数据的时候才真正的完成初始化创建table数组的工作。

ConcurrentHashMap 常见Api解析

initTable()

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // 1. 保证只有一个线程正在进行初始化操作
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 2. 得出数组的大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 3. 这里才真正的初始化数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

代码的逻辑请见注释,有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。

另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里n - (n >>> 2)是不是刚好是n-(1/4)n=(3/4)n,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为DEFAULT_CAPACITY(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。

putVal(K key, V value, boolean onlyIfAbsent)

当且仅当table中不存在该key对应的Entry时才插入该Entry。否则不替换table中原有的Entry中的value

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 1. 计算key的hash值,与HashMap处理逻辑一样
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        /* 4. 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容
        这里我有个问题:为什么该位置会是forwordingNode节点 */
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 5. 进行到这一步,说明要插入的位置有值,需要对该桶加锁。
            synchronized (f) {
                // 确定f是tab中的头节点
                if (tabAt(tab, i) == f) {
                    // fh = 桶首元素的hast值。如果头结点的哈希值大于等于0,说明要插入的节点在(链表)中。否则有可能该桶的数据结构不是链表而是红黑树
                    if (fh >= 0) {
                        binCount = 1;
                        // 开始迭代找key的节点,f = 桶首元素
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果某一节点的key的哈希值及key与参数相等,替换该节点的value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 没有找到则继续向后迭代,当迭代到最后一个元素还没有找到时,将该Entry插入到链表尾。
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 6. 如果要插入的节点在红黑树中,则按照树的方式插入或替换节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 7. 如果binCount不为0,说明插入或者替换操作完成了
            if (binCount != 0) {
                // 判断节点数量是否大于等于8,如果是就需要把链表转化成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    // 链表转成红黑树 
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 
    // 能执行到这一步,说明节点不是被替换的,是被插入的,否则在binCount判断 !=0 的时候就要被return了。
    addCount(1L, binCount);
    return null;
}

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

整体流程下来就是:

  1. 计算key哈希值
  2. 根据哈希值计算在table中的位置
  3. 根据哈希值执行插入或替换操作
    1. 如果这个位置没有值,直接将键值对放进去,不需要加锁。
    2. 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容
    3. 加锁。以下操作都需要加锁。
    4. 如果要插入的节点在链表中,遍历链表中的所有节点,如果某一节点的key哈希值和key与参数相等,替换节点的value,记录被替换的值;如果遍历到了最后一个节点,还没找到key对应的节点,根据参数新建节点,插入链表尾部
    5. 如果要插入的节点在树中,则按照树的方式插入或替换节点。如果是替换操作,记录被替换的值
  4. 判断节点数量是否大于8,如果大于就需要把链表转化成红黑树
  5. 如果操作3中执行的是替换操作,返回被替换的value。程序结束。
  6. 能执行到这一步,说明节点不是被替换的,是被插入的,所以要将map的元素数量加1。

get(Object key)

据我们之前的类头注释所译。get函数没有必要加锁的。但是可以看到的是ConcurrentHashMap对其Node类的next属性加上了volatile关键字进行修饰。来保证并发情况下其他线程若正在与get函数同步的修改该节点的next属性,保证了它的可见性。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 散列key
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 根据key的散列值h来找到桶头元素e
        if ((eh = e.hash) == h) {
            // key相等值相等,则桶头元素刚好就是所找元素
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 如果hash < 0 那么在红黑树中
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 在该桶的链表上,不是桶头元素上,迭代继续向下寻找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

整体流程:

首先先看当前的hash桶数组节点即table[i]是否为查找的节点,若是则直接返回;若不是,则继续再看当前是不是树节点?通过看节点的hash值是否为小于0,如果小于0则为树节点。如果是树节点在红黑树中查找节点;如果不是树节点,那就只剩下为链表的形式的一种可能性了,就向后遍历查找节点,若查找到则返回节点的value即可,若没有找到就返回null。

ConcurrrentHashMap关键类

Node内部类

Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域

static class Node<K,V> implements Map.Entry<K,V> {
     final int hash;
     final K key;
     volatile V val;
     volatile Node<K,V> next;
    ......
}

另外可以看出很多属性都是用volatile进行修饰的,也是为了保证并发情况下的该属性的可见性。同事对hash和key用final进行修饰也是提供了这两个常用变量的缓存,性能上有所提高。

TreeNode 树节点

继承于承载数据节点Node类。而红黑树的操作是针对TreeBin类的,从该类的注释也可以看出,也就是TreeBin会将TreeNode进行再一次封装

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
    ······

TreeBin 树箱

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。

 static final class TreeBin<K,V> extends Node<K,V> {
     TreeNode<K,V> root;
     volatile TreeNode<K,V> first;
     volatile Thread waiter;
     volatile int lockState;
     // values for lockState
     static final int WRITER = 1; // set while holding write lock
     static final int WAITER = 2; // set when waiting for write lock
     static final int READER = 4; // increment value for setting read lock
     ·····  
 }

ForwardingNode

在扩容时才会出现的特殊节点,其key,value,hash全部为null。并拥有nextTable指针引用新的table数组。

static final class ForwardingNode<K,V> extends Node<K,V> {
     final Node<K,V>[] nextTable;
     ForwardingNode(Node<K,V>[] tab) {
         super(MOVED, null, null, null);
         this.nextTable = tab;
     }
    ······
 }

CAS 关键操作

我们之前说到,在ConcurrentHashMap中会有大量的CAS操作来修改它的一些属性和操作。所以先来看一些常用的CAS操作是如何保证线程安全的

tabAt

获取table数组中索引为i的Node元素。

 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
 }

casTabAt

利用CAS操作设置table数组中索引为i的元素

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                     Node<K,V> c, Node<K,V> v) {
     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

compareAndSwapInt

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

这是一个Native函数。函数的作用是修改当前类的var2属性。如果var2属性和var4属性一样的就修改,否则什么都不做。

总结

上一篇下一篇

猜你喜欢

热点阅读