ConcurrentHashMap(一)
类继承
ConcurrentHashMap继承AbstractMap,该抽象类定义了一些基本的操作,HashMap同样继承了这个类,其次还实现了ConcurrentMap,该接口继承的Map接口,除Map中的方法自己同样定义了一些操作。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
类重要属性
//最大table容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//数组可能得最大值,(非2次幂)需要与toArray和相关联方法
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,已经不用了,兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6
static final int UNTREEIFY_THRESHOLD = 6;
//树化链表的最小table容量
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容线程每次最少要迁移16个hash桶
private static final int MIN_TRANSFER_STRIDE = 16;
//用于生成当前数组对应的基数戳\生成sizeCtl所使用的bit位数
private static int RESIZE_STAMP_BITS = 16;
//最大扩容线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//sizeCt中记录大小的位移
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/**
* Node的hash字段编码
*/
static final int MOVED = -1; // hash for forwarding nodes forwarding Node的hash值
static final int TREEBIN = -2; // hash for roots of trees 树的根节点的hash值
static final int RESERVED = -3; // hash for transient reservations 暂时保留的hash值
//普通节点hash值的可用位数,此值是为了消除负hash 0x7fffffff int的最大值 2^31
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//Node数组,存储元素的数组 size为2的次幂,加了volatile关键字
transient volatile Node<K,V>[] table;
//默认为null,扩容时新生成的数组,其大小为原数组的两倍。
private transient volatile Node<K,V>[] nextTable;
/**
* 默认为0,用来控制table的初始化和扩容操作
* 负数代表正在进行初始化或扩容操作
* -1代表正在初始化
* -N 表示有N-1个线程正在进行扩容操作
* 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
private transient volatile int sizeCtl;
重要的内部类
Node
保存key,value及key的hash值的数据结构
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
...省略部分代码
}
TreeNode
TreeBins的节点,树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。
但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
...省略部分代码
TreeBin
在HashMap中桶中存放的是TreeNode,而实际的ConcurrentHashMap桶中,存放的是TreeBin对象。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;//树的根节点
volatile TreeNode<K,V> first;//第一个树节点
volatile Thread waiter;//等待线程
volatile int lockState;//锁状态
// values for lockState
static final int WRITER = 1; // set while holding write lock 持有写锁
static final int WAITER = 2; // set when waiting for write lock 等待写锁
static final int READER = 4; // increment value for setting read lock 设置读锁时增加
......省略部分代码
构造器
//空构造器
public ConcurrentHashMap() {
}
//初始化容量的构造器,需要保证
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//初始化容量
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 找出大于等于参数容量的2次幂数
//初始化sizeCtl
this.sizeCtl = cap;
}
//带有容器的构造类,将容器内元素全部put进map中
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//这里加1是为了下面算cap,找出适合initialCapacity的2次幂的数值,赋值给sizeCtl
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
重要的常用方法
put(K key,V value):具体方法看putVal(K key, V value, boolean onlyIfAbsent)
public V put(K key, V value) {
return putVal(key, value, false);
}
putVal(K key, V value, boolean onlyIfAbsent):首先看JDK中源码是怎么实现的
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果key等于null或者value为null,抛出空指针异常
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值
//1.调用object方法获取key的hash值
//2.调用spread()重新计算--降低hash冲突
int hash = spread(key.hashCode());
// bin计数变量
int binCount = 0;
//将公用变量table赋值给tab,开始死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;//声明几个变量
if (tab == null || (n = tab.length) == 0)//如果tab为null或者tab长度为0,初始化tab
tab = initTable();//初始化table,然后进入下次循环
//如果tab不为null或者length不为0,进入下面else if 中
//1.首先利用hash值计算出数组的下标
//2.利用tab数组和下标值,找出对应下标的Node,详情看tabAt方法
//3.判断是否为null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果当前位置为null,没有被占用---此时有线程安全问题,可能有多个线程到这里
//所以这里通过CAS操作数组,无需加锁,各线程可见
//两种情况:
//1.操作成功:跳出循环
//2.操作失败:有其他线程修改了节点,重新进入下次循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
//操作成功
break; // no lock when adding to empty bin 添加到空箱时没有锁定
}
//走到这里说明,由hash值算出的下标,数组当前位置已经存在元素,下面还有其他判断
// 如果当前位置的元素的hash值为MOVED,即为-1,说明此节点为forwarding nodes
// 意味有其它线程正在扩容,则一起进行扩容操作,帮助扩容完成
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);//帮助扩容
else {
//走到这里,说明当前下标位置已经有节点,hash值冲突,也没有扩容操作,那么以链表或者红黑树的形式插入节点
V oldVal = null;//为旧节点的value声明一个临时变量
synchronized (f) { //给当前头节点加锁,不影响其他下标位置插入节点,相比segment锁粒度更小,提高并发
if (tabAt(tab, i) == f) {//这里需要再次判断下刚才获取到的头节点是否z这段时间内被其他线程修改,例如:被remove,
if (fh >= 0) {//当fh的值大于等于0时,说明是链表头节点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//循环链表,binCount迭加
K ek;
//如果当前e所指向的节点与要插入的节点key相同,则根据onlyIfAbsent,覆盖value或者不变,然后退出循环
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//如果上面对比发现两者不同,那么将e赋值为当前节点的下个节点继续对比
//如果下个节点为null,则直接新建节点插入链表
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果fh.hash<0 说明此节点为树节点,则进入putTreeVal()方法中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
下面我们分析下这个方法,首先可以理清一下插入节点的逻辑:
1.计算key值的hash值
2.进入循环,判断table是否初始化,长度是否为0,如果没有则先初始化table,调用initTable(),然后进入下次循环
3.如果table已经初始化,根据hash值计算出插入的元素应该放到数组哪个位置,接着查看此位置是否已经有元素存在,这里查看的方法为tabAt(tab, i = (n - 1) & hash),如果没有值,那么填入元素,这里填入的方法为casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)),填入成功,跳出循环,失败则重新进入循环。
4.如果该下标位置已经有元素的话,那么还有两种情况,如果此处的元素的hash值为MOVED,即为-1,那么这代表此处的节点为forwarding nodes,可以理解为一个占位节点,这说明有线程正在进行扩容操作,那么当前线程可以帮助一起扩容
5.如果该下标位置的元素不为MOVED,那么此节点要么是链表头节点,要么是树的根节点,那么对应两种情况,分别插入即可。
其实ConcurrentHashMap中插入节点的逻辑和HashMap中大同小异,但是ConcurrentHashMap为了保证线程安全,在实现手法上和HashMap有很大变化,我们同样可以列出来看看:
- 在ConcurrentHashMap中table、sizeCtl、nextable等公用变量用volatile修饰,volatile可以保证线程之间变量的可见性,这应该是在多线程场景下的低配。在HashMap中是没有这样修饰的。
- 接着是初始化table,在HashMap中是没有初始化操作这一步骤的,而是扩容操作,这里暂且不讨论扩容操作,在put里面执行初始化,是可以并发执行的,那么怎么保证只初始化一次呢,在initTable()的实现中,sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。当有线程执行了Unsafe.compareAndSwapInt方法(CAS)就会修改sizeCtl为-1,此时其他线程就会调用Thread.yield()让出CPU时间片等待table初始化完成,而且修改sizeCtl前后进行了两次table检查,这样就确保了只有一个线程可以初始化table,这样我们就安全的生成一个table了,剩下就是往里面放元素。
- 在检查数组下标是否有对应元素f时,使用了tabAt(tab, i = (n - 1) & hash)方法,在tabAt方法中使用了Unsafe.getObjectVolatile来获取,在Java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是被volatile修饰了,但是不能保证每次从table中取出最新元素(volatile的数组只针对数组的引用具有volatile的语义,而不是它的元素)(可参考并发编程网volatile是否能保证数组中元素的可见性?),所以Doug Lea采用Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
- 如果此时检查到没有头节点,调用casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)这个方法插入头节点,这里同样利用Unsafe.compareAndSwapObject(CAS)方法插入Node节点。(这里CAS成功,可以保证后来的线程在上一步查询的时候感知到;如果CAS失败,那么说明其他线程已经修改了,接着就自旋重新尝试插入节点)
- 如果检查到头节点已经有了节点,接着判断该节点的hash值,如果hash值等MOVED此静态int值,说明table正在扩容节点,那么此阶段插入节点并不是安全的,因为此时table状态并不稳定,那么此线程就帮助一起扩容,扩容完成后进入下次循环插入节点。
- 如果头节点的值不是MOVED,那么此时是可以进行插入操作,接着就是ConcurrentHashMap不同HashMap的区别了,在插入节点之前,首先会锁定当前下标位置的头节点,即synchronized (f),不同于jdk1.7用segment实现的分段锁,1.8的分段锁粒度更小,进一步减少并发冲突的概率,当其他线程来了,只能等待当前线程修改完毕才能进入代码块。接下来插入节点的操作因为都在同步快里,所以和HashMap大同小异了。
以上是ConcurrentHashMap的put操作过程,和HashMap已经有这么多的不同之处了,看来一篇文章是不能讲完了,其他方法就在分篇放在其他文章吧。文章中可能对CAS、Unsafe类以及更底层的设计没有写的很好的写出来,我也只是理解的表面些,希望大佬可以指教。
此文章的观点是自己思考以及从网上大佬的文章中汲取的,如有错误,请勿喷(Ծ‸Ծ),非常欢迎大家指出,我也是个菜鸟而已,谢谢大家!