【广撒网】ConcurrentHashMap提供的compute
2020-12-10 本文已影响0人
小胖学编程
JDK7的ConcurrentHashMap使用Segment分段锁机制,但是在JDK8采用CAS自旋》+synchronized锁机制。并且加锁的粒度更小,直接锁Node数组的第一个元素,即实现了对单个key加锁。
private void analysisPo2Cache(Class < ?>clazz) {
//维护缓存
cache.computeIfAbsent(clazz.getName(), key - >{
HashSet < Field > values = new HashSet < >();
Field[] declaredFields = clazz.getDeclaredFields();
Field.setAccessible(declaredFields, true);
for (Field field: declaredFields) {
Class < ?>fieldClass = field.getType();
//判断是否含有List的属性
if (List.class.isAssignableFrom(fieldClass)) {
values.add(field);
}
}
return values;
});
}
源码分析:
JDK8的Map的数据结构是Node数组和链表+红黑树。
线程安全的核心思想:CAS+ synchronized锁对Node数组上第一个节点加锁。
- 多个线程均会创建
ReservationNode
节点,均执行执行synchronized(r)
(因为锁的是对象,所以所以线程都会执行casTabAt
方法)。CAS失败的线程释放锁,但是CAS成功的线程将执行lamda表达式(此时,持有synchronized(r)的锁)。 - 失败的线程将再次自旋,在
(f = tabAt(tab, i = (n - 1) & h)) == null)
方法中获取key所在位置的Node对象(lamda表达式没执行完毕,那么Node对象就是r,但是r已经被synchronized),此时synchronized(f)
会被阻塞,等待synchronized(r)
释放锁。当等到synchronized(r)
释放锁后,key所在位置的Node对象会发生变化,会再次通过tabAt(tab, i) == f
判断。开启下次自旋。
public V computeIfAbsent(K key, Function < ?super K, ?extends V > mappingFunction) {
//key的lamda表达式都不能为空
if (key == null || mappingFunction == null) throw new NullPointerException();
//右移16位计算有效的hash值
int h = spread(key.hashCode());
V val = null;
int binCount = 0;
//开启自旋
for (Node <K, V> [] tab = table;;) {
Node <K,V> f;
int n,i,fh;
//初次put时,因为concurrenthashmap在构造时没有初始化table,所以先初始化table
if (tab == null || (n = tab.length) == 0) tab = initTable();
//n为数组长度,低位有效二次hash后计算出数组的下标,并获取该下标的node对象(线程不安全),且node所在的位置为null
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
//创建空节点(预定节点)
Node <K,V> r = new ReservationNode <K,V> ();
//【神操作】这里加synchronized是为了CAS失败的线程在此自旋的时候在synchronized(f)处阻塞,等待CAS维护value释放锁。
synchronized(r) {
//CAS,使用预定节点去替换null(只有一个线程成功)
if (casTabAt(tab, i, null, r)) {
//成功进入的线程维护value。
binCount = 1;
Node <K,V> node = null;
try {
//执行lamda表达式
if ((val = mappingFunction.apply(key)) != null)
node = new Node <K,V> (h, key, val, null);
} finally {
//设置内存的值
setTabAt(tab, i, node);
}
}
}
//CAS进入的线程结束自旋,其他线程继续向下执行。
if (binCount != 0) break;
} else if ((fh = f.hash) == MOVED)
//node的hash值为-1,表示正在扩容,本次操作会帮助扩容【多线程完成扩容】
tab = helpTransfer(tab, f);
else {
//CAS加锁失败后的线程,自旋最终会到达此处
boolean added = false;
//【神操作2】即处理了lamda表达式执行慢,sync(r)方法持有锁,也解决了并发插入线程安全。
synchronized(f) {
//【神操作3】sync(r)释放锁后,线程才会获取到到此处的锁,此时f一定不是tabAt(tab, i) 。需要重新自旋。
if (tabAt(tab, i) == f) {
//fh为该node的hash值,如果hash值为正数,说明该下标是按链表存储的
if (fh >= 0) {
binCount = 1;
//遍历链表(哈希桶)
for (Node < K, V > e = f;; ++binCount) {
K ek;V ev;
//若key相同,直接返回,结束自旋。
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
val = e.val;
break;
}
Node < K,V > pred = e;
//如果链表中不存在这个key,则添加到链表末尾
if ((e = e.next) == null) {
if ((val = mappingFunction.apply(key)) != null) {
added = true;
pred.next = new Node < K,V > (h, key, val, null);
}
break;
}
}
} else if (f instanceof TreeBin) {
//红黑树的情况
binCount = 2;
TreeBin < K,V > t = (TreeBin < K, V > ) f;
TreeNode < K,V > r,
p;
//节点加到红黑树
if ((r = t.root) != null && (p = r.findTreeNode(h, key, null)) != null) val = p.val;
else if ((val = mappingFunction.apply(key)) != null) {
added = true;
t.putTreeVal(h, key, val);
}
}
}
}
//加锁成功的线程binCount会变为1
if (binCount != 0) {
//bitCount>8,尝试将链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (!added)
return val;
//结束自旋
break;
}
}
}
//总node+1,该方法中满足条件会进行扩容。
if (val != null) addCount(1L, binCount);
return val;
}
总结:
【广撒网】若key对应的第一个Node节点为null,每个线程均会对新建的空节点对象加锁,但只有CAS成功的线程一直持有对象锁,其他线程释放锁,开启下次自旋。
【实时更新】若key对应的第一个Node节点不为null,线程获取最新的Node节点对象(但Node节点可能已经被加锁),尝试对Node加锁去put数据。