Java源码分析

ConcurrentHashMap(一)

2019-03-06  本文已影响0人  DevilN

类继承

ConcurrentHashMap继承AbstractMap,该抽象类定义了一些基本的操作,HashMap同样继承了这个类,其次还实现了ConcurrentMap,该接口继承的Map接口,除Map中的方法自己同样定义了一些操作。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

类重要属性

//最大table容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//数组可能得最大值,(非2次幂)需要与toArray和相关联方法
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,已经不用了,兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6
static final int UNTREEIFY_THRESHOLD = 6;
//树化链表的最小table容量
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容线程每次最少要迁移16个hash桶
private static final int MIN_TRANSFER_STRIDE = 16;
//用于生成当前数组对应的基数戳\生成sizeCtl所使用的bit位数
private static int RESIZE_STAMP_BITS = 16;
//最大扩容线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//sizeCt中记录大小的位移
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/**
* Node的hash字段编码
*/
static final int MOVED     = -1; // hash for forwarding nodes forwarding Node的hash值
static final int TREEBIN   = -2; // hash for roots of trees 树的根节点的hash值
static final int RESERVED  = -3; // hash for transient reservations 暂时保留的hash值
//普通节点hash值的可用位数,此值是为了消除负hash  0x7fffffff int的最大值 2^31
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//Node数组,存储元素的数组 size为2的次幂,加了volatile关键字
transient volatile Node<K,V>[] table;
//默认为null,扩容时新生成的数组,其大小为原数组的两倍。
private transient volatile Node<K,V>[] nextTable;
/**
 * 默认为0,用来控制table的初始化和扩容操作
 * 负数代表正在进行初始化或扩容操作
 *      -1代表正在初始化
 *      -N 表示有N-1个线程正在进行扩容操作
 *      正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
private transient volatile int sizeCtl;

重要的内部类

Node

保存key,value及key的hash值的数据结构

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    ...省略部分代码
}

TreeNode

TreeBins的节点,树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。
但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
    ...省略部分代码

TreeBin

在HashMap中桶中存放的是TreeNode,而实际的ConcurrentHashMap桶中,存放的是TreeBin对象。

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;//树的根节点
    volatile TreeNode<K,V> first;//第一个树节点
    volatile Thread waiter;//等待线程
    volatile int lockState;//锁状态
    // values for lockState
    static final int WRITER = 1; // set while holding write lock 持有写锁
    static final int WAITER = 2; // set when waiting for write lock 等待写锁
    static final int READER = 4; // increment value for setting read lock 设置读锁时增加
    ......省略部分代码

构造器

//空构造器
public ConcurrentHashMap() {
}
//初始化容量的构造器,需要保证
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
//初始化容量
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
                // 找出大于等于参数容量的2次幂数
    //初始化sizeCtl
    this.sizeCtl = cap;
}
//带有容器的构造类,将容器内元素全部put进map中
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
//
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    //这里加1是为了下面算cap,找出适合initialCapacity的2次幂的数值,赋值给sizeCtl
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

重要的常用方法

put(K key,V value):具体方法看putVal(K key, V value, boolean onlyIfAbsent)

public V put(K key, V value) {
    return putVal(key, value, false);
}

putVal(K key, V value, boolean onlyIfAbsent):首先看JDK中源码是怎么实现的

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //如果key等于null或者value为null,抛出空指针异常
    if (key == null || value == null) throw new NullPointerException();
    //计算key的hash值
    //1.调用object方法获取key的hash值
    //2.调用spread()重新计算--降低hash冲突
    int hash = spread(key.hashCode());
    // bin计数变量
    int binCount = 0;
    //将公用变量table赋值给tab,开始死循环
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;//声明几个变量
        if (tab == null || (n = tab.length) == 0)//如果tab为null或者tab长度为0,初始化tab
            tab = initTable();//初始化table,然后进入下次循环
        //如果tab不为null或者length不为0,进入下面else if 中
        //1.首先利用hash值计算出数组的下标
        //2.利用tab数组和下标值,找出对应下标的Node,详情看tabAt方法
        //3.判断是否为null
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果当前位置为null,没有被占用---此时有线程安全问题,可能有多个线程到这里
            //所以这里通过CAS操作数组,无需加锁,各线程可见
            //两种情况:
            //1.操作成功:跳出循环
            //2.操作失败:有其他线程修改了节点,重新进入下次循环
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                //操作成功
                break;                   // no lock when adding to empty bin 添加到空箱时没有锁定
        }
        //走到这里说明,由hash值算出的下标,数组当前位置已经存在元素,下面还有其他判断
        // 如果当前位置的元素的hash值为MOVED,即为-1,说明此节点为forwarding nodes
        // 意味有其它线程正在扩容,则一起进行扩容操作,帮助扩容完成
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);//帮助扩容
        else {
            //走到这里,说明当前下标位置已经有节点,hash值冲突,也没有扩容操作,那么以链表或者红黑树的形式插入节点
            V oldVal = null;//为旧节点的value声明一个临时变量
            synchronized (f) { //给当前头节点加锁,不影响其他下标位置插入节点,相比segment锁粒度更小,提高并发
                if (tabAt(tab, i) == f) {//这里需要再次判断下刚才获取到的头节点是否z这段时间内被其他线程修改,例如:被remove,
                    if (fh >= 0) {//当fh的值大于等于0时,说明是链表头节点
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {//循环链表,binCount迭加
                            K ek;
                            //如果当前e所指向的节点与要插入的节点key相同,则根据onlyIfAbsent,覆盖value或者不变,然后退出循环
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            //如果上面对比发现两者不同,那么将e赋值为当前节点的下个节点继续对比
                            //如果下个节点为null,则直接新建节点插入链表
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果fh.hash<0 说明此节点为树节点,则进入putTreeVal()方法中
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

下面我们分析下这个方法,首先可以理清一下插入节点的逻辑:

1.计算key值的hash值

2.进入循环,判断table是否初始化,长度是否为0,如果没有则先初始化table,调用initTable(),然后进入下次循环

3.如果table已经初始化,根据hash值计算出插入的元素应该放到数组哪个位置,接着查看此位置是否已经有元素存在,这里查看的方法为tabAt(tab, i = (n - 1) & hash),如果没有值,那么填入元素,这里填入的方法为casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)),填入成功,跳出循环,失败则重新进入循环。

4.如果该下标位置已经有元素的话,那么还有两种情况,如果此处的元素的hash值为MOVED,即为-1,那么这代表此处的节点为forwarding nodes,可以理解为一个占位节点,这说明有线程正在进行扩容操作,那么当前线程可以帮助一起扩容

5.如果该下标位置的元素不为MOVED,那么此节点要么是链表头节点,要么是树的根节点,那么对应两种情况,分别插入即可。

其实ConcurrentHashMap中插入节点的逻辑和HashMap中大同小异,但是ConcurrentHashMap为了保证线程安全,在实现手法上和HashMap有很大变化,我们同样可以列出来看看:

以上是ConcurrentHashMap的put操作过程,和HashMap已经有这么多的不同之处了,看来一篇文章是不能讲完了,其他方法就在分篇放在其他文章吧。文章中可能对CAS、Unsafe类以及更底层的设计没有写的很好的写出来,我也只是理解的表面些,希望大佬可以指教。

此文章的观点是自己思考以及从网上大佬的文章中汲取的,如有错误,请勿喷(Ծ‸Ծ),非常欢迎大家指出,我也是个菜鸟而已,谢谢大家!

上一篇下一篇

猜你喜欢

热点阅读