JAVA并发学习-并发容器ConcurrentHashMap
首先简单的说下并发情况下HashMap,并发情况下,HashMap采用的是fail-fast快速失败机制)当容器在迭代的时候被修改hasNext()或next()就会抛出ConcurrentModificationException异常,
fail-fast具体实现:HashMap里有个modCount字段记录修改的次数,迭代器里有个expectedModCount,当得到迭代器时就会将modCount赋值给expectedModCount,迭代过程中每次修改会改变modCount的值而不会改变expectedModCount(除非是从迭代器里remove)而在调用hasNext或next的时候回检查modCount与expectedModCount是否相等,不相等就抛出上面的异常。
线程安全的HashTable实现比较粗暴,简单的以自身作为对象锁,将相关方法都声明为synchronized,故每次只有一个线程能调用这些同步方法。
HashMap与HashTable区别:
1.HashMap能存入null,HashTable则不能,主要是计算hash值的时候,HashMap判断如果是null,hash值为0
2.HashMap不是线程安全的HashTable是线程安全的 原因上面已说明
3.Hashtable 继承自 Dictiionary 而 HashMap继承自AbstractMap
java 8 ConcurrentHashMap
存储原理:
ConcurrentHashMap存储数据采用table数组+单向链表+红黑树的结构(与HashMap一致),对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
当在一个空bin中insert第一个node时,其使用CAS操作来同步,而对于其他的update操作(insert,delete, and replace)则需要使用锁来同步。而在每一个bucket中,一般使用此bin中第一个node作为这个bin的锁,锁住整个bucket。(因为新放入bin的node总会添加到list的末尾,故除了delete掉第一个节点或resize数组之外,这个节点总是此bin的第一个node,具有稳定性)。锁住整个bucket的策略是合理的,因为在实际使用中,一个bucket中的node不会太多(0.75的装填因子),所以一般锁住整个bin不会造成特别恶劣的性能影响。(同样ConcurrentHashMap也会使用tree化的策略,将过深的bin进行tree化,即使用红黑树来降低bin的深度,将查找时间限制为O(logN))。
关键的put( )方法:采用分段锁,提高并发效率
源码:
final V putVal(K key, V value, boolean onlyIfAbsent){
if (key == null || value == null) //ConcurrentHashMap不允许null key或null value
throw new NullPointerException();
int hash = spread(key.hashCode()); //用位运算处理key.hashCode以使key分布更均匀
int binCount = 0; //记录bucket中的node的数量
for (Node<K,V>[] tab = table;;) { //赋tab = 底层bin数组
Node<K,V> f;
int n, i, fh;
if (tab == null || (n = tab.length) == 0) //延迟初始化,在第一次put的时候,才初始化
tab = initTable();
//f代表这个bucket中的第一个node
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //此bucket为空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; //当向empty bin 中添加node时,并不使用锁,而使用CAS操作来添加
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); //
else { //***put操作的核心之处***
V oldVal = null;
synchronized (f) { //以此bucket中的first node作为对象锁,锁住整个bucket
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek; //在bucket中找到了同key的node,就替换其value
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //遍历到list末尾了,仍未找到key,就新建一个node,添加到list末尾
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //如果是红黑树节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //当bucket中node过多超过或等于(默认值8)时,将此bucket转化为红黑树的
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //里面也用到CAS操作了
return null;
在put操作中,判断table是否是空的,如果是空的就调用resize()初始化容量为2的幂次方(延迟初始化),再通过(容量 - 1) & hash 计算放入哈希表中桶的位置,如果当桶为empty时,insert并不使用锁,而是使用CAS操作来将 new node方法此bucket作为first node如下
if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //当前 bucketfirst node = null
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //使用CAS来insert一个新node
break;
}
//CAS操作的静态方法:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c , v);
}
put中CAS分析:在putval代码中casTabAt返回的是boolean如果CAS成功了就break,跳出外层for (Node<K,V>[] tab = table;;)这个无限循环;如果失败了继续外层这个循环,如果第一个桶还是空的就继续上面的操作,不是空的往下走。