Java7 ConcurrentHashMap总结分析

2018-09-29  本文已影响0人  barry_di
ConcurrentHashMap

在JAVA线程安全的并发集合有HashTable,而HashTable
是一种通过在方法上使用Synchronize的简单粗暴的方式进行保证线程的安全,ConcurrentHashMap则是一个高效的并发集合。ConcurrentHashMap不是简单粗暴的在方法上使用Synchronize,而是通过分段(segment) 锁的方式进行处理并发问题。ConcurrentHashMap的锁机制更类似于我们数据库中的行锁,而HashTable则是整张表锁。下图为ConcurrentHashMap的数据结构。

结构.png
取模算法

ConcurrentHashMap获取Segment或者在Segment中获取Entry都是是通过mod求余来确定当前的Segment或者Entry.而在ConcurrentHashMap不是使用“%”的方式来进行求余值,而是通过&位运算的方式求余值。但是这种&位运算求余的方式必须是2n
公式:a%(2n)是等价于a&(2n-1)。

System.out.println("345%16="+(345%16)+" 等价于: 345&(16-1)="+(345&(16-1)));
输出结果:
345%16=9 等价于: 345&(16-1)=9
ConcurrentHashMap初始化
  public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //根据并发级别进行计算Segment的size和位移量
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //计算key&(2n-1)的值和key左移量
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //16384
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1; 
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

我们通过源码的初始化方法进行分析得出:

put
  @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        //计算hash值,在hash方法中会对对象中的hashCode进行Hash
        int hash = hash(key);
        //对hash值进行左位移后,求余
        int j = (hash >>> segmentShift) & segmentMask;
       //判断当这个Segment是否为Null,如果为Null创建一个Segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

   @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            //初始化Segment中Table的阔值、容量、负载因子
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            //创建Table
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            //判断是否存在segament,因为不是线程安全的所以需要多次检测是否存在Segment
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    //使用CAS方式进行插入Segment
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

通过对源码的分析我们可以看到因为在初始化ConcurrentHashMap的时候只有一个Segment,当计算出key的hash值后,若该Hash值没有对应的Segment的时候会去创建一个Segment,而因为在创建Segment是非加锁的方式,因此在创建Segment的时候会多次判断是否存在Segment,在最后还使用了CAS的方式进行插入到Segment的数组中。

CAS的使用的标准范式,在循环中使用CAS操作。保证CAS一定执行。

``
(1)这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
(2)循环table链,如果存在HashEntry时,则进行判断是否需要更新,如果存相等的Hash和Key,则判断onlyIfAbsent是否为False,若为False则更新当前的值并且跳出循环返回。因为put方法有putIfAbsent和put公共,而putIfAbsent则只有当前值不存在时插入,如果存在则返回旧值,而put则是无论任何时候都会更新旧值,并且将旧值返回。如果不存相等的Hash和Key。判断当前节点是否头节点,如果是则设置为头节点。然后判断是否需要对Table进行扩容,如果需要则从新计算hash值并且将新值增加到链中。如果不需要扩容则直接添加到链中。

get
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

计算 hash 值,找到 segment 数组中的具体位置.根据 hash 找到数组中具体的位置,循环链查找相等的值并且返回

size
 public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

在获取ConcurrentHashMap的size时候,是获取两次size的值判断是否一致,如果不是一致的情况下,再进行加锁操作。所以在高并发写入的情况下避免使用Size方法,以免由于加锁导致性能问题。

上一篇 下一篇

猜你喜欢

热点阅读