03 ConcurrentHashMap1.7源码深入剖析

2022-09-05  本文已影响0人  攻城老狮

1 ConcurrentHashMap 机制

ConcurrentHashMap 在 1.7版本中采用分段锁机制实现线程安全的支持高并发的HashMap集合类。ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个Hash 表划分为多个分段;而每个 Segment 元素,即每个分段则类似于一个Hashtable;这样,在执行 put 操作时首先根据 hash 算法定位到元素属于哪个 Segment,然后对该Segment 加锁即可。因此,ConcurrentHashMap 在多线程并发编程中可是实现多线程 put 操作。

多线程环境下,使用 Hashmap 进行 put 操作会引起死循环,所以在并发情况下不能使用HashMap。

HashTable为保证线程安全付出的代价太大,get()、put()等方法都是 synchronized 的,这相当于给整个哈希表加了一把大锁。在并发调用HashTable的方法时就会造成大量的时间损耗。

2 构造方法

//初始化参数数值
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 无参构造方法,调用本身的传递三个参数的构造方法
* 参数1:initialCapacity 初始化容量
* 参数2:loadFactor 加载因子
* 参数3:concurrencyLevel 默认并发度(用于构建Segment的数组长度,但必须保证2的幂次方,不一定是最终使用的数值)
*/
public ConcurrentHashMap() {
  this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* 用于对 ConcurrentHashMap进行初始化的细节均在该构造方法中实现,
* 构造方法中会确认Segment数值的长度,每个Segment数值里面HashTable的长度,
* 并初始化首个s[0]的Segment元素。
*/
public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
  // 判别传入的参数是否合法
  if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    throw new IllegalArgumentException();
  // 判断默认级别是否超过允许的最大开辟长度,猜测防止无限扩容Segment长度导致oom
  // => static final int MAX_SEGMENTS = 1 << 16;
  if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;
  // Find power-of-two sizes best matching arguments
  // 保存ssize是2的几次幂
  int sshift = 0;
  // 用于初始化segments大小
  int ssize = 1;
  // 循环遍历找到大于等于concurrencyLevel的最小2次幂,例如:concurrencyLevel = 17,则大于等于 17 的最小2次幂为 32
  while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
  }
  // 用于后续计算Segments数组下标,使用高位参与运算
  this.segmentShift = 32 - sshift;
  // 掩码,用于位操作计算下标
  this.segmentMask = ssize - 1;
  // 初始容量不能超过规定的最大值
  // static final int MAXIMUM_CAPACITY = 1 << 30;
  if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
  // c 表示每个Segment元素中HashTable的大小
  int c = initialCapacity / ssize;
  // 确保总segment中table的容量大于等于initialCapacity
  // 例如 initialCapacity = 17,ssize = 16 则 c = 1, 1 * 16 < 17,所以 ++c 才可以满足总容量的需求
  if (c * ssize < initialCapacity)
    ++c;
  // 实际 hashtable 使用的容量,确保为2的幂次,hashtable 最小为 2
  // static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
  int cap = MIN_SEGMENT_TABLE_CAPACITY;
  // 如果容量不满足要求需要以2的幂次扩容,例如 cap = 2, c = 3   需要对 cap 扩容至 4
  while (cap < c)
    cap <<= 1;
  // create segments and segments[0]
  // 初始化index为0的segment,方便后面创建的segment对象可以直接使用这个segment的参数,不需要重复计算
  Segment<K,V> s0 =
    new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                     (HashEntry<K,V>[])new HashEntry[cap]);
  // 根据 ssize 大小创建 Segments 数组
  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  // 将 s0 放入 Segments 数组中
  UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  // 赋值
  this.segments = ss;
}

3 put() 方法

/**
* put 方法为核心方法,是可以保证多线程安全的方法。
* 主要逻辑是首先查看是否对应下标位置存在 Segment 元素,
* 如果不存在,则进行线程安全的初始化;
* 如果存在,则调用该位置的 Segment 对象的 put 方法具体存放数据到 HashTable 中。
*/
public V put(K key, V value) {
  Segment<K,V> s;
  // 不支持 value == null 的情况
  if (value == null)
    throw new NullPointerException();
  // 计算 key 的 hash 值
  int hash = hash(key);
  // 计算在 Segments 数组中的元素下标,使用hash值的高位参与运算获取下标位置
  int j = (hash >>> segmentShift) & segmentMask;
  // 如果下标位置不存在Segment元素,需要进行Segment对象的初始化,主要在 ensureSegment() 方法中实现
  if ((s = (Segment<K,V>)UNSAFE.getObject        // nonvolatile; recheck
       (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
    s = ensureSegment(j);
  // put 操作逻辑
  return s.put(key, hash, value, false);
}

4 ensureSegment() 方法

/**
*   ensureSegment() 方法主要负责对 Segment 对象的初始化,
* 涉及到并发安全的问题,主要采用 CAS 的方式进行无锁自旋。
*/
private Segment<K,V> ensureSegment(int k) {
  final Segment<K,V>[] ss = this.segments;
  // Segment元素下标
  long u = (k << SSHIFT) + SBASE; // raw offset
  Segment<K,V> seg;
  // 当前对应Segment元素为null,说明需要进行对象的初始化
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
    // 直接使用Segment[0]的属性来初始化,不需要二次计算参数数据
    Segment<K,V> proto = ss[0]; // use segment 0 as prototype
    int cap = proto.table.length;
    float lf = proto.loadFactor;
    int threshold = (int)(cap * lf);
    HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    // 再检测一下是否存在,如果存在就不必进行new对象的操作逻辑,提升效率
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
        == null) { // recheck
      // new Segment 对象
      Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
      // CAS 的方式将新创建的Segment对象添加到Segments数组中
      while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
             == null) {
        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
          break;
      }
    }
  }
  // 返回新创建或者其他线程新创建的Segment对象
  return seg;
}

5 Segment 的 put() 方法

/**
* Segment的put()方法实现具体将key插入到Segment本身的table中,插入前需要先加锁,
* 之后遍历插入位置的链表,遇到相同的元素则修改并返回旧值,
* 到尾节点依旧没有找到相同元素,则创建新的entry元素插入。
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  // 如果没有获取到锁,则阻塞在这一步
  HashEntry<K,V> node = tryLock() ? null :
  scanAndLockForPut(key, hash, value);
  // 往下执行,代表已获取到锁
  V oldValue;
  try {
    HashEntry<K,V>[] tab = table;
    // 计算 key 应该在该 Segment 元素的 HashTable 插入的下标
    int index = (tab.length - 1) & hash;
    // 获取对应在segment中的头节点
    HashEntry<K,V> first = entryAt(tab, index);
    // 循环遍历头节点
    for (HashEntry<K,V> e = first;;) {
      // 头节点不为null
      if (e != null) {
        K k;
        // 判断是否存在相同元素
        if ((k = e.key) == key ||
            (e.hash == hash && key.equals(k))) {
          // 旧值
          oldValue = e.value;
          // 覆盖旧值
          if (!onlyIfAbsent) {
            e.value = value;
            ++modCount;
          }
          break;
        }
        // 遍历链表下一个节点
        e = e.next;
      }
      // 头节点为null
      else {
        // node已初始化
        if (node != null)
          // 直接使用 scanAndLockForPut() 方法得到的 node,这里 first 一定为null,感觉大哥李写成 null 更容易理解
          node.setNext(first);
        // 需要初始化node
        else
          node = new HashEntry<K,V>(hash, key, value, first);
        // 长度+1
        int c = count + 1;
        // 判断是否需要扩容
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
          rehash(node);
        // 直接将 node 设置在tab的index上
        else
          setEntryAt(tab, index, node);
        ++modCount;
        count = c;
        oldValue = null;
        break;
      }
    }
  } finally {
    // 解锁
    unlock();
  }
  // 返回旧值
  return oldValue;
}

6 scanAndLockForPut() 方法

/**
* scanAndLockForPut() 方法主要是在不断 tryLock() 尝试加锁,每次尝试加锁失败后,可以额外做些判断逻辑。
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  // 获取在Segment对应位置的首节点
  HashEntry<K,V> first = entryForHash(this, hash);
  HashEntry<K,V> e = first;
  HashEntry<K,V> node = null;
  // 标志位,用于控制下面 if-else 的分支逻辑
  int retries = -1; // negative while locating node
  // 尝试加锁
  while (!tryLock()) {
    HashEntry<K,V> f; // to recheck first below
    // 循环遍历e,直到找到key值相等或为null
    if (retries < 0) {
      if (e == null) {
        // 当前节点是否为null
        if (node == null) // speculatively create node
          // 初始化node
          node = new HashEntry<K,V>(hash, key, value, null);
        retries = 0;
      }
      // key值相等,则设置retries为0
      else if (key.equals(e.key))
        retries = 0;
      // 继续找链表的下一个元素
      else
        e = e.next;
    }
    // 判断重试次数是否大于最大重试次数
    // static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    else if (++retries > MAX_SCAN_RETRIES) {
      // 直接加锁
      lock();
      break;
    }
    // 每隔2次判断,重现获取的首节点值与first不相等时,重置标志位。说明其他线程已经修改,需要重新获取最新的数据
    else if ((retries & 1) == 0 &&
             (f = entryForHash(this, hash)) != first) {
      e = first = f; // re-traverse if entry changed
      retries = -1;
    }
  }
  return node;
}

7 rehash() 方法

/**
* rehash() 方法用于数组的扩容操作,首先将新数组容量定义为原数组的2倍,之后将旧元素计算hash值,插入到新数组中。再将新元素采用头插法插入新数组,最后完成数组的覆盖,完成扩容。
*/
private void rehash(HashEntry<K,V> node) {
  HashEntry<K,V>[] oldTable = table;
  int oldCapacity = oldTable.length;
  // Segment中的新hashtable数组容量扩容为原来的2倍
  int newCapacity = oldCapacity << 1;
  threshold = (int)(newCapacity * loadFactor);
  // 新创建一个两倍原容量的table数组
  HashEntry<K,V>[] newTable =
    (HashEntry<K,V>[]) new HashEntry[newCapacity];
  int sizeMask = newCapacity - 1;
  // 遍历原table
  for (int i = 0; i < oldCapacity ; i++) {
    HashEntry<K,V> e = oldTable[i];
    // 原数组下标位置有元素
    if (e != null) {
      HashEntry<K,V> next = e.next;
      // 计算在新数组的下标
      int idx = e.hash & sizeMask;
      // 当前列表只有个首节点,则直接赋值到newTable的index值上,由于调用该函数的上层已经加锁,所以直接操作即可,线程安全的
      if (next == null)   //  Single node on list
        newTable[idx] = e;
      // 需要遍历链表
      else { // Reuse consecutive sequence at same slot
        // 此处逻辑是找到链表最后末尾连续几个元素,这些元素的hash相同,可以插入到新数组的相同下标位置
        HashEntry<K,V> lastRun = e;
        int lastIdx = idx;
        for (HashEntry<K,V> last = next;
             last != null;
             last = last.next) {
          int k = last.hash & sizeMask;
          if (k != lastIdx) {
            lastIdx = k;
            lastRun = last;
          }
        }
        // 直接将找到的最后一段相同节点的首节点赋值
        newTable[lastIdx] = lastRun;
        // Clone remaining nodes
        // 从头节点开始遍历lastRun的前一节点,一个一个的插入剩余元素
        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
          V v = p.value;
          int h = p.hash;
          int k = h & sizeMask;
          HashEntry<K,V> n = newTable[k];
          newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
        }
      }
    }
  }
  // 前面已经将原数组的旧元素添加到新数组,此处再将新元素插入
  int nodeIndex = node.hash & sizeMask; // add the new node
  // 头插法
  node.setNext(newTable[nodeIndex]);
  newTable[nodeIndex] = node;
  // 赋值新数组
  table = newTable;
}
上一篇下一篇

猜你喜欢

热点阅读