JDK源码

java.util.concurrent.ConcurrentH

2018-05-15  本文已影响6人  sunpy

为什么使用ConcurrentHashMap?

首先要明白HashTable容器是通过使用关键字synchronized来保证线程安全的,但是HashTable的效率并不高,因为当线程1添加元素时,线程2不能获取和添加元素了,就是锁的细粒度不高,而ConcurrentHashMap容器是将容器进行分段,在每个段Segment上面上锁,这样细粒度就会增高,提高了并发的效率。

继承与实现关系

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

ConcurrentHashMap的结构

20170912112223306.jpg

解释:
ConcurrentHashMap是由Segment数组组成的,而Segment对象里面是包含HashEntry数组。而Segment类似于HashMap,是数组加链表的组合结构。所以如果我们想要修改HashEntry数组中的元素时,首先必须获得与它对应的Segment的锁。可以理解为ConcurrentHashMap为多个加锁的HashMap组成的。

ConcurrentHashMap的全局变量

    //默认初始化容量为16
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默认的加载因子为0.75
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默认并发级别为16(锁的个数)  
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量为2^30次幂
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的分段容量为2(需要为2的n次幂)  
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大分成2^16段
    static final int MAX_SEGMENTS = 1 << 16;
    //同步重试的次数  
    static final int RETRIES_BEFORE_LOCK = 2;

    //片段偏移量
    final int segmentMask;
    //片段掩码
    final int segmentShift;
    //每个上锁的片段数组
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

HashEntry数据结构

static final class HashEntry<K,V> {
        final int hash;//节点对应的hash值  
        final K key;//节点对应的键
        volatile V value;//节点对应的值
        volatile HashEntry<K,V> next;//后继结点 
        
        //初始化节点的构造器
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        //设置当前节点的后继节点
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
}

Segment结构

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    
        private static final long serialVersionUID = 2249069246763182397L;

        //在准备锁定段操作之前,在预锁之前对预存器进行最大锁定次数
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        //片段上的桶数组
        transient volatile HashEntry<K,V>[] table;
        //片段中桶的数量
        transient int count;
        //修改片段的次数  
        transient int modCount;
        //临界值,如果实际大小超过了临界值,就使用容量*加载因子重新计算来扩容
        transient int threshold;
        //加载因子  
        final float loadFactor;
        
        //通过加载因子,临界值,HashEntry节点来初始化片段
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
 }

Segment中的功能方法

rehash函数:将表的大小扩增,重新计算hash值,然后将指定的节点添加到新表中

        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            //获取旧表
            HashEntry<K,V>[] oldTable = table;
            //获取旧表的容量
            int oldCapacity = oldTable.length;
            //计算新表的容量为旧表的2倍
            int newCapacity = oldCapacity << 1;
            //获取临界值,新容量乘以加载因子
            threshold = (int)(newCapacity * loadFactor);
            //构造新的HashEntry数组  
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            //计算新的长度掩码
            int sizeMask = newCapacity - 1;
            //遍历旧容量
            for (int i = 0; i < oldCapacity ; i++) {
                //获取旧表的节点HashEntry ,也可以说获取每个桶
                HashEntry<K,V> e = oldTable[i];
                //如果该节点不是尾节点
                if (e != null) {
                    //获取当前遍历节点的后继节点
                    HashEntry<K,V> next = e.next;
                    //通过当前节点的hash值与长度掩码的按位与运算获取下标索引
                    int idx = e.hash & sizeMask;
                    //如果后继节点为空,就是当前节点的后继节点为空,那么就是尾节点
                    if (next == null)   
                        //给新的HashEntry数组对应的下标设置值
                        newTable[idx] = e;
                    else { //如果后继节点不为空
                        //初始化最后运行的节点为当前节点e
                        HashEntry<K,V> lastRun = e;
                        //初始化最后运行的节点为当前下标idx
                        int lastIdx = idx;
                        //遍历当前的桶
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            //计算当前的下标索引
                            int k = last.hash & sizeMask;
                            //如果当前下标不等于旧表下标
                            if (k != lastIdx) {
                                //设置最新遍历到的下标索引
                                lastIdx = k;
                                //设置最新遍历到的HashEntry节点
                                lastRun = last;
                            }
                        }
                        //新表设置最后运行的节点
                        newTable[lastIdx] = lastRun;
                        //遍历当前的桶,构造新的HashEntry单链表(头插法)
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            //获取HashEntry的值
                            V v = p.value;
                            //获取HashEntry的hash值
                            int h = p.hash;
                            //计算当前的下标索引
                            int k = h & sizeMask;
                            //获取下标索引对应的HashEntry节点值 
                            HashEntry<K,V> n = newTable[k];
                            //构造新的HashEntry节点,作为下次的后继节点
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            //获取节点的下标索引
            int nodeIndex = node.hash & sizeMask;
            //设置节点的后继结点 
            node.setNext(newTable[nodeIndex]);
            //设置新表的下标索引nodeIndex值为node
            newTable[nodeIndex] = node;
            table = newTable;
        }

put函数:往片段Segment存入元素的操作

        //往片段Segment存入元素的操作
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //如果片段的锁已经被其他线程持有了,那么将获取null节点
            //如果片段的锁没有被其他线程持有,那么将直接获取节点链中对应的节点
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                //获取片段上的桶数组(旧)
                HashEntry<K,V>[] tab = table;
                //计算下标索引
                int index = (tab.length - 1) & hash;
                //获取给定表tab中指定下标index对应的头节点first  
                HashEntry<K,V> first = entryAt(tab, index);
                //遍历节点链表
                for (HashEntry<K,V> e = first;;) {
                    //如果遍历到的节点不为空
                    if (e != null) {
                        K k;
                        //如果键相同,或者hash值相同和键值相等
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            //获取键对应的节点值
                            oldValue = e.value;
                            //只有当值在节点链中存在的时候
                            if (!onlyIfAbsent) {
                                //新值覆盖旧值
                                e.value = value;
                                //修改片段次数加1
                                ++modCount;
                            }
                            //跳出循环
                            break;
                        }
                        //遍历下个后继节点
                        e = e.next;
                    }
                    else {//如果遍历到的节点为空
                        //如果node节点不为空
                        if (node != null)
                            //设置node节点的后继结点为first  
                            node.setNext(first);
                        else//如果node节点为空
                            //重新构建一个HashEntry节点
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //将片段中桶的数量加1
                        int c = count + 1;
                        //如果片段的容量大于了临界值并且表的长度小于最大容量  
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //将表的大小扩增,重新计算hash值,然后将指定的节点添加到新表中 
                            rehash(node);
                        else//为指定表的指定下标设置节点值node 
                            setEntryAt(tab, index, node);
                        //片段的修改次数加1
                        ++modCount;
                        //重新设置片段中的桶的数量
                        count = c;
                        //释放临时变量
                        oldValue = null;
                        //跳出循环
                        break;
                    }
                }
            } finally {
                //将当前线程获取的锁释放 
                unlock();
            }
            //返回替换的值
            return oldValue;
        }

编写思路:
①首先判断当前片段是否被其他线程所持有,然后获取节点链HashEntry的头节点,准备遍历单链表,然后分为没有遍历到尾节点情况和遍历到尾节点的情况。
②没有遍历到尾节点;如果出现键相同或者键值相同并且hash值相同,那么就覆盖节点链中的旧值,全局变量修改片段次数modCount加1,跳出循环。如果出现键不同或者键值不同,那么将继续往下遍历节点链(桶)。
③遍历到尾节点;如果当前片段并没有被其他线程持有,那么将节点node插入到头节点前面(头插法)。如果当前片段为空,那么就需要重新构建一个节点链了。然后判断片段Segment中桶的数量是否大于临界值(threshold = capicity乘loadFactor),如果大于了临界值,将桶的容量扩大为原来的2倍。(值得注意的是先添加元素,再判断容量是否超出,如果达到临界值,而无元素添加,就会产生一次无效的扩容)

ConcurrentHashMap的构造方法

//构造一个带有默认初始化容量、默认加载因子、默认并发级别的空映射
public ConcurrentHashMap() {  
        this(DEFAULT_INITIAL_CAPACITY, 
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);  
} 
//构造一个带有指定容量、默认加载因子、默认并发级别的空映射 
public ConcurrentHashMap(int initialCapacity) {  
        this(initialCapacity, DEFAULT_LOAD_FACTOR,   DEFAULT_CONCURRENCY_LEVEL);  
}
//构造一个带有指定容量、指定加载因子、默认并发级别的空映射
public ConcurrentHashMap(int initialCapacity, float loadFactor) {  
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);  
    } 
//通过一个指定的映射关系Map来构造一个映射
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {  
        this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,  
                      DEFAULT_INITIAL_CAPACITY),  
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);  
        putAll(m);  
}  
    //构造一个指定容量、指定加载因子、指定并发级别的新映射
    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //参数校验,如果不满足条件就抛出非法参数异常
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //如果并发级别大于最大分段数,那么并发级别设置为最大分段数(2的16次方)
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        //变量sshift用于记录ssize增加的次数  
        int sshift = 0;
        //变量ssize用于记录小于锁的数量的最大偶数 
        int ssize = 1;
        //循环遍历将ssize设置为2的sshift次方
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //设置片段掩码
        this.segmentShift = 32 - sshift;
        //设置片段偏移量
        this.segmentMask = ssize - 1;
        //如果初始化容量大于最大容量,将最大容量设置为初始化容量
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //初始化容量除以锁的数量得到分段数量
        int c = initialCapacity / ssize;
        //设置分段数量,如果 initialCapacity / ssize有余数,那么c将会额外加1一次
        if (c * ssize < initialCapacity)
            ++c;
        //获取最小分段数量
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        //如果最小分段数量小于分段数量将循环,最小分段数量将以2倍的方式递增
        while (cap < c)
            cap <<= 1;
        //初始化数组中每个元素对应的片段Segment对象
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        //有ssize个锁,那么就构建ssize个Segment片段
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

ConcurrentHashMap中常用的方法


注意:按位与运算
0&0=0; 0&1=0; 1&0=0; 1&1=1;
按位与运算只有两位同时为1时才为1


二次hash函数:

    /**
     * 当我们存入元素时需要定位到对应的Segment时,而一次hash的结果是通过按位与运算。
     * 我们知道按位与运算的特点就是如果hash值的低位相同,无论高位是什么,结果总是一样的。
     * 这个函数是为了二次hash计算,这样就会减少冲突
     */
    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

解释:在前面的Segment的rehash函数中,我们发现key的计算是通过HashEntry节点的hash值和长度掩码sizeMask(sizeMask低位都是1,高位都是0)的按位与运算获得。hash值和长度掩码的与运算最后的结果完全都依赖于低位了,散列值冲突严重,采用二次hash运算是为了避免这种冲突,保持hash冲突的结果降低。


put方法:将指定的键映射到该表中指定的值

    //将指定的键映射到该表中指定的值  
    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        //参数校验,如果存入的值value为空,那么将抛出空指针异常
        if (value == null)
            throw new NullPointerException();
        //通过键key生成hash值
        int hash = hash(key);
        //hash值右移片段偏移量个位,然后按位与片段掩码 
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            //通过给定的索引获取片段
            s = ensureSegment(j);
        //将元素存入到片段
        return s.put(key, hash, value, false);
    }

get方法:如果映射不包含键的映射,则返回指定键被映射到的值

    //如果映射不包含键的映射,则返回指定键被映射到的值 
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        //通过键二次hash生成hash值
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            //遍历片段中的节点链HashEntry
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                //如果键存在于节点HashEntry链中,那么就返回HashEntry节点值
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

阅读总结

(1)ConcurrentHashMap是一种采用分段锁的方式来实现的。
(2)ConcurrentHashMap默认初始化容量为16,默认加载因子为0.75。
(3)ConcurrentHashMap中HashEntry节点链采用单链表数据结构。
(4)ConcurrentHashMap的put方法是采用头插法,先插入值,再判断是否超出临界值。如果超出了临界值,将扩容为原来的2倍。


--------------------------------------该源码为jdk1.7版本的

上一篇 下一篇

猜你喜欢

热点阅读