JDK源码系列之ConcurrentHashMap(未完)
关于ConcurrentHashMap的作用或设计初衷,可以引用源码的注释OverView来回答:
The primary design goal of this hash table is to maintain concurrent readability (typically method get(), but also iterators and related methods) while minimizing update contention. Secondary goals are to keep space consumption about the same or better than java.util.HashMap, and to support high initial insertion rates on an empty table by many threads
并发读的同时,尽可能的减少更新冲突,同时提供比HashMap更优的内存利用,同时支持空map的高并发写入
如何实现高并发情况下的安全写和准确读?在解答这个问题之前,先了解一下ConcurrentHashMap的 一些重要属性:
Node<K,V> node对象是一对键值对的封装。包含了hash,key, value, next等属性。hash冲突时,next指向在冲突解决的链表上处于下一个位置的node对象
DEFAULT_CAPACITY = 16 初始化未指定node数组长度时的默认大小
MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 最大数组长度 由于在数组的对象头需要8字节的长度存储_length属性,所以数组的最大长度一般都需要减8
LOAD_FACTOR = 0.75f 负载系数 即数组元素数达到数组长度的0.75时,进行扩容
TREEIFY_THRESHOLD 树化阈值 hashmap通过链表法解决冲突,当链表长度达到8,转为红黑树
分析ConcurrentHashMap的运行机制,还是先从其最常用的put和get开始吧
PutVal
put直接调用了putVal
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;// no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key
,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
我直接按行数来写,直接与代码写在一起影响阅读:
2:可以看到ConcurrentHashMap对key和value都做了非空校验,而我们知道HashMap是允许key和value为空的。这么做是因为如果允许为null,那么在多线程环境下get的结果为null时无法判断是其他线程put了null的value还是根本就没有put过。
3:spread方法计算hash值,算法与HashMap的基本一致,只是为了保证不超过int的最大值,在计算结束后与了一个0x7fffffff,即16进制表示的int最大值(为什么hashmap计算的时候不需要???)
5:遍历table,即Node数组
7:table数组为空,则调用InitTable方法初始化。table初始化即初始化一个长度为capacity的数组,capacity为ConcurrentHashMap初始化时调用者指定,或默认值16
9:初始化node数组完成后,计算key在数组中的位置。计算方式为n-1 & hash,n-1即为tab数组的最后一个位置的下标,hash则为key在spread中计算出的hash值。计算下标,从数组中取值并非像平常使用数组一样,而是通过一个navite方法Unsafe.getObjectVolatile()来实现的。
10,11,12:通过数组下标获取对应位置的头对象f,如果f为null,由于ConcurrentHashMap判value不允许为null,则认定当前位置无元素。将Key与Value构造成Node对象,再次调用native方法cas设置到数组的相应位置。
14,15:如果hash值计算为-1,标志tables数组正在进行扩容操作,那么调用helpTransfer方法帮忙扩容。首先计算得到一个扩容的标志位rs,同时获取当前待插入的node节点的nextTab,当其与类中的属性nextTable是否一致,table与参数tab一致,sizeCtl值小于0(小于0表示正在扩容),则开启循环,新增一个线程帮助扩容,直到sizeCtl右移16位不等于rs(表示rs的值发生了变化),或者sizeCtl的值等于rs+1(表示扩容结束),或sizeCtl的值等于rs+65535(表示帮助线程达到了最大值),或者transferIndex<0(表示扩容结束),则结束循环,不再创建新线程帮助扩容。sizeCtl+1,表示增加了一个线程帮助扩容。帮助扩容的具体过程,调用了transfer方法【1】,transfer方法的细节在此不赘述。helpTransfer方法执行完成之后,返回扩容完成之后的node数组。
17,18:若hash冲突,且待插入的元素hash值不是-1,那么就需要正儿八经的解决hash冲突了,而且此位置必须是线程安全的,防止其他线程在同一时间对同一位置做出修改。ConcurrentHashMap的做法是锁定该位置的头对象。
19:取出当前位置的对象,再次与之前取出的对象做相等比较,二次校验防止过程中被修改
22-37:从当前位置的头对象开始,与待插入的对象进行比较,如果key的hash值相等,key的内存位置也一致,且equals判断也相等,那么判定是对头对象进行更新,则更新value并返回。如果头对象比较失败,那么就从hash冲突的链表一路往下找了,也就是顺着node对象的next一路找下去。如果一致找到链表的尾部,也没有找到(33行),那么将当前待插入的node对象,插入到链表的最后。
43-47:如果当前位置的头对象是TreeBin对象,表示在当前插入之前,该位置由于hash冲突过多,解决hash冲突已经由链表升级为树了,那么需要调用头对象的putTreeVal方法【2】找到待插入对象在树中的位置。
53-54:在链表法解决Hash冲突的同时,binCount也记录了链表的长度,如果超过了TREEIFY_THRESHOLD 树化阈值,那么当前位置的链表需要树化,调用的是treeifyBin方法【3】
61:addCount方法【4】检查是否出现并发情况,或正在扩容。如果正在扩容,那么新增线程帮忙扩容
Get
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
get方法的逻辑相对put方法较为简单。
2:第一步是通过同样的方法计算当前传入key的hash值
3:通过hash值计算key在数组中的下标,获取该位置的头对象。
5-7:如果头对象e的hash值与计算所得的请求key的hash值相等,头对象与请求Key的内存位置相等或equals,则认定当前头对象e即为所查询的对象,那么直接返回e的value
9:如果头对象e的hash值小于0,说明该位置由于Hash冲突过多,已经树化,那么从头对象e开始,调用TreeBin的find方法,从树中寻找与请求Key的hash值相等且对象相等的对象,找到则返回其value
11-14:如果非树化,那么用链表解决问题,相对来说更简单一些,只需要不停的寻找next,直到链接到头,或者找到为止