23. 并发终结之ReentrantReadWriteLock
ReentrantReadWriteLock是可重入锁,支持锁的降级(Downgrade),即一个线程如果持有写锁的情况下,可以继续持有读锁;不支持锁的升级,即持有读锁的情况下,要先释放读锁,然后再申请写锁。
与ReentrantLock类似,ReentrantReadWriteLock底层也是基于AQS实现,且同样提供了公平Fair和非公平NonFair的实现,所以我们分析源码主要分析tryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared()这些由ReentrantReadWriteLock提供的实现,而AQS提供的acquireQueued(), release(), doAcquireShared(), releaseShared()我们分别在ReentrantLock和Semaphore文章里分析过。
Sync(AQS)源码分析
在分析之前,先看下ReentrantReadWriteLock的相关数据结构。
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
Sync作为AQS的实现在这里提供了ryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared()这些重要实现。
abstract static class Sync extends AbstractQueuedSynchronizer {
/*
* 使用低16位表示Exclusive Lock(Write Lock)独占锁
* 高16位表示Shared Lock(Read Lock)共享锁
*/
static final int SHARED_SHIFT = 16;
//共享锁范围从2^16次方开始
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 独占锁和共享锁总个数一样都是 2^16-1 = 65535个
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//独占锁范围到2^16次方结束
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回共享锁个数 c<65535,则返回0,c=65536,则返回1*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回独占锁个数 比如 c=65535,则返回65535,c>65536,则返回0 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
* 一个共享锁计数器
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
//针对每个线程的共享锁计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 返回当前线程所持有的共享锁数量
* 在共享锁数量降为0的时候移除.
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 最后一个线程成功获得共享锁时共享锁的数量
*/
private transient HoldCounter cachedHoldCounter;
/**
* firstReader 是第一个获取到共享锁的线程
* firstReaderHoldCount 是第一个获取共享锁线程的持有共享锁数量。
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
//这两个方法是有Fair和NonFair分别提供的实现
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
最后这两个shouldBlock方法就是Fair和NonFair的区别所在
static final class NonfairSync extends Sync {
//非公平实现里,写锁的获取不需要判断Sync Queue是不是有在等待的节点
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* 非公平实现里,读锁的获取要看Sync Queue里head的后继节点是共享锁还是独占锁,是独占锁则需要等待。
*/
return apparentlyFirstQueuedIsExclusive();
}
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
//看head的后继节点是不是共享锁,不是共享锁则return false
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
//公平实现里,写锁和读锁的获取都需要排队,不允许插队。
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
tryAcquire分析
独占锁尝试获取锁流程:
1.如果读锁不为0或者写锁不为0且独占线程不是当前线程,则尝试获取锁失败。
2.如果锁数量饱和,也不能继续申请锁。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or
* queue policy allows it. If so, update state and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//c!=0,w=0表示有共享锁,则不能申请独占锁
//或者w!=0,表示有独占锁,如果持有线程不是当前线程(另一个线程正在写),则也不能申请。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果锁的数量饱和了,也不能申请锁
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire,能走到这就是同一个线程继续申请锁,重入,则直接更新state值
setState(c + acquires);
return true;
}
//writerShouldBlock就看是Fair还是NonFair的实现,然后CAS更新state,更新失败return false;
//否则更新成功,将当前线程设置为独占锁线程
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryAcquireShared分析
上面分析完tryAcquire申请独占锁的代码,现在分析下申请共享锁的代码tryAcquireShared。
- 如果写锁被其他线程持有,则获取共享锁失败。这里意味着如果当前线程持有写锁,那么也是可以申请读锁的,即锁降级。
2.如果tryAcquireShared没有检查可重入的部分,而是在readerShouldBlock允许读之后就CAS设置state的值,并且更新firstReaderHoldCount的值+1。
3.如果2里面CAS更新失败或者readerShouldBlock不允许又或者锁的数量饱和了导致2失败,那么就需要fullTryAcquireShared来进行循环尝试获取共享锁。
protected final boolean tryAcquireShared(int acquires) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for lock wrt state,
* so ask if it should block because of queue policy.
* If not, try to grant by CASing state and updating count
* Note that step does not check reentrant acquires, which postponed to full version
* to avoid having to check hold count in the more typical non-reentrant case.
* 3. If step 2 fails either because thread apparently not eligible or CAS fails or
* count saturated. chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//如果共享锁count不为0,且当前线程不是独占锁线程,则return -1
//这里如果当前线程持有写锁,那么是可以继续申请读锁的,即锁的降级。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//拿到共享锁个数
int r = sharedCount(c);
//如果readerShouldBlock不需要block,且共享锁个数不饱和,就进行CAS更新state
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//更新state成功,如果开始共享锁为0,则设置当前线程为firstReader和更新firstReaderHoldCount+1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//如果共享锁不为0,且firstReader是当前线程,则firstReaderHoldCount+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//这里主要更新readHolds的值
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//如果上面readerShouldBlock为true,即要block,或者共享锁个数不饱和,又或者CAS更新失败,走下面方法
return fullTryAcquireShared(current);
}
// 死循环获取读锁。包含锁降级策略(代码逻辑与tryAcquireShared类似)
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
tryRelease分析
独占锁的释放流程tryRelease:
1.如果不是当前线程持有独占锁,则抛出IllegalMonitorStateException异常。
2.查看state释放之后的值是否为0,如果是0,则将独占锁线程设为null。
3.更新state的值。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
tryReleaseShared分析
共享锁释放流程:
1.判断firstReader是不是当前线程,如果是,就将firstReaderHoldCount - 1(如果等于1则将firstReader设为null)。
2.如果不是当前线程持有读锁,则从cachedHoldCounter拿到上一次获取读锁的HoldCounter。主要还是对count进行-1
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
//如果当前线程是第一次获取读锁的线程,更新firstReaderHoldCount的值
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
//如果firstReaderHoldCount = 1,将firstReader设为null
firstReader = null;
else
firstReaderHoldCount--;
} else {
//如果当前线程不是第一次获取共享锁的线程,
//则主要从ThreadLocalHoldCounter获取对应的HolderCounter,进行-1操作。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//for循环尝试CAS更新state
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
tryWriteLock分析
类似于ReentrantLock提供的tryLock()方法,写锁也提供tryWriteLock()方法,在尝试获取写锁失败,则直接返回false。
大体逻辑与tryAcquire()类似只是不需要判断writerShouldBlock。
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
//w==0表示是共享锁
//current != getExclusiveOwnerThread())表示不是当前线程持有写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryReadLock分析
读锁也提供tryReadLock()方法,在尝试获取读锁失败,则直接返回false。
大体逻辑同样与tryAcquireShared()方法类似。
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}