带你读 ConcurrentHashMap 1.7 1.8
概述:
ConcurrentHashmap是java并发中重要的类,用来替代HashTable,实现可并发的hashtalbe。我将以1.7和1.8中的实现,分别为大家讲解ConcurrentHashMap的关键数据结构,和关键函数,看看他到底是如何实现并发的。
JDK1.7中的实现
JDK1.7 中的ConcurrentHashMap采用了分段锁的设计,先来看一下它的数据结构。ConcurrentHashMap中含有一个<b>segment</b>数组。每个segment中又含有一个HashEntry数组。即每个segment中含有一个完整的hashtable。
来看看上述segment结构的定义:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
... ...
}
去掉方法,可以看到几个熟悉的域。HashEntry(哈希数组),threshold(扩容阈值),loadFactor(负载因子)表示segment是一个完整的hashmap。
接下来我们看看ConcurrentHashMap的构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
三个参数分别代表了:
初始容量:初始容量表示所有的segment数组中,一共含有多少个hashentry。若initialCapacity不为2的幂,会取一个大于initialCapacity的2的幂。
负载因子:默认0.75。
并发级别:可以同时允许多少个线程并发。concurrencyLevel为多少,就有多少个segment,当然也会取一个大于等于这个值的2的幂。
接下来我们看一下ConcurrentHashMap中的几个关键函数,put, get, rehash(扩容), size方法,看看他是如何实现并发的。
put 方法的实现
现在来跟踪一个put方法的调用过程
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
这里注意几点:
1.key和value都不能为空,value有判断,key值在计算hashcode时候,也会有判断。
2.首先并发度算出一个key值的segment号,例如并发度为16,那么hash值得高4位会被用来算segment号。
3.找到对应的segment,调用segment的put方法。
接着看看segment的put方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
流程大概是这样:
1.先对该segment进行加锁
2.找到hashentry数组中,对应的位置(索引)。用了unsafe的方法,保证得到数组元素的原子性。
3.如果找到了对应的key,修改这个值。因为value是volatile,没有使用unsafe的方法。
可以看到,put方法是需要加锁的,这个时候加的是分段锁。
get方法
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
可以看到,get方法采用了unsafe的方法,来保证线程安全,并且这里没有加锁。看了几个版本的get方法,均没有加锁,都使用各种方式来实现线程安全。
rehash方法:
当segment中的count(指的是segment数组中使用的个数)大于threshold的时候,需要使用rehash方法来扩容。扩容操作也在put的锁中,保证线程安全。
size方法:
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
size()方法会尝试三次去获取总个数,如果其中任意连续两次的modcount相等,则,不同加锁,返回个数。若都不相等,则加锁去算出个数。
JDK1.8 中的实现
JDK1.8中,出现了较大的改动。没有使用段锁,改成了Node数组 + 链表 + 红黑树的方式。
其中有个重要的变量:sizeCtl
- 负数表示正在进行初始化或者扩容,-1表示正在初始化,-N表示有N - 1个线程正在扩容
- 正数0,表示还没有被初始化。其他正数表示下一次扩容的大小。
我们先来看一下核心数据结构:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
还有两个数据结构TreeNode、TreeBin,用来当链表的大小超过阈值的时候,将链表变作红黑树。
ForwardingNode,扩容用到的链表,扩容时,若遍历到了这个node,表示有其他在对这个node进行扩容。跳过即可。
CAS 操作
这一版本大量使用了CAS操作。所谓的CAS就是,比较内存对应的区域的值,和期望值是不是相等,如果相等,就设置一个新的值进去。
一般是这样使用,先获取对象中的某个域的值,并以这个值为期望值去调用CAS算法。
ConcurrentHashMap中有三个核心的CAS操作。
tabAt:获得数组中位置i上的节点
casTabAt:设置数组位置i上的节点
setTabAt:利用volatile设置位置i上的节点。
接下来介绍几个重点方法:
initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
在put方法调用时,回去判断table是不是为null,若为null就调用initTable去初始化。调用initTable会判断sizeCtl的值,若正在初始化,会调用yield()去等待。若为0,这是先调用CAS算法去设置为-1,再初始化。
初始化是单线程操作。
扩容方法transfer
当ConcurrentHashMap中数组使用的数量大于了阈值,这时需要进行扩容。这里的扩容分为两个部分:
1.单线程处理,新建一个newTable。容量变为原来的两倍。
2.多线程处理,把原来table中的数据拷贝到newTable中。
多线程处理的逻辑:
- 利用tabAt获取i的位置。
- 如果这个位置为空,不用加锁,将它设置为forward节点。
- 若为普通的Node节点,则加锁,就分析,并把他们放到newTable的i和i+n的位置上,处理完后将这个节点设置为forward。
put方法
逻辑很复杂,就不贴代码了,我说一下流程。
1.若table==null,则调用上面说过的初始化。
2.若table中i位置为空,则不用加锁,新建一个node,以cas的方式放入。
3.若该Node,正在扩容,则需要帮助扩容,调用helpTransfer函数。
4.若是正常的添加,此时需要加锁。用synchronized,若存在碰撞,需要判断的,当链表的长度大于8,将链表改为红黑树。
get方法
get方法不用加锁。利用CAS操作,可以达到无锁的访问。当map正在扩容的时候,forwardnode也提供了find()方法,保证正在扩容的时候,也能找到对应的value。
size()操作也只是一个大概的值。
concurrentHashMap是弱一致的,不能保证get,size得到正确的值。