
ConcurrentHashMap Source Code Analysis

2017-04-13  Justlearn

Preface

The JDK's Hashtable is a thread-safe key-value container, and the way it achieves thread safety is very simple: every method that touches the hash table is declared synchronized, so every call locks the entire table. This is safe, but the efficiency is very poor.

//synchronized acquires the lock on the whole Hashtable on every call, so under high concurrency all readers and writers contend for a single lock
public synchronized V get(Object key) {
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return e.value;
            }
        }
        return null;
    }

ConcurrentHashMap instead uses a segmented-lock design: contention can only occur within a single segment, and different segments never compete for the same lock, which greatly improves concurrent access. The flip side of the segmented design is that operations which have to scan every segment, such as size(), are more complicated to implement and are not strongly consistent, which is worth keeping in mind in some scenarios. Because the implementation of ConcurrentHashMap differs between JDK versions, note that this article is based on the JDK 1.7 source; let's walk through that source together to see how it works.
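
As a quick illustration of what this buys us, here is a minimal usage sketch (the class name ConcurrentMapDemo and the thread/key counts are just made up for the example): several threads write to the same map concurrently with no external synchronization, and because their keys usually hash to different segments their puts rarely block each other.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentMapDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int id = t;
            pool.execute(new Runnable() {
                public void run() {
                    // keys from different threads generally land in different segments,
                    // so these puts do not fight over one lock the way Hashtable calls would
                    for (int i = 0; i < 1000; i++)
                        map.put("thread-" + id + "-key-" + i, i);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        // size() is only weakly consistent while writers are active;
        // here all writers have finished, so it prints 4000
        System.out.println(map.size());
    }
}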

The structure of ConcurrentHashMap

Looking at the source, the member variables of ConcurrentHashMap already tell us the basic shape of the data structure.

    final int segmentMask;//mask for indexing into segments: the high bits of the hash select the segment

    final int segmentShift;//how far to shift the hash so that those high bits can be masked

    final Segment<K,V>[] segments;//each segment is essentially a small hashtable

Internally, ConcurrentHashMap holds a segments array, and each Segment is a small hashtable of its own. Because Segment extends ReentrantLock, a Segment is itself a lock: locking a segment simply means calling its own lock()/tryLock() methods inherited from ReentrantLock. The basic layout of Segment is:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile HashEntry<K,V>[] table;//the k-v pairs stored in this segment live here
        transient int count;//number of HashEntry elements currently in this segment
        transient int modCount;//number of structural modifications to this segment (put/remove/clear, ...)
        transient int threshold;//rehash the table when count exceeds this value
        final float loadFactor;//load factor of this segment's table
        // ...
}

HashEntry is the lowest-level structure that actually stores each key-value pair:

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
}

Because value is volatile, its latest value is visible across threads, so reads can access it directly without taking a lock.
The overall layout of ConcurrentHashMap (the original article shows a diagram here) is: a fixed-size segments array, where each Segment owns a HashEntry[] table and every slot of that table is the head of a singly linked list of HashEntry nodes.


Source code walkthrough

In the preface we claimed that "ConcurrentHashMap improves concurrent access through segmented locking". Let's now use the constructor, ensureSegment, and the get/put/size methods to see how ConcurrentHashMap achieves this while staying thread-safe.

//loadFactor is the load factor of each segment; concurrencyLevel is the estimated number of concurrently updating threads, 16 by default, so by default 16 segments are created
  public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;//MAX_SEGMENTS = 1 << 16
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);//create segments[0] eagerly; the rest are created lazily
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
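
To make the arithmetic concrete: with the default concurrencyLevel of 16, the loop ends with ssize = 16 and sshift = 4, so segmentShift = 32 - 4 = 28 and segmentMask = 15 (binary 1111), i.e. the top four bits of the hash pick one of the 16 segments. With the default initialCapacity of 16, c = 16 / 16 = 1 and each segment's table starts at MIN_SEGMENT_TABLE_CAPACITY = 2, so with the default load factor of 0.75 a segment rehashes once it holds more than one entry.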

Only segments[0] is created by the constructor; the other segments are created lazily, on first use, by ensureSegment(k), which relies on a CAS so that concurrent callers end up sharing a single Segment instance:

 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

Reads never take the segment lock. get() locates the segment and then the bucket using volatile reads only:

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//u is the raw offset of this key's segment inside the segments array, i.e. it tells us which Segment the key lives in
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
//(tab.length - 1) & h is the key's bucket index inside this segment's HashEntry array
//walk the HashEntry chain looking for an element whose hash and key both match
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
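
For context before the segment-level put below: the public put(K, V) in JDK 1.7 computes the hash, picks the segment with (hash >>> segmentShift) & segmentMask, creates it via ensureSegment if necessary, and delegates to that segment. The following is a sketch of that entry point reproduced from memory, so treat the details as approximate rather than verbatim source:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();              // null values are not allowed
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;     // which segment owns this key
    if ((s = (Segment<K,V>)UNSAFE.getObject            // non-volatile peek;
         (segments, (j << SSHIFT) + SBASE)) == null)   // ensureSegment rechecks
        s = ensureSegment(j);
    return s.put(key, hash, value, false);             // segment-level put shown below
}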
//the following Segment.put is the core of every put operation on the map (put and putIfAbsent both end up here)
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//first try to grab the segment lock (ReentrantLock.tryLock() on this Segment); if that fails, scanAndLockForPut scans the bucket for a node with the same key while waiting (pre-creating one if absent); either way the lock is held when execution continues
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;//this segment's bucket array
        int index = (tab.length - 1) & hash;//bucket index for this node
        HashEntry<K,V> first = entryAt(tab, index);//volatile read of the head of bucket index
        //walk the chain: if a node with the same key exists, update its value; otherwise link a new node at the head of the chain
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {//overwrite the existing value unless this is putIfAbsent
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                //rehash when this segment's element count exceeds the threshold
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);//double the table and insert the node as part of the rehash
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
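
The scanAndLockForPut mentioned above is worth a quick look: while the lock is contended it spins on tryLock(), using the wait to pre-scan the bucket (and pre-create the node if the key is absent), and only blocks on lock() after a bounded number of retries. The following is a sketch from memory of the JDK 1.7 logic, so take it as illustrative rather than verbatim:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash); // current head of the bucket
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1;                                // negative while still locating the node
    while (!tryLock()) {
        HashEntry<K,V> f;
        if (retries < 0) {
            if (e == null) {                         // key not in the bucket:
                if (node == null)                    // speculatively create the node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;                         // key found; just keep spinning for the lock
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();                                  // give up spinning and block
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f;                           // head changed while spinning: rescan
            retries = -1;
        }
    }
    return node;
}

Finally, size() shows the price of the segmented design: it has to visit every segment, and it can only guarantee an accurate answer by locking all of them.
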
public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)//this pass saw the same modCount sum as the previous one, so the counts are stable
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }
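
A note on the retry logic: RETRIES_BEFORE_LOCK is 2 in JDK 1.7 and retries starts at -1, so size() makes up to three optimistic, unlocked passes that sum every segment's count and modCount; it stops as soon as a pass sees the same modCount sum as the previous one (nothing changed in between), and only if that never happens does it lock every segment, recount, and unlock them in the finally block. This is also why size() is merely an estimate while other threads keep modifying the map.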

Summary

That wraps up the walkthrough of the important operations of ConcurrentHashMap in JDK 1.7. From the source we can draw the following conclusions:

- Writes lock only the segment that owns the key, so threads updating different segments never block each other; with the default concurrencyLevel of 16, up to 16 writers can proceed in parallel.
- Reads take no lock at all: get() relies purely on volatile reads of the segment, the table, the chain links, and the value field.
- Operations that must visit every segment, such as size(), are comparatively expensive and only weakly consistent while the map is being modified concurrently.
