回家

Java 并发容器 ConcurrentHashMap 实现原理

2020-05-23  本文已影响0人  tandeneck

前言

之前我们了解了 HashMap(HashMap 基于 JDK 1.7 源码解析),知道其是非线程安全的,当我们只有一个线程在使用 HashMap 的时候,自然不会有问题,但是并不适用于多线程情况,可以通过 Collections.synchronizedMap() 方法或者替换为 Hashtable,但是这两种方式基本都是对整个表结构做锁定操作,性能不是很理想,这时我们可以使用 ConcurrentHashMap,需要注意的是,ConcurrentHashMap 也分 1.7,1.8版本,两者实现上略有不同。

1.7 版本

数据结构

ConcurrentHashMap 数据结构为一个 Segement 数组,Segment 的数据结构为 HashEntry 数组,而 HashEntry 存储的是键值对,可以构成链表。如下图所示:

Segment 和 HashEntry

Segment 类继承 ReentrantLock 类,从而使 Segment 对象能充当锁的角色,不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上每当一个线程占用锁访问一个 Segment 时,不会影响其他的 Segment,这就是 ConcurrentHashMap 的锁分段技术。Segment 源码如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {  
 private static final long serialVersionUID = 2249069246763182397L;  
         /** 
          * 在本 segment 范围内,包含的 HashEntry 元素的个数
          * 该变量被声明为 volatile 型,保证可见性,即每次读到都是
          * 最新数据
          */  
         transient volatile int count;  

         /** 
          * table 被更新的次数
          */  
         transient int modCount;  

         /** 
          * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
          */  
         transient int threshold;  

         /** 
          * table 是由 HashEntry 对象组成的数组
          * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
          * table 数组的数组成员代表散列映射表的一个桶
          * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
          * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 
          */  
         transient volatile HashEntry<K,V>[] table;  

         /** 
          * 负载因子
          */  
         final float loadFactor;  
 }

其中的 HashEntry:

 static final class HashEntry<K,V> {  
     final K key;  
     final int hash;  
     volatile V value;  
     final HashEntry<K,V> next;  
 } 

HashEntry 几乎是不可变的,代表的是每个 hash 链中的一个节点。可以看到除了 value 其余都是 fianl 的,这意味着不能从 hash 链的中间或尾部添加加或者删除节点,因为这需要修改 next 引用值,所有的节点的修改值只能从头部开始。

put 方法

代码如下:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

通过 key 找到 Segment,再在 Segment 中执行 put 操作。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 加锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue; // 旧值
    try {
        HashEntry<K,V>[] tab = table; // table 数组
        int index = (tab.length - 1) & hash; //找到在 table 中的索引
        HashEntry<K,V> first = entryAt(tab, index); // 根据索引获取到 HashEntry
        // 遍历 HashEntry 链表
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                 // key 存在,更新值
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); //扩容
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock(); // 释放锁
    }
    return oldValue;
}

由于 HashEntry 不能保证原子性(volatile 只能保证可见性),因此要对 put 操作要做加锁处理。

get 方法

public V get(Object key) {
    Segment<K,V> s; 
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get 逻辑比较简单,如下:

Remove 方法

    V remove(Object key, int hash, Object value) {
        lock(); //加锁
        try {
            int c = count - 1;
            HashEntry<K, V>[] tab = table;
            //根据 hash 找到 table 的索引
            int index = hash & (tab.length - 1);
            //找到索引对应的那个桶
            HashEntry<K, V> first = tab[index];
            HashEntry<K, V> e = first;
            while (e != null && (e.hash != hash || !key.equals(e.key)))
                e = e.next;

            V oldValue = null;
            if (e != null) {
                V v = e.value;
                if (value == null || value.equals(v)) { //找到要删除的节点
                    oldValue = v;
                    ++modCount;
                    HashEntry<K, V> newFirst = e.next;// 待删节点的后继结点
                    for (HashEntry<K, V> p = first; p != e; p = p.next)
                        //头插法
                        newFirst = new HashEntry<K, V>(p.key, p.hash,
                                newFirst, p.value);
                    //把桶链接到新的头结点
                    //新的头结点是原链表中,删除节点之前的那个节点
                    tab[index] = newFirst;
                    count = c;     
                }
            }
            return oldValue;
        } finally {
            unlock(); //释放锁
        }
    }

整个操作是在持有段锁的情况下执行的,需要注意的是删除结点后,该删除结点前面的结点会倒序,如下图:


1.8 版本

数据结构

  1. 8 的实现抛弃了 Segment 分段锁机制,利用 CAS + synchronized 来保证并发更新的安全,底层采用数组 + 链表 + 红黑树的数据结构,用 Node 替换 了 HashEntry(名称改变了而已),如下图所示:


put 方法

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); //计算 hash
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); // 初始化 table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // 利用 CAS 尝试写入数据
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f); // 扩容处理
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                synchronized (f) { // 利用 sychronized 获取锁
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

get 方法

   public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); // 计算 hash 值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                //在桶上直接返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                // 通过红黑树获取值
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 链表单向查询
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

总结

1.7 版本主要使用了分段锁的机制,将数据分成许多个 Segment,每个 Segment 拥有一把锁,当一个线程占用锁访问其中一个 Segment 的时候,其他 Segment 的数据也能被其他线程访问。有些方法需要跨 Segment,比如 size() 和 containsValue(),它们可能需要锁定整个表而而不仅仅是某个Segment,这需要按顺序锁定所有 Segment,操作完毕后,又按顺序释放所有段的锁,否则可能会出现死锁。

1.8 版本 抛弃了 1.7 版本的分段锁机制,引入了 CAS + Synchronized 的机制,数据结构也变成了 数组 + 链表 + 红黑树 的形式,CAS 是非常轻量级的同步操作,而且也跟新版本虚拟机对 Synchronized 的优化有关,有兴趣的同学可以去具体了解。

参考

上一篇下一篇

猜你喜欢

热点阅读