ConcurrentHashMap源码分析(一)
ConcurrentHashMap在面试中也是会被经常问道的问题,所以今天就来学习一下,看看ConcurrentHashMap的线程安全是如何实现的。需要注意的是JDK1.8和1.7版本的实现是不同的,我使用的是1.8版本。其实以前自己也看过一部分源码,但是其实理解的不是很透彻,我觉得有必要再仔细的看一看。另外看源码的时候看代码注释是很重要的,可以帮你更直观的了解作者的想法和意图,甚至一些很小的需要注意事项。不过就是英文看起来比较吃力,但是这个困难必须克服才行。
一、基本常量
private static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量,必须为2的次幂
private static final intDEFAULT_CAPACITY = 16;//table的默认大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 最大数组大小,不需要是2的次幂,在toArray和相关方法时需要
private static finalint DEFAULT_CONCURRENCY_LEVEL = 16;//默认并发级别,好像没什么用
private static final float LOAD_FACTOR = 0.75f;//负载因子
static final int TREEIFY_THRESHOLD = 8; // 链表转树的阀值,即如果这个值大等于8会从链表转成树,必须大于2
static final int UNTREEIFY_THRESHOLD = 6; //树转链表阀值,小于等于6,且小于TREEIFY_THRESHOLD。仅在扩容tranfer时才可能树转链表。
static final int MIN_TREEIFY_CAPACITY = 64;//树化的最小容量,为避免冲突最小应为TREEIFY_THRESHOLD的4倍。
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;//sizeCtl中用于生成戳记的位数。在32个bit数组中最小为6
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 帮助resize的最大线程数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //sizeCtl中记录size大小标记的偏移量
static final int MOVED = -1; //forwarding节点的哈希值
static final int TREEBIN = -2; //树根节点的哈希值
static final int RESERVED = -3; // hash for transient reservations(ReservationNode的hash值)
static final int HASH_BITS = 0x7fffffff; //正常节点哈希值可用位数
static final int NCPU = Runtime.getRuntime().availableProcessors(); // 可用CPU数量
有些常量看了注释也还是不太懂具体说明意思,等看源码时遇到再说吧,下面看看具体属性有哪些。
2、相关属性
transient volatile Node<K,V>[] table;//bins数组,大小为2次幂,第一次插入时才会初始化
private transient volatile Node<K,V>[] nextTable;//下一个table,仅在resize过程non-null
private transient volatile long baseCount;//基本计数器值,主要在没有争用时使用。通过CAS更新。
private transient volatile int sizeCtl;//table大小控制值,为负数时表示正在初始化或resize。-1表示初始化,-(N+1)表示有N个线程在执行resize。当table为null时保留初始表的大小,或者默认为0。初始化之后,保存下一个元素计数值,在该值上调整表的大小。这个值很重要。
private transient volatile int transferIndex;//resize时要分割的下一个表索引(加1)。
private transient volatile int cellsBusy;//resize以及创建CounterCell时使用的自旋锁(通过CAS)。
private transient volatile CounterCell[] counterCells;//counter cell表,non-null时,大小是2的次幂
// view
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
三、构造方法
// 空构造函数,创建新的、空的map,默认初始化table大小为16
public ConcurrentHashMap() { }
// 构造指定大小容量的map,实际大小并不是指定的容量,因为容量必须为2的次幂,因此需要执行tableSizeFor()方法计算容量。然后将计算出的容量(2的次幂)赋值给sizeCtl。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 根据给定的map构造新的ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//根据制定的初始化容量、负载因子构建map,
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//根据制定的初始化容量、负载因子和并发级别构建map,
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
可能会有人觉得好奇,感觉这些构造方法好像什么都没做,最多就是将容量赋值给sizeCtl。因为上面解释过,ConcurrentHashMap的初始化是在第一次插入时才会执行的,所以构造函数执行并不执行初始化操作,并且初始化时会使用保存在sizeCtl中的值。下面我们专门看一下初始化方法。
四、table初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
根据上面代码可以发现,初始化时如果table为null或者长度为0时表示table还没有初始化,然后根据sizeCtl判断有没有其他线程对其进行操作,上面说过如果sizeCtl为负数,表明table正在初始化或resize,那么线程让步,将CPU资源让给其他的线程。关于Thread.yield()方法建议看下相关资料,这里不细述。代码注释也说了,失去初始化竞争条件,仅仅自旋。
如果没有其他线程对table进行操作,那么调用unsafe的compareAndSwapInt方法,将内存中的sizeCtl更新为-1,表明正在对table进行初始化。然后再判断table是否为null或为空,是则根据原sizeCtl的值构建一个Node数组,并将其赋值给table,终止while循环,最后返回初始化table。
到这里大概了解ConcurrentHashMap的初始化是怎么样一个过程了,那么它到底如何保证线程安全的呢?接着往下看,我们主要看下它的增删该这个几个操作。
五、增加、修改操作
1、PUT操作
先上代码吧
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {//
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {//没有后续节点,新建节点,并赋值给next
pred.next = new Node<K,V>(hash, key,value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {// binCount == 0,表示的是链表的头结点
if (binCount >= TREEIFY_THRESHOLD)//链表转成树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put方法实际上调用的putVal方法,这个方法if-else分支挺多的,一点一点看吧。
进入方法之后先进行参数校验,有空参数则抛出异常信息。接下来是计算需要添加key的hash值,关于spread()方法可以看一下注释,知道计算的是hash值就可以了。
如果table为null或为空,则执行table的初始化方法。如果根据table的下标和当前key的hash值进行与运算,计算出当前key在table中的位置,从而获取存放的Node,如果Node为null,表示当前位置为空,那么根据key value以及key的hash值,构建一个Node,并存放到相应位置,结束循环。
这里说下tabAt和casTabAt这两个方法,这两个方法也都是unsafe类的方法,看下代码:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
tabAt方法实际调用的是getObjectVolatile方法,这个方法获取的是内存中的最新值,volatile关键字保证每次读取都是最新的数值,所以这个方法返回的是table某个位置上最新的Node值。
casTabAt方法调用的是compareAndSwapObject方法,其实就是CAS,即传入当前的table,以及偏移位置,期望值和更新值,这个用的地方也挺多的。
如果根据当前要添加的key拿到对应的Node以后,该Node的hash值等于MOVED(值为-1),表明这个节点是一个ForwardingNode节点,然后调用helpTransfer方法,即帮助进行扩容。ForwardingNode节点内部保存了一 nextTable 引用,它指向一张 hash 表。在进行扩容操作时,我们需要对每个桶中的结点进行分离和转移,如果某个桶结点中所有节点都已经迁移完成了(即已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。关于helpTransfer方法后面有时间再细述吧,现在先知道可以了。
以上条件不满足的时候,即表明要插入key value的位置是一个普通节点时,使用Java内置的synchronized关键字锁定该节点。如果返回该位置头节点的hash值大于等于0,表明这是一个普通节点。接下来就是一些比较,如果获取到节点的hash值等于要添加key的hash值,并且当前节点的key等于要添加的key,说明添加的位置就是当前节点的位置,直接更新节点的value值就行,然后结束循环。否则以添加的key、value、hash为参数新建Node节点,并将其赋值给其前一个节点的next属性,结束循环。如果返回该位置头节点的hash值小于0,如果该节点是一个treeBin实例,那么调用TreeBin的putTreeVal方法,向该树添加相应的值。binCount为添加key的链表长度,如果binCount大于TREEIFY_THRESHOLD,则将链表转成树,如果该节点的旧值不为空,说明这次的put操作是更新操作,直接返回旧值即可。否则需要继续向下执行addCount方法,这个方法通过CAS更新baseCount的值,并判断是否需要扩容,具体方法还是需要看下具体实现,这里暂时略过。
2、remove操作
remove操作实际调用的是replaceNode方法(replace也是调用的replaceNode),所以ConcurrentHashMap的删除和修改操作,只需要看replaceNode就行了。replaceNode方法需要三个参数,分别是原有的key、value以及要更新的value值。下面看下代码:
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)//修改操作
e.val = value;
else if (pred != null)//表示不是链表头结点
pred.next = e.next;
else
setTabAt(tab, i, e.next);//说明是头节点,用下一个节点替代头结点
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
同样第一步通过spread方法先计算key的hash值。如果当前map的table数组为空(null或者长度为0)或者根据当前key未找到相应的Node,返回null。反之如果找到了节点,但是该节点的hash属性等于MOVED(-1),表明这个节点是一个ForwardingNode,因此执行helpTransfer方法,基本和putVal一样。接下来同样是给节点加上锁(我看有的说是桶,我觉得一个意思吧),然后定位到链表上具体的节点,如果是更新,将更新值赋给原值即可,如果是删除则判断该结点是不是链表的头结点,不是头结点,则将当前节点的next赋值给前节点的next;是头结点则将调用setTabAt方法,用头结点的下个节点替换当前头结点。
如果根据key查到的链表为树,则通过findTreeNode根据key以及其hash定位到对应的节点,进行更新或者删除。
最后如果更新值为null,即说明书删除操作,执行addCount,修改baseCount的值,且不需要扩容。
到这里关于ConcurrentHashMap的添加、修改(删除算是一种特殊情况)就看完了,但是实际上这两个方法里面又涉及到了其他一些方法,比如helpTransfer,下面看看代码:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);//返回扩容大小为tab.length的table的标记位
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
这个方法先执行必须满足,当前ConcurrentHashMap不为空,且节点是一个ForwardingNode,并且当前节点的nextTable属性值也不为空,即表明这个节点所在的桶中的所有节点已经迁移到nextTable指向的节点数组。resizeStamp方法返回数组扩容的标记位。当table处于扩容状态时,校验数组扩容标记位的值,如果没通过校验或者table扩容已经完成了(关于这几个比较条件自己有点懵,还需要好好想想),直接退出循环。否则调用unsafe将扩容标记的值加1,然后调用transfer进行扩容操作,最后返回迁移后的Node数组。
这里又涉及到transfer方法,这个方法有点点复杂,只能下次再看了,而且自己看源码看得还是有点头晕(感觉画个结构图,然后看的话感觉效果会更好点)。
关于ConcurrentHashMap源码先到这里吧,感觉自己写的挺乱的。首先从它的一些属性和一些常量值开始,了解它们各自的含义,接着是构造方法和初始化方法,后面主要看了它的增加和修改节点元素的方法,了解它实现程程安全也是通过synchronized实现的,不过同步代码块需要的是节点对象锁,这样锁的粒度更细,从而有更好的并发性能。自己网上也看了一些别人写的文章,感觉不管思路也好,表达能力也好比自己好多了,推荐:
为并发而生的 ConcurrentHashMap(Java 8)
ConcurrentHashMap源码分析(JDK8版本)
我觉得学习源码,首先一定要对相关数据结构有很清晰明了的认识,比方说链表、树等,再一个我觉得最好可以自己动手画一个示意图出来,看源码时候结合示意图,这样能帮助我们更直观的学习。