转:ConcurrentHashMap JDK 7 源码分析
参考文章:http://www.jianshu.com/p/bd972088a494
数据结构
image构造函数
put 的实现
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
//定位Segment,让Hash右移动segmentShift位,默认情况下就是28位(总长32位),之后和segmentMask(0XFF)做与操作,得到段的索引
int j = (hash >>> segmentShift) & segmentMask;
//利用UNSAFE.getObject中的方法获取到目标的Segment。
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//如果没有取到目标Segment,所以需要保证能取到这个Segment,没有的话创建一个Segment
s = ensureSegment(j);
//代理到Segment的put方法
return s.put(key, hash, value, false);
}
首先是(Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE))
, UNSAFE这种用法是在JDK1.6中没有的,主要是利用Native方法来快速的定位元素。
还有一个点是:ensureSegment()
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//getObjectVolatile是以Volatile的方式获得目标的Segment,Volatile是为了保证可见性。
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//如果没有取到,那么证明指定的Segment不存在,那么需要新建Segment,方式是以ss[0]为镜像创建。
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次检查
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);//创建新Segment
//以CAS的方式,将新建的Segment,set到指定的位置。
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
上面的代码就是保证,在put之前,要保证目标的Segment是存在的,不存在需要创建一个Segment。
put方法代理到了Segment的put方法,Segment extends 了ReentrantLock,以至于它能当做一个Lock使用。那么我们看一下Segment的put的实现:
进入Segment的put操作时先进行加锁保护。如果加锁没有成功,调用scanAndLockForPut方法(详细步骤见下面scanAndLockForPut()源码分析)进入自旋状态,该方法持续查找key对应的节点链中是已存在该机节点,如果没有找到,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入加锁等待状态,自旋结束并返回节点(如果返回了一个非空节点,则表示在链表中没有找到相应的节点)。对最大尝试次数,目前的实现单核次数为1,多核为64。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//因为put操作会改变整体的结构,所以需要保证段的线程安全性,所以首先tryLock
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//新建tab引用,避免直接引用Volatile导致性能损耗,
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//Volatile读,保证可见性
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//遍历HashEntry数组,寻找可替换的HashEntry
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不存在可替换的HashEntry,如果在scanAndLockForPut中建立了此Node直接SetNext,追加到链表头
if (node != null)
node.setNext(first);
else
//如果没有则新建一个Node,添加到链表头
node = new HashEntry<K,V>(hash, key, value, first);
//容量计数+1
int c = count + 1;
//如果容量不足,那么扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//以Volatile写的方式,替换tab[index]的引用
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
put方法是做了加锁操作的,所以不用过多的考虑线程安全的问题,但是get操作为了保证性能是没有加锁的,所以需要尽量的保证数据的可见性,能让get得到最新的数据。
让我们看看 scanAndLockForPut(key, hash, value) 在做什么:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
当put操作尝试加锁没成功时,它不是直接进入等待状态,而是调用了scanAndLockForPut()操作,该操作持续查找key对应的节点链中是已存在该机节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,计所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64。
在这段逻辑中,它先获取key对应的节点链的头,然后持续遍历该链,如果节点链中不存在要插入的节点,则预创建一个节点,否则retries值资增,直到操作最大尝试次数而进入等待状态。这里需要注意最后一个else if中的逻辑:当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。
为什么要这么做呢?为了事先做数据的缓存,让这些数据缓存在CPU的cache中,这样后续在使用时能避免Cache missing。ps:scanAndLockForPut有个孪生兄弟scanAndLock,作用都差不多。
和 JDK 1.6 的实现的不同:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
JDK 1.6 的实现和 JDK 1.7 的实现比较相似,但是主要区别是,没有使用一些 UNSAFE 的方法去保证内存的可见性,而是通过一个Volatile变量——count去实现。在开始的时候读count保证lock的内存语意,最后写count实现unlock的内存语意。
但是这里存在一个问题,new HashEntry操作存在重排序问题,导致在getValue的时候tab[index]不为null,但是value为null。所以在 get 的时候会有一步重新检查的步骤,如果 value 为 null 的话就把 segment 段加锁在重新 get 一次。具体原因后面详细说明。
get 的实现
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
可以看出来,get方法很简单,同时get是没有加锁的,那么get是如何保证可见性的呢?首先获取指定index的Segment,利用getObjectVolatile获取指定index的first HashEntry,之后遍历HashEntry链表,这里比较关键的是HashEntry的数据结构:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
两个变量是volatile的,也就是说,两个变量的读写能保证数据的可见性。
所以在变量HashEntry时,总能保证得到最新的值。
JKD1.6的get方法的实现:
V get(Object key, int hash) {
if (count != 0) { // read-volatile 当前桶的数据个数是否为0
HashEntry<K,V> e = getFirst(hash); 得到头节点
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
首先是读取count变量,因为内存的可见性,总是能返回最新的结构,但是对于getFirst可能得到的是过时的HashEntry。接下来获取到HashEntry之后getValue。但是这里为什么要做一个value的判空,原因就是上一步put的重排序问题,如果为null,那么只能加锁,加锁之后进行重新读取。但是这样确实会带来一些开销。
为什么 JDK 1.6 的实现是弱一致性的?
这里比较重要的一点就是,为什么JDK1.6的是弱一致性的?因为JDK1.6的所有可见性都是以count实现的,当put和get并发时,get可能获取不到最新的结果,这就是JDK1.6中ConcurrentHashMap弱一致性问题,主要问题是 tab[index] = new HashEntry<K,V>(key, hash, first, value);
不一定 happened before getFirst(hash);
如下图可以说明:
执行put的线程 | 执行get的线程 |
---|---|
⑧tab[index] = new HashEntry<K,V>(key, hash, first, value) | |
③if (count != 0) | |
②count = c | |
⑨HashEntry e = getFirst(hash); |
对volatile的count的读时间上发生在对count的写之前,我们无法得出② hb ⑨这层关系了。因此,通过count变量,在这个轨迹上是无法得出⑧ hb ⑨的。
而JDK1.7的实现,对于每一个操作都是Volatile变量的操作,能保证线程之间的可见性,所以不存在弱一致性的问题。(对这句话持有保留观点)
remove 的实现(具体实现以及和 1.6 对比要补上)
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
这里的 remove 实现我在公司里看到一篇文章,但是记不清在哪里看到的了。大致上说的是,remove 操作的时候不需要向 JDK 1.6 的那样,将要删除的节点赋值一遍,然后删除对应的元素, 再将最后一个元素的 next 元素连接上去。 而是用了 CAS 的操作实现的删除。
size 的实现
public int size() {
// 主要的方法是先遍历 2 次 Segment,如果两次遍历的结果得到的 size 相同
//那么就认为 size 的结果是准确的,否则就要对每一个 Segment 加锁重新计算 size 的大小
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
主要的方法是先遍历 2 次 Segment,如果两次遍历的结果得到的 size 相同,那么就认为 size 的结果是准确的,否则就要对每一个 Segment 加锁重新计算 size 的大小。获取锁的操作在这里
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
JDK 6 和 7 的区别
总体来说变化挺多,不过总体的结构并没有发生改变,还是采用了 segment 分断锁的实现。 1.6 主要用 count 的 volatile 语义来实现put、get。1.7 主要使用 unsafe 的 CAS 操作来实现。