Java 并发容器 ConcurrentHashMap 实现原理
前言
之前我们了解了 HashMap(HashMap 基于 JDK 1.7 源码解析),知道其是非线程安全的,当我们只有一个线程在使用 HashMap 的时候,自然不会有问题,但是并不适用于多线程情况,可以通过 Collections.synchronizedMap() 方法或者替换为 Hashtable,但是这两种方式基本都是对整个表结构做锁定操作,性能不是很理想,这时我们可以使用 ConcurrentHashMap,需要注意的是,ConcurrentHashMap 也分 1.7,1.8版本,两者实现上略有不同。
1.7 版本
数据结构
ConcurrentHashMap 数据结构为一个 Segement 数组,Segment 的数据结构为 HashEntry 数组,而 HashEntry 存储的是键值对,可以构成链表。如下图所示:
Segment 和 HashEntry
Segment 类继承 ReentrantLock 类,从而使 Segment 对象能充当锁的角色,不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上每当一个线程占用锁访问一个 Segment 时,不会影响其他的 Segment,这就是 ConcurrentHashMap 的锁分段技术。Segment 源码如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 在本 segment 范围内,包含的 HashEntry 元素的个数
* 该变量被声明为 volatile 型,保证可见性,即每次读到都是
* 最新数据
*/
transient volatile int count;
/**
* table 被更新的次数
*/
transient int modCount;
/**
* 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
*/
transient int threshold;
/**
* table 是由 HashEntry 对象组成的数组
* 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
* table 数组的数组成员代表散列映射表的一个桶
* 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
* 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16
*/
transient volatile HashEntry<K,V>[] table;
/**
* 负载因子
*/
final float loadFactor;
}
其中的 HashEntry:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
HashEntry 几乎是不可变的,代表的是每个 hash 链中的一个节点。可以看到除了 value 其余都是 fianl 的,这意味着不能从 hash 链的中间或尾部添加加或者删除节点,因为这需要修改 next 引用值,所有的节点的修改值只能从头部开始。
put 方法
代码如下:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
通过 key 找到 Segment,再在 Segment 中执行 put 操作。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue; // 旧值
try {
HashEntry<K,V>[] tab = table; // table 数组
int index = (tab.length - 1) & hash; //找到在 table 中的索引
HashEntry<K,V> first = entryAt(tab, index); // 根据索引获取到 HashEntry
// 遍历 HashEntry 链表
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// key 存在,更新值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); //扩容
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock(); // 释放锁
}
return oldValue;
}
由于 HashEntry 不能保证原子性(volatile 只能保证可见性),因此要对 put 操作要做加锁处理。
get 方法
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get 逻辑比较简单,如下:
- 根据 key 值计算 hash 定位到具体的 Segment。
- 再通过一次 hash 定位到具体的位置上。
由于 HashEntry 的 value 是用 volatile 修饰的,保证了可见性,所以每次获取到都是最新值,get() 方法是非常高效的,因为整个过程中都不用加锁。
Remove 方法
V remove(Object key, int hash, Object value) {
lock(); //加锁
try {
int c = count - 1;
HashEntry<K, V>[] tab = table;
//根据 hash 找到 table 的索引
int index = hash & (tab.length - 1);
//找到索引对应的那个桶
HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) { //找到要删除的节点
oldValue = v;
++modCount;
HashEntry<K, V> newFirst = e.next;// 待删节点的后继结点
for (HashEntry<K, V> p = first; p != e; p = p.next)
//头插法
newFirst = new HashEntry<K, V>(p.key, p.hash,
newFirst, p.value);
//把桶链接到新的头结点
//新的头结点是原链表中,删除节点之前的那个节点
tab[index] = newFirst;
count = c;
}
}
return oldValue;
} finally {
unlock(); //释放锁
}
}
整个操作是在持有段锁的情况下执行的,需要注意的是删除结点后,该删除结点前面的结点会倒序,如下图:
1.8 版本
数据结构
-
8 的实现抛弃了 Segment 分段锁机制,利用 CAS + synchronized 来保证并发更新的安全,底层采用数组 + 链表 + 红黑树的数据结构,用 Node 替换 了 HashEntry(名称改变了而已),如下图所示:
put 方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算 hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化 table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // 利用 CAS 尝试写入数据
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 扩容处理
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) { // 利用 sychronized 获取锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
get 方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 计算 hash 值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//在桶上直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// 通过红黑树获取值
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 链表单向查询
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结
1.7 版本主要使用了分段锁的机制,将数据分成许多个 Segment,每个 Segment 拥有一把锁,当一个线程占用锁访问其中一个 Segment 的时候,其他 Segment 的数据也能被其他线程访问。有些方法需要跨 Segment,比如 size() 和 containsValue(),它们可能需要锁定整个表而而不仅仅是某个Segment,这需要按顺序锁定所有 Segment,操作完毕后,又按顺序释放所有段的锁,否则可能会出现死锁。
1.8 版本 抛弃了 1.7 版本的分段锁机制,引入了 CAS + Synchronized 的机制,数据结构也变成了 数组 + 链表 + 红黑树 的形式,CAS 是非常轻量级的同步操作,而且也跟新版本虚拟机对 Synchronized 的优化有关,有兴趣的同学可以去具体了解。