源码分析:Redis hash和Java HashMap、Con

2021-06-12  本文已影响0人  史啸天

简述

大家在开发工作中经常使用的key-value的数据结构,像Java中有HashMap,线程安全的ConcurrentHashMap,在Redis中hash结构更是最基础的;
这里我们来分析一下它们的底层数据结构,包括如何扩容的;

HashMap

Java的HashMap是比较传统的数据结构了,网上关于HashMap的数据结构介绍有很多,就不再过多赘述了,这里咱们以JDK1.8的为主;
我们重点关注HashMap的几个常用方法,话不多说,上代码。

构造方法

1、无参数构造方法

public HashMap() {
  // 哈希加载因子,赋予默认值0.75
  this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}

2、1个参数的构造方法

public HashMap(int initialCapacity) {
  // 调用两个参数的构造方法,DEFAULT_LOAD_FACTOR=0.75
  this(initialCapacity, DEFAULT_LOAD_FACTOR);
}

3、2个参数的构造方法

public HashMap(int initialCapacity, float loadFactor) {
  // 入参小于0,抛出异常
  if (initialCapacity < 0)
    throw new IllegalArgumentException("Illegal initial capacity: " +
                                               initialCapacity);
  // 超过最大值,赋予最大值
  if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
  // 加载因子小于0或者非数字,抛出异常
  if (loadFactor <= 0 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                                               loadFactor);
  // 赋值加载因子
  this.loadFactor = loadFactor;
  // 赋值初始化大小
  this.threshold = tableSizeFor(initialCapacity);
}

4、参数是Map的构造方法

public HashMap(Map<? extends K, ? extends V> m) {
  // 赋值默认加载因子
  this.loadFactor = DEFAULT_LOAD_FACTOR;
  // 实现是循环调用put方法初始化一个HashMap
  putMapEntries(m, false);
}
/**
 * 初始化一个HashMap
 */
final void putMapEntries(Map<? extends K, ? extends V> m, boolean evict) {
  int s = m.size();
  if (s > 0) {
    // 如果Node数组为null,开始初始化
    if (table == null) { // pre-size
      float ft = ((float)s / loadFactor) + 1.0F;
      int t = ((ft < (float)MAXIMUM_CAPACITY) ? 
                      (int)ft : MAXIMUM_CAPACITY);
      if (t > threshold)
        threshold = tableSizeFor(t);
     // 如果传入的map大于初始长度,开始执行扩容方法
     } else if (s > threshold){
      resize();
     }
    // 循环调用putVal方法,开始构造map
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
      K key = e.getKey();
      V value = e.getValue();
      putVal(hash(key), key, value, false, evict);
    }
  }
}
put方法
/**
  *  首先key使用hash()方法,返回一个int值,底层也是调用putVal方法
  */
public V put(K key, V value) {
  return putVal(hash(key), key, value, false, true);
}
/**
  *  
  */
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
  Node<K,V>[] tab; Node<K,V> p; int n, I;
  // Node数组为null或者长度为0,调用resize扩容方法
  if ((tab = table) == null || (n = tab.length) == 0)
    n = (tab = resize()).length;
  // 找出元素存放桶的位置,并判断该位置是否为null,如果是null创建新的Node放入数组中
  if ((p = tab[i = (n - 1) & hash]) == null)
    tab[i] = newNode(hash, key, value, null);
  else {
    // 桶的位置有元素,则执行以下方法
    Node<K,V> e; K k;
    // 判断当前位置的key的hash值和旧key的hash值是否相同
    if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k))))
      // 新旧key的hash值相同,进行覆盖
      e = p;
    // 校验当前Node已经是红黑树结构
    else if (p instanceof TreeNode)
      // 执行红黑树的put操作
      e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
    else {
      // 遍历Node上的链表
      for (int binCount = 0; ; ++binCount) {
        // 判断链表的下一个节点是否为空,为空则代表已经达到链表末尾
        if ((e = p.next) == null) {
          // 在链表末尾增加一个元素
          p.next = newNode(hash, key, value, null);
          // 判断是否达到了红黑树的阈值,大于等于8-1
          if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
            // 尝试转换为红黑树
            treeifyBin(tab, hash);
          // 跳出,结束循环
          break;
        }
        // 判断key是否与链表中的某个key相等(包括hash值),完全相等则跳出
        if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k))))
          break;
         // 将p赋值下一个节点,e=p.next,通过这种方式实现遍历链表
         p = e;
      }
    }
    // 如果e不为null,则表示链表上存在key相同的元素
    if (e != null) { // existing mapping for key
      V oldValue = e.value;
      // 判断是否要替换值
      if (!onlyIfAbsent || oldValue == null)
        // 替换value值
        e.value = value;
      // 空的钩子函数,回调函数
      afterNodeAccess(e);
      // 返回链表重复旧的value值
      return oldValue;
    }
  }
  // 次数加1
  ++modCount;
  // size加1,如果大于容量,执行扩容操作
  if (++size > threshold)
    resize();
  // 又是一个空的钩子函数,回调函数
  afterNodeInsertion(evict);
  // 返回null
  return null;
}
/**
 * 尝试将链表转换成红黑树
 */
final void treeifyBin(HashMap.Node<K,V>[] tab, int hash) {
    int n, index; HashMap.Node<K,V> e;
    // Node数组是空,或者Node数组小于64,进行扩容操作
    if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
        resize();
    // 否则,判断是否为null,进行红黑树转换
    else if ((e = tab[index = (n - 1) & hash]) != null) {
        HashMap.TreeNode<K,V> hd = null, tl = null;
        do {
            HashMap.TreeNode<K,V> p = replacementTreeNode(e, null);
            if (tl == null)
                hd = p;
            else {
                p.prev = tl;
                tl.next = p;
            }
            tl = p;
        } while ((e = e.next) != null);
        if ((tab[index] = hd) != null)
            hd.treeify(tab);
    }
}
resize方法(扩容方法)
/**
 * 扩容方法
 */
final HashMap.Node<K,V>[] resize() {
    // 旧的Node数组
    HashMap.Node<K,V>[] oldTab = table;
    // 旧的数组长度
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    // 旧的Map容量
    int oldThr = threshold;
    int newCap, newThr = 0;
    // 如果旧的数组长度大于0
    if (oldCap > 0) {
        // 旧的数组长度大于最大值,直接返回旧的Node数组
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 扩容,新的Node.length乘以2,容量也乘以2
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }
    // 如果旧数组长度不大于0,但是容量大于0,则代表要进行初始化操作
    else if (oldThr > 0) // initial capacity was placed in threshold
        newCap = oldThr;
    // 都不大于0,则代表初始化,赋予默认大小
    else {               // zero initial threshold signifies using defaults
        // DEFAULT_INITIAL_CAPACITY = 1 << 4 是16
        newCap = DEFAULT_INITIAL_CAPACITY;
        // 容量是加载因子0.75 乘以 默认桶容量16
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
    // 如果新的容量为0,代表执行的是初始化操作,给予新容量赋值
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                (int)ft : Integer.MAX_VALUE);
    }
    // 赋值新的容量
    threshold = newThr;
    // 创建新的Node数组,并赋值给全局Node数组
    HashMap.Node<K,V>[] newTab = (HashMap.Node<K,V>[])new HashMap.Node[newCap];
    table = newTab;
    // 如果旧的Node数组不是null,代表不仅是初始化操作,开始进行数组拷贝
    if (oldTab != null) {
        // 循环
        for (int j = 0; j < oldCap; ++j) {
            HashMap.Node<K,V> e;
            // 非空判断
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                //  Node的当前数组,不存在链表结构
                if (e.next == null)
                    // 直接进行新Node数组的位置计算,并赋值
                    newTab[e.hash & (newCap - 1)] = e;
                // 如果是红黑树,进行红黑树的扩容
                else if (e instanceof HashMap.TreeNode)
                    ((HashMap.TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order
                    // 不需要移动的链表
                    HashMap.Node<K,V> loHead = null, loTail = null;
                    // 需要移动的链表
                    HashMap.Node<K,V> hiHead = null, hiTail = null;
                    // Node数组的链表
                    HashMap.Node<K,V> next;
                    do {
                        next = e.next;
                        // 将链表拆分成需要移动和不需要移动的两个
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    // 循环操作链表
                    } while ((e = next) != null);
                    // 不需要移动的原位置赋值
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    // 需要移动的原位置+旧数组长度位置
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    // 返回新的扩容后的Node数组
    return newTab;
}
remove方法
/**
 * 删除操作
 */
public V remove(Object key) {
    HashMap.Node<K,V> e;
    return (e = removeNode(hash(key), key, null, false, true)) == null ?
            null : e.value;
}
/**
 * 真正的删除方法
 */
final HashMap.Node<K,V> removeNode(int hash, Object key, Object value,
                                   boolean matchValue, boolean movable) {
    HashMap.Node<K,V>[] tab; HashMap.Node<K,V> p; int n, index;
    // 校验空和初始化操作,根据hash运算找出key所在桶位置
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (p = tab[index = (n - 1) & hash]) != null) {
        HashMap.Node<K,V> node = null, e; K k; V v;
        // 如果桶位置的key完全相等
        if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
            node = p;
        // 判断是否存在链表,链表不为空执行下面方法
        else if ((e = p.next) != null) {
            // 判断是否是红黑树结构
            if (p instanceof HashMap.TreeNode)
                // 调用红黑树的方法,找到树的具体节点
                node = ((HashMap.TreeNode<K,V>)p).getTreeNode(hash, key);
            else {
                // 循环找到元素在链表的具体位置
                do {
                    if (e.hash == hash &&
                            ((k = e.key) == key ||
                                    (key != null && key.equals(k)))) {
                        node = e;
                        break;
                    }
                    p = e;
                } while ((e = e.next) != null);
            }
        }
        if (node != null && (!matchValue || (v = node.value) == value ||
                (value != null && value.equals(v)))) {
            // node节点如果是红黑树
            if (node instanceof HashMap.TreeNode)
                // 调用红黑树的移除方法
                ((HashMap.TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
            // 经过上序操作,node如果和p相等,则表示这个位置不存在链表
            else if (node == p)
                // 将当前桶位置赋值为空,因为当前桶位置不存在链表,所以node.next是个空
                tab[index] = node.next;
            else
                // 移动链表,将后续链表值赋予当前位置
                p.next = node.next;
            // 修改次数加1
            ++modCount;
            // 长度减1
            --size;
            // 钩子回调函数
            afterNodeRemoval(node);
            // 返回当前节点,最终remove操作会返回value值
            return node;
        }
    }
    return null;
}
总结

需要注意几个细节,在面试中可能会被经常问到

1、直接new出来的HashMap容量是多少?
答:我们从源码可以看到,只有入参是Map的构造方法会进行Node数组的初始化和赋值;其他的构造方法new出来的HashMap是不会进行初始化的,只有在第一个元素put进去的时候,会调用resize方法,进行数组的初始化;

2、什么情况下会触发扩容?链表长度达到8一定会转换红黑树吗?
-首先,Map的size大于16(容量)*0.75(加载因子)的时候会触发resize()扩容操作;
-其次,再新增元素到链表中时,会判断是否大于等于8-1,满足的话会 调用treeifyBin()方法,尝试进行红黑树转换;
-最后,在转换红黑树之前,会判断Node数组长度是否小于64,如果小于64就放弃转换,进行扩容操作;
基于源码我们可以理解扩容是基于两个逻辑:
        一个是HashMap中数组元素过多,容易频繁触发hash冲突;
        另一个是元素虽然不多,但是元素都集中在某一个位置上,严重影响效率,就会通过扩容,来重新计算hash,让其分布的更加均匀;

3、put有返回值嘛?返回的是什么?
答:从源码我们可以看到,put方法会返回放入的value值,但是只有存入的元素被放置在链表的位置才会有返回,如果放入数组位置,返回的是null;

ConcurrentHashMap

构造方法

1、无参构造方法

// 真的什么都不做,只创建一个ConcurrentHashMap对象
public ConcurrentHashMap() {
}

2、1个参数的构造方法

public ConcurrentHashMap(int initialCapacity) {
    // 如果入参小于0,抛出异常
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    // 进行初始化容量
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

3、2个参数的构造方法

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    // 调用3个参数的构造方法,并发级别默认为1
    this(initialCapacity, loadFactor, 1);
}

4、3个参数的构造方法

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 校验入参,小于0则抛出异常
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 初始容量小于并发级别
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    // 初始化容量
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

5、参数是Map的构造方法

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    // 默认容量16
    this.sizeCtl = DEFAULT_CAPACITY;
    // 调用putAll方法,初始化
    putAll(m);
}
/**
  * 将Map批量添加
  */
public void putAll(Map<? extends K, ? extends V> m) {
    // 初始化大小
    tryPresize(m.size());
    // 循环初始化Map
    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
        putVal(e.getKey(), e.getValue(), false);
}
put方法
/**
  * 添加元素方法
  **/
public V put(K key, V value) {
    // 调用put方法
    return putVal(key, value, false);
}
/**
  *  真正的put方法
  **/
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key或者value是null,抛出NPE异常
    if (key == null || value == null) throw new NullPointerException();
    // 计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 循环
    for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
        ConcurrentHashMap.Node<K,V> f; int n, i, fh;
        // 如果数组是null,或者length是0,进行初始化操作
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 存入桶的位置为null
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 比较并替换(cas)操作,将元素放入桶中,如果成功,跳出循环
            if (casTabAt(tab, i, null,
                    new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 根据hash值判断是否正在进行扩容
        else if ((fh = f.hash) == MOVED)
            // 协助进行扩容,在扩容中详细注释
            tab = helpTransfer(tab, f);
        // 当前桶位置存在元素(hash冲突),链表形式添加元素
        else {
            V oldVal = null;
            // 对当前桶位进行加锁
            synchronized (f) {
                // 链表头节点
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 循环链表
                        for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // key完全相同,替换value值,并跳出
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            ConcurrentHashMap.Node<K,V> pred = e;
                            // next是null,代表链表末尾
                            if ((e = e.next) == null) {
                                // 添加到链表末尾
                                pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    // 如果是红黑树
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        ConcurrentHashMap.Node<K,V> p;
                        binCount = 2;
                        if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 是否大于等于8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 转换红黑树,并非一定会转红黑树
                    treeifyBin(tab, i);
                // 返回value
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 统计节点个数,检察是否需要扩容操作
    addCount(1L, binCount);
    return null;
}
/**
 * 链表转红黑树方法
 */
private final void treeifyBin(ConcurrentHashMap.Node<K,V>[] tab, int index) {
    ConcurrentHashMap.Node<K,V> b; int n, sc;
    // 为空判断
    if (tab != null) {
        // 校验Node数组长度是否小于64
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 扩容操作
            tryPresize(n << 1);
        // 需要链表转红黑树的桶位置
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 该桶位加锁
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    ConcurrentHashMap.TreeNode<K,V> hd = null, tl = null;
                    // 循环链表,创建treeNode替换原来的Node
                    for (ConcurrentHashMap.Node<K,V> e = b; e != null; e = e.next) {
                        ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new ConcurrentHashMap.TreeBin<K,V>(hd));
                }
            }
        }
    }
}
扩容

在上面的方法中会看到很多扩容的方法,比如:addCount()、helpTransfer()、tryPresize()等,有的是协助扩容,有的是本身进行扩容,他们的底层调用都是transfer()方法进行扩容操作;
transfer()方法有两个参数,tab(当前Node数组)和nextTab(扩容Node数组),当nextTab不为空时,会进行协助扩容;

/**
 * 扩容方法
 */
private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果nextTab为空
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 创建一个原来两倍长度的数组
            ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 循环开始元素移动
    for (int i = 0, bound = 0;;) {
        ConcurrentHashMap.Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有节点都完成移动,
            if (finishing) {
                // 清空扩容的nextTable
                nextTable = null;
                // table赋值扩容后的tab
                table = nextTab;
                // 扩容阈值变更
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 遍历到的节点是空,则放入ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 已处理过的位置
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    ConcurrentHashMap.Node<K,V> ln, hn;
                    // 链表扩容执行
                    if (fh >= 0) {
                        int runBit = fh & n;
                        // 最后一个节点,也就是反序链表
                        ConcurrentHashMap.Node<K,V> lastRun = f;
                        for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树的处理,过程与上面类似
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                        ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                        ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
总结

ConcurrentHashMap做为JUC包中应对并发操作的Map,在JDK1.8中使用了很多Unsafe中的CAS操作,有兴趣的同学可以再深入研究一下;下面我们来总结下可能遇到的面试问题;

1、ConcurrentHashMap也会转红黑树吗?
答:是的,从源码可以看到ConcurrentHashMap和HashMap一样,在链表长度达到8时不一定会触发红黑树操作,只有size大于64,且链表长度大于8时才会触发转红黑树的操作;

2、ConcurrentHahMap的扩容方法也叫resize方法吗?
答:不是的,我们看到ConcurrentHashMap采用的是协同扩容,如果检测到正在进行扩容,会加入到扩容操作中,不管是扩容还是协同扩容,他们调用的扩容方法都是transfer(),只不过入参不同进行的步骤就不一样;

3、协同扩容有操作上限吗?假设正在进行扩容,此时有100个put操作,这100个put都会加入到扩容操作中吗?
答:不是的,在transfer方法中我们可以看到,有下面一行判断

private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
... 后面省略

其中stride参数控制扩容线程数量,这个判断了机器cpu核数,来控制扩容加入扩线程的数量;

Redis hash

大家用过redis的都可以感受到,整个redis都是基于这种key-value的方式存储;在redis中基于dict方法构建的,我们先来看下数据结构;

数据结构
/*
 * 字典
 */
typedef struct dict {

    // 类型特定函数
    dictType *type;

    // 私有数据
    void *privdata;

    // 哈希表
    dictht ht[2];

    // rehash 索引
    // 当 rehash 不在进行时,值为 -1
    int rehashidx; /* rehashing not in progress if rehashidx == -1 */

    // 目前正在运行的安全迭代器的数量
    int iterators; /* number of iterators currently running */

} dict;
/*
 * 哈希表
 *
 * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
 */
typedef struct dictht {
    
    // 哈希表数组
    dictEntry **table;

    // 哈希表大小
    unsigned long size;
    
    // 哈希表大小掩码,用于计算索引值
    // 总是等于 size - 1
    unsigned long sizemask;

    // 该哈希表已有节点的数量
    unsigned long used;

} dictht;
/*
 * 哈希表节点
 */
typedef struct dictEntry {
    
    // 键
    void *key;

    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;

    // 指向下个哈希表节点,形成链表
    struct dictEntry *next;

} dictEntry;
初始化方法

在创建一个字典的时候,会调用初始化方法,初始化两个dictht哈希表;

/*
 * 创建一个新的字典
 *
 * T = O(1)
 */
dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);

    return d;
}
/*
 * 初始化哈希表
 *
 * T = O(1)
 */
int _dictInit(dict *d, dictType *type,
        void *privDataPtr)
{
    // 初始化两个哈希表的各项属性值
    // 但暂时还不分配内存给哈希表数组
    _dictReset(&d->ht[0]);
    _dictReset(&d->ht[1]);

    // 设置类型特定函数
    d->type = type;

    // 设置私有数据
    d->privdata = privDataPtr;

    // 设置哈希表 rehash 状态
    d->rehashidx = -1;

    // 设置字典的安全迭代器数量
    d->iterators = 0;

    return DICT_OK;
}
/*
 * 重置(或初始化)给定哈希表的各项属性值
 *
 * p.s. 上面的英文注释已经过期
 *
 * T = O(1)
 */
static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}
新增操作
/*
 * 将给定的键值对添加到字典中,如果键已经存在,那么删除旧有的键值对。
 *
 * 如果键值对为全新添加,那么返回 1 。
 * 如果键值对是通过对原有的键值对更新得来的,那么返回 0 。
 *
 * T = O(N)
 */
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, auxentry;

    // 尝试直接将键值对添加到字典
    // 如果键 key 不存在的话,添加会成功
    // T = O(N)
    if (dictAdd(d, key, val) == DICT_OK)
        return 1;

    // 运行到这里,说明键 key 已经存在,那么找出包含这个 key 的节点
    // T = O(1)
    entry = dictFind(d, key);

    // 先保存原有的值的指针
    auxentry = *entry;
    // 然后设置新的值
    // T = O(1)
    dictSetVal(d, entry, val);
    // 然后释放旧值
    // T = O(1)
    dictFreeVal(d, &auxentry);

    return 0;
}
/*
 * 尝试将给定键值对添加到字典中
 *
 * 只有给定键 key 不存在于字典时,添加操作才会成功
 *
 * 添加成功返回 DICT_OK ,失败返回 DICT_ERR
 *
 * 最坏 T = O(N) ,平滩 O(1) 
 */
int dictAdd(dict *d, void *key, void *val)
{
    // 尝试添加键到字典,并返回包含了这个键的新哈希节点
    // T = O(N)
    dictEntry *entry = dictAddRaw(d,key);

    // 键已存在,添加失败
    if (!entry) return DICT_ERR;

    // 键不存在,设置节点的值
    // T = O(1)
    dictSetVal(d, entry, val);

    // 添加成功
    return DICT_OK;
}
/*
 * 尝试将键插入到字典中
 *
 * 如果键已经在字典存在,那么返回 NULL
 *
 * 如果键不存在,那么程序创建新的哈希节点,
 * 将节点和键关联,并插入到字典,然后返回节点本身。
 *
 * T = O(N)
 */
dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    // 如果条件允许的话,进行单步 rehash
    // T = O(1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    // 计算键在哈希表中的索引值
    // 如果值为 -1 ,那么表示键已经存在
    // T = O(N)
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    // T = O(1)

    // 如果字典正在 rehash ,那么将新键添加到 1 号哈希表
    // 否则,将新键添加到 0 号哈希表
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 为新节点分配空间
    entry = zmalloc(sizeof(*entry));
    // 将新节点插入到链表表头
    entry->next = ht->table[index];
    ht->table[index] = entry;
    // 更新哈希表已使用节点数量
    ht->used++;

    // 设置新节点的键
    // T = O(1)
    dictSetKey(d, entry, key);

    return entry;
}
查找操作
/*
 * 获取包含给定键的节点的值
 *
 * 如果节点不为空,返回节点的值
 * 否则返回 NULL
 *
 * T = O(1)
 */
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    // T = O(1)
    he = dictFind(d,key);

    return he ? dictGetVal(he) : NULL;
}
/*
 * 返回字典中包含键 key 的节点
 *
 * 找到返回节点,找不到返回 NULL
 *
 * T = O(1)
 */
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    // 字典(的哈希表)为空
    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */

    // 如果条件允许的话,进行单步 rehash
    if (dictIsRehashing(d)) _dictRehashStep(d);

    // 计算键的哈希值
    h = dictHashKey(d, key);
    // 在字典的哈希表中查找这个键
    // T = O(1)
    for (table = 0; table <= 1; table++) {

        // 计算索引值
        idx = h & d->ht[table].sizemask;

        // 遍历给定索引上的链表的所有节点,查找 key
        he = d->ht[table].table[idx];
        // T = O(1)
        while(he) {

            if (dictCompareKeys(d, key, he->key))
                return he;

            he = he->next;
        }

        // 如果程序遍历完 0 号哈希表,仍然没找到指定的键的节点
        // 那么程序会检查字典是否在进行 rehash ,
        // 然后才决定是直接返回 NULL ,还是继续查找 1 号哈希表
        if (!dictIsRehashing(d)) return NULL;
    }

    // 进行到这里时,说明两个哈希表都没找到
    return NULL;
}
删除操作
/*
 * 查找并删除包含给定键的节点
 *
 * 参数 nofree 决定是否调用键和值的释放函数
 * 0 表示调用,1 表示不调用
 *
 * 找到并成功删除返回 DICT_OK ,没找到则返回 DICT_ERR
 *
 * T = O(1)
 */
static int dictGenericDelete(dict *d, const void *key, int nofree)
{
    unsigned int h, idx;
    dictEntry *he, *prevHe;
    int table;

    // 字典(的哈希表)为空
    if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */

    // 进行单步 rehash ,T = O(1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    // 计算哈希值
    h = dictHashKey(d, key);

    // 遍历哈希表
    // T = O(1)
    for (table = 0; table <= 1; table++) {

        // 计算索引值 
        idx = h & d->ht[table].sizemask;
        // 指向该索引上的链表
        he = d->ht[table].table[idx];
        prevHe = NULL;
        // 遍历链表上的所有节点
        // T = O(1)
        while(he) {
        
            if (dictCompareKeys(d, key, he->key)) {
                // 超找目标节点

                /* Unlink the element from the list */
                // 从链表中删除
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;

                // 释放调用键和值的释放函数?
                if (!nofree) {
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                }
                
                // 释放节点本身
                zfree(he);

                // 更新已使用节点数量
                d->ht[table].used--;

                // 返回已找到信号
                return DICT_OK;
            }

            prevHe = he;
            he = he->next;
        }

        // 如果执行到这里,说明在 0 号哈希表中找不到给定键
        // 那么根据字典是否正在进行 rehash ,决定要不要查找 1 号哈希表
        if (!dictIsRehashing(d)) break;
    }

    // 没找到
    return DICT_ERR; /* not found */
}
扩容

扩容触发条件 — 在redis中触发扩容的条件有很多
1、每次新增dictEntry时都会调用_dictExpandIfNeeded()函数;
2、在往hash表中添加元素时也会调用到_dictExpandIfNeeded()函数;
在_dictExpandIfNeeded()函数中可以看到,如果ht[0]是0,会初始化出一个大小是4的哈希表;或者ht[0].used大于等于ht[0].size,会触发扩容函数;

/*
 * 创建一个新的哈希表,并根据字典的情况,选择以下其中一个动作来进行:
 *
 * 1) 如果字典的 0 号哈希表为空,那么将新哈希表设置为 0 号哈希表
 * 2) 如果字典的 0 号哈希表非空,那么将新哈希表设置为 1 号哈希表,
 *    并打开字典的 rehash 标识,使得程序可以开始对字典进行 rehash
 *
 * size 参数不够大,或者 rehash 已经在进行时,返回 DICT_ERR 。
 *
 * 成功创建 0 号哈希表,或者 1 号哈希表时,返回 DICT_OK 。
 *
 * T = O(N)
 */
int dictExpand(dict *d, unsigned long size)
{
    // 新哈希表
    dictht n; /* the new hash table */

    // 根据 size 参数,计算哈希表的大小
    // T = O(1)
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    // 不能在字典正在 rehash 时进行
    // size 的值也不能小于 0 号哈希表的当前已使用节点
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    // 为哈希表分配空间,并将所有指针指向 NULL
    n.size = realsize;
    n.sizemask = realsize-1;
    // T = O(N)
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    // 如果 0 号哈希表为空,那么这是一次初始化:
    // 程序将新哈希表赋给 0 号哈希表的指针,然后字典就可以开始处理键值对了。
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    // 如果 0 号哈希表非空,那么这是一次 rehash :
    // 程序将新哈希表设置为 1 号哈希表,
    // 并将字典的 rehash 标识打开,让程序可以开始对字典进行 rehash
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;

    /* 顺带一提,上面的代码可以重构成以下形式:
    
    if (d->ht[0].table == NULL) {
        // 初始化
        d->ht[0] = n;
    } else {
        // rehash 
        d->ht[1] = n;
        d->rehashidx = 0;
    }

    return DICT_OK;

    */
}
/*
 * 计算第一个大于等于 size 的 2 的 N 次方,用作哈希表的值
 *
 * T = O(1)
 */
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}

扩容大小
在dictExpand()函数中会创建一个ht[1]做为扩容后的hash表,大小根据入参决定,没有入参或入参小于默认大小4,则创建一个默认值*2大小的哈希表;
渐进式扩容 — 元素迁移
以上的函数只是创建了一个新的hash表,但并没有直接执行元素迁移操作,真正的元素迁移是在dictRehash()函数中进行的;
渐进式扩容 — 执行时机
在新增、删除、查询和随机获取key(dictGetRandomKey函数)时会进行单步扩容,调用dictRehash()函数,传递参数1表示进行1次元素的迁移;
需要注意的是,当redis长时间没有增删改查操作进行时,也会定时触发元素迁移,是由redis的删除过期键的定时任务调用的,在指定时间内每次执行100的元素迁移;

/* 
 *
 * 执行 N 步渐进式 rehash 。
 *
 * 返回 1 表示仍有键需要从 0 号哈希表移动到 1 号哈希表,
 * 返回 0 则表示所有键都已经迁移完毕。
 *
 *
 * 注意,每步 rehash 都是以一个哈希表索引(桶)作为单位的,
 * 一个桶里可能会有多个节点,
 * 被 rehash 的桶里的所有节点都会被移动到新哈希表。
 *
 * T = O(N)
 */
int dictRehash(dict *d, int n) {

    // 只可以在 rehash 进行中时执行
    if (!dictIsRehashing(d)) return 0;

    // 进行 N 步迁移
    // T = O(N)
    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
        // 如果 0 号哈希表为空,那么表示 rehash 执行完毕
        // T = O(1)
        if (d->ht[0].used == 0) {
            // 释放 0 号哈希表
            zfree(d->ht[0].table);
            // 将原来的 1 号哈希表设置为新的 0 号哈希表
            d->ht[0] = d->ht[1];
            // 重置旧的 1 号哈希表
            _dictReset(&d->ht[1]);
            // 关闭 rehash 标识
            d->rehashidx = -1;
            // 返回 0 ,向调用者表示 rehash 已经完成
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        // 确保 rehashidx 没有越界
        assert(d->ht[0].size > (unsigned)d->rehashidx);

        // 略过数组中为空的索引,找到下一个非空索引
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

        // 指向该索引的链表表头节点
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        // 将链表中的所有节点迁移到新哈希表
        // T = O(1)
        while(de) {
            unsigned int h;

            // 保存下个节点的指针
            nextde = de->next;

            /* Get the index in the new hash table */
            // 计算新哈希表的哈希值,以及节点插入的索引位置
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;

            // 插入节点到新哈希表
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;

            // 更新计数器
            d->ht[0].used--;
            d->ht[1].used++;

            // 继续处理下个节点
            de = nextde;
        }
        // 将刚迁移完的哈希表索引的指针设为空
        d->ht[0].table[d->rehashidx] = NULL;
        // 更新 rehash 索引
        d->rehashidx++;
    }

    return 1;
}
总结

dict结构在redis中是最常用到的结构,所以一定要看一遍源码,需要注意的地方,在上面也已经讲的很详细了,这里附一个有注解的redis3.0源码;
https://github.com/huangz1990/redis-3.0-annotated

上一篇下一篇

猜你喜欢

热点阅读