JAVA并发学习-并发容器ConcurrentHashMap

2017-06-10  本文已影响0人  eliter0609

首先简单的说下并发情况下HashMap,并发情况下,HashMap采用的是fail-fast快速失败机制)当容器在迭代的时候被修改hasNext()或next()就会抛出ConcurrentModificationException异常,

fail-fast具体实现:HashMap里有个modCount字段记录修改的次数,迭代器里有个expectedModCount,当得到迭代器时就会将modCount赋值给expectedModCount,迭代过程中每次修改会改变modCount的值而不会改变expectedModCount(除非是从迭代器里remove)而在调用hasNext或next的时候回检查modCount与expectedModCount是否相等,不相等就抛出上面的异常。

线程安全的HashTable实现比较粗暴,简单的以自身作为对象锁,将相关方法都声明为synchronized,故每次只有一个线程能调用这些同步方法。

HashMap与HashTable区别:

1.HashMap能存入null,HashTable则不能,主要是计算hash值的时候,HashMap判断如果是null,hash值为0
2.HashMap不是线程安全的HashTable是线程安全的 原因上面已说明
3.Hashtable 继承自 Dictiionary 而 HashMap继承自AbstractMap

java 8 ConcurrentHashMap

存储原理:

ConcurrentHashMap存储数据采用table数组+单向链表+红黑树的结构(与HashMap一致),对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

当在一个空bin中insert第一个node时,其使用CAS操作来同步,而对于其他的update操作(insert,delete, and replace)则需要使用锁来同步。而在每一个bucket中,一般使用此bin中第一个node作为这个bin的锁,锁住整个bucket。(因为新放入bin的node总会添加到list的末尾,故除了delete掉第一个节点或resize数组之外,这个节点总是此bin的第一个node,具有稳定性)。锁住整个bucket的策略是合理的,因为在实际使用中,一个bucket中的node不会太多(0.75的装填因子),所以一般锁住整个bin不会造成特别恶劣的性能影响。(同样ConcurrentHashMap也会使用tree化的策略,将过深的bin进行tree化,即使用红黑树来降低bin的深度,将查找时间限制为O(logN))。

关键的put( )方法:采用分段锁,提高并发效率
源码:

final V putVal(K key, V value, boolean onlyIfAbsent){ 
if (key == null || value == null)          //ConcurrentHashMap不允许null key或null value  
    throw new NullPointerException(); 

int hash = spread(key.hashCode());         //用位运算处理key.hashCode以使key分布更均匀  
int binCount = 0;                          //记录bucket中的node的数量  

for (Node<K,V>[] tab = table;;) {          //赋tab = 底层bin数组  
    Node<K,V> f;   
    int n, i, fh;  

    if (tab == null || (n = tab.length) == 0)     //延迟初始化,在第一次put的时候,才初始化  
        tab = initTable();  
              //f代表这个bucket中的第一个node  
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {         //此bucket为空的  
        if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))  
            break;                       //当向empty bin 中添加node时,并不使用锁,而使用CAS操作来添加  
    }  
    else if ((fh = f.hash) == MOVED)  
        tab = helpTransfer(tab, f);      //

    else {                               //***put操作的核心之处***  
        V oldVal = null;  
        synchronized (f) {               //以此bucket中的first node作为对象锁,锁住整个bucket  
            if (tabAt(tab, i) == f) {  
                if (fh >= 0) {  
                    binCount = 1;  
                    for (Node<K,V> e = f;; ++binCount) {  
                        K ek;                                  //在bucket中找到了同key的node,就替换其value  
                        if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {  
                            oldVal = e.val;  
                            if (!onlyIfAbsent)  
                                e.val = value;  
                            break;  
                        }  
                        Node<K,V> pred = e;  
                        if ((e = e.next) == null) {           //遍历到list末尾了,仍未找到key,就新建一个node,添加到list末尾  
                            pred.next = new Node<K,V>(hash, key, value, null);  
                            break;  
                        }  
                    }  
                }  
                else if (f instanceof TreeBin) {  //如果是红黑树节点
                       Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                }  
            }  
        }  

        if (binCount != 0) {  
            if (binCount >= TREEIFY_THRESHOLD)       //当bucket中node过多超过或等于(默认值8)时,将此bucket转化为红黑树的  
                treeifyBin(tab, i);  
            if (oldVal != null)  
                return oldVal;  
            break;  
        }  
    }  
}  
addCount(1L, binCount);              //里面也用到CAS操作了  
return null;  

在put操作中,判断table是否是空的,如果是空的就调用resize()初始化容量为2的幂次方(延迟初始化),再通过(容量 - 1) & hash 计算放入哈希表中桶的位置,如果当桶为empty时,insert并不使用锁,而是使用CAS操作来将 new node方法此bucket作为first node如下

if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                      //当前    bucketfirst node = null  
 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))  //使用CAS来insert一个新node  
       break;  
}  

//CAS操作的静态方法:

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {  
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c  , v);  
  }  

put中CAS分析:在putval代码中casTabAt返回的是boolean如果CAS成功了就break,跳出外层for (Node<K,V>[] tab = table;;)这个无限循环;如果失败了继续外层这个循环,如果第一个桶还是空的就继续上面的操作,不是空的往下走。

上一篇下一篇

猜你喜欢

热点阅读