ReentrantReadWriteLock代码浅析
介绍
除了重入锁ReentrantLock以外,Doug Lea大神还顺带实现了读写重入锁ReentrantReadWriteLock,依旧支持重入特性、公平与非公平模式,分出了读锁和写锁。
读锁
当读锁被持有后,会阻止其他线程获取写锁,但读锁不排斥其他线程持有读锁。
写锁
当写锁被持有后,既排斥其他线程申请读锁,还排斥其他线程申请写锁。
结构
具体的结构关系,直接上图:
流程
通过ReentrantReadWriteLock的readLock()获取读锁,writeLock()获取写锁,读锁和写锁都有lock()、lockInterruptibly()、tryLock()、tryLock(long timeout, TimeUnit unit),unlock()。
lock()
方法,遵循初始化时的公平锁或非公平锁模式请求锁,请求锁的过程不会被中断,直到持有锁为止。
lockInterruptibly()
方法,如果线程调用了interrupted(),请求锁的过程将会中断,并且抛出InterruptedException异常。如果线程没有被中断,那么会坚持到持有锁为止。
tryLock()
方法,无视初始化时公平与非公平模式,直接尝试一次请求锁的CAS操作,如果成功,返回true,失败,返回false。
tryLock(long timeout, TimeUnit unit)
方法,类似lockInterruptibly(),但是比lockInterruptibly()多了超时检查操作,当超时时,会中断请求锁过程。
unlock()
方法,释放锁对象当前持有的锁一次。
申请读锁
因为请求写锁锁的流程与普通重入锁大同小异,所以这里只展示读锁的lock()方法的请求流程,lock()
方法中通过sync调用acquireShared()
方法,acquireShared()
方法的实现在AbstractQueuedSynchronizer中:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
所有读锁会进行一次tryAcquireShared(int arg)
和doAcquireShared(int arg)
。获取写锁则调用acquire(int arg)
方法,方法中if条件中调用tryAcquire(arg)
和 acquireQueued(final Node node, int arg)
,与读锁获取的流程有些小差别。
回到正题,tryAcquireShared(int arg)
方法被Sync实现,再看ReentrantReadWriteLock.Sync中:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这个方法中有两个比较重要的流程,都会在特定条件下返回,先看看进入这两个流程的if条件。
第一个if条件,exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
,先检查写锁重入数,如果不为0,再检查当前线程是否已经持有写锁,如果未持有写锁则返回-1表示失败,那么可以看出,同一个线程是可以同时持有写锁和读锁的。
第二个if条件,!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)
首先如果是公平锁,那么在readerShouldBlock()中会进入阻塞队列等待,如果非公平锁,就调用apparentlyFirstQueuedIsExclusive()
方法检查队列第一个节点是否在等待写锁,如果不是,再来检查读锁持有是否超过最大值,如果未超过,再来CAS修改读锁持有数量,CAS操作成功之后,这个if流程中会修改部分引用和缓存信息,这个具体作用后面再说。
在公平锁模式阻塞重新被唤醒后或者已有其他线程持有读锁,又或者读锁当前持有数现在已经到了MAX_COUNT时,都会跳过第二个if流程直接进入fullTryAcquireShared(Thread current)
方法中:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
//这里注释写了一种造成读写锁死锁的情况
//当前线程持有写锁,而队列中有其他线程正在等待写锁释放时,这个时候再来请求读锁时,就会导致死锁
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
//省略部分代码
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
//省略部分代码
}
return 1;
}
}
}
fullTryAcquireShared(Thread current)
方法中的请求锁的过程其实和tryAcquireShared(int unused)
中的过程十分相似,最重要的还是CAS操作的这个方法。只有compareAndSetState()
成功之后才会结束fullTryAcquireShared(Thread current)
方法中for循环,结束这次读锁请求操作。
exclusiveCount(int c)与sharedCount(int c)
在fullTryAcquireShared(Thread current)
方法与tryAcquireShared(int unused)
方法中中,变量c,就是持有锁的个数,通过getState()获取到值后,先exclusiveCount(int c),再经过sharedCount(int c)后赋值给r,找到和它们有关的代码:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);// 65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;// 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 65535
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
exclusiveCount(int c)操作会清除整型变量c高16位的所有数值,保留低16位数值。
sharedCount(int c) 操作清除整型变量c低16位,保留高16位数值。
所以r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)
就是在验证写锁持有数量,当c = 0
时c + SHARED_UNIT = 65536
,经过sharedCount(int c)
计算后得1,c + SHARED_UNIT
相当于对 c = (c >>> 16 + 1) << 16,对c高16位数值加1。
也就是说state值的高位保存着读锁数量,低位保存写锁数量,读锁与读锁最大持有数都是65535。
关于读写锁等待队列
不管是公平锁还是非公平锁,未能在其他线程释放锁时的特殊时刻正好获取到锁,都会进入等待锁队列,而这个队列被读锁与写锁共用,具体逻辑很多与重入锁高度重合,有兴趣可以看我写的这篇博客ReentrantLock源码通读(一)。
关于HoldCounter
HoldCounter遍布在所有读锁的请求和释放过程当中,并且为了缓存这个读锁持有数,代码占据了这些流程代码量大概两成左右,所以HoldCounter的重要性不言而喻,那么具体是什么作用呢?
在ReentrantReadWriteLock中对外提供了一个方法:
/**
* Queries the number of reentrant read holds on this lock by the
* current thread. A reader thread has a hold on a lock for
* each lock action that is not matched by an unlock action.
*
* @return the number of holds on the read lock by the current thread,
* or zero if the read lock is not held by the current thread
* @since 1.6
*/
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
//Sync中getReadHoldCount()
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
注释翻译过来就是,查询当前线程重入读锁的次数。不是可以通过Sync中的state全局变量获取重入次数吗,读锁与写锁是通过int的高低位来区分的,十分方便,为什么不能用这个state来获取读锁的重入次数呢?
事实上写锁的确是利用state的来获取写锁的重入次数的,但是写锁是一个线程独占的,它会排斥其他线程获取写锁以及读锁。
读锁,因为其可被共享的特殊性质,导致state中高位存储的是多个线程重入后的总数,所以state无法用来查询当前线程重入读锁的次数,当前线程无法知晓自己重入次数,这在某些复杂场景或者使用不规范的情况下,可能造成读锁无法被正常释放,而且排查起来也异常困难。
所以HoldCounter出现了,与HoldCounter配合当然少不了ThreadLocal,ReentrantReadWriteLock.Sync中包含有HoldCounter静态内部类和ThreadLocalHoldCounter
静态内部类。
HoldCounter
记录了重入计数与线程的tid。
ThreadLocalHoldCounter继承ThreadLocal,重载了initialValue()
方法。
并且为了更快的获取重入数,还额外赠送了两个全局变量,cachedHoldCounter和firstReaderHoldCount。
cachedHoldCounter
保存上个请求读锁线程的HoldCounter。
firstReaderHoldCount
保存第一个获取读锁线程的重入数。
所以Sync实现的getReadHoldCount()
方法根据return,分为四个分支,通过各种全局缓存变量,尽量高效的返回结果。
第一个分支,读取state的高位,如果为0返回。
第二个分支,检查当前线程是否等于firstReader
,如果是,返回firstReaderHoldCount
。
第三个分支,检查cachedHoldCounter中的tid是否等于当前线程的tid,如果是,返回cachedHoldCounter.count
。
第四个分支,就是遍历ThreadLocal中的threadMap找到当前线程的HoldCounter返回count
。
思考
一个线程是否可以同时获取到写锁与读锁?
肯定可以,不然也不会在注释里说明会产生死锁的场景了。
根据写锁的tryAcquire(int arg)
源码:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
先判断了state != 0
,再判断w==0 || current != getExclusiveOwnerThread()
直接堵死线程获取读锁在获取写锁的可能性。
所以能同时获取读锁和写锁的操作只能是先获取写锁,然后获取读锁。
如何能成功同时获取读锁和写锁呢?
在非公平模式下,线程持有写锁后,去请求读锁,而此时等待线程队列中第一个节点不是请求写锁的线程,即可成功获取读锁与写锁,但如果是请求写锁的线程,会返回失败状态值-1,之后进入等待线程队列,线程在持有写锁的情况下休眠并且等待通知被唤醒,由于唤醒超时线程的触发点实在有线程释放锁之后,所以这时无论是否设置过超时参数,都是无效的,依然会导致死锁,所以在使用时需要格外注意。
总结
读锁可以与其他线程共享,但排斥其他线程获取写锁。
写锁排斥其他线程获取锁,当前持有写锁后,仍然可以去获取读锁,完全独占,但是写锁竞争激烈时,完全独占的操作也很容易导致死锁。
读写锁的持有或重入上限均是65535。
当前线程重入计数,读写锁是分开保存的,state中保存的是读锁重入的总数,写锁不受影响。
当前线程读锁重入次数是使用ThreadLocal保存的,每个线程单独维护一个HoldCounter。使用的三个全局变量缓存特殊状态的线程重入计数变量,尽量快的返回结果。