ReadWriteLock
2、 ReadWriteLock 接口
ReentrantReadWriteLock是ReentrentLock接口实现之一,具有以下特性:
- 提供了非公平模式(默认)和公平模式。
- 支持重入。
- 支持锁降级。写锁可降级为读锁,但是读锁不可升级为写锁。
- 支持中断。读锁和写锁都支持锁获取期间的中断。
- 支持Condition。
ReadWriteLock提供了一对锁,读锁和写锁。其中读锁是共享锁,同一时间可以有多个线程访问;写锁是独占锁(排它锁),同一时间只能有一个线程访问。相对于可重入锁,读写锁通过读写分离、读锁可共享的方式进一步提高了并发性能。
读写锁简单示例:
/**
* 使用ReentrantReadWriteLock实现线程安全的HashMap
*/
public class ReadWriteLockDemo {
// 读写锁实例
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
private Lock rLock = lock.readLock();
// 写锁
private Lock wLock = lock.writeLock();
// 共享变量
private Map<String, String> map = new HashMap<>();
// 写
public void put(String key, String value) {
wLock.lock();
try {
map.put(key, value);
} finally {
wLock.unlock();
}
}
// 读
public void get(String key) {
rLock.lock();
try {
map.get(key);
} finally {
rLock.unlock();
}
}
}
ReentrantReadWriteLock源码结构整体上与ReentrantLock相似,部分源码:
// 同步器
abstract static class Sync extends AbstractQueuedSynchronizer{}
// 非公平锁同步器
static final class NonfairSync extends Sync{}
// 公平锁同步器
static final class FairSync extends Sync{}
// 读锁
public static class ReadLock implements Lock, java.io.Serializable{}
// 写锁
public static class WriteLock implements Lock, java.io.Serializable{}
关于AbstractQueuedSynchronizer前文已有介绍,这里重点要介绍一下state变量。前文在分析ReentrantLock的时候介绍过state记录了同步状态。那么对于读写锁呢?如何用一个变量同时记录读状态和写状态呢?
答案是将state变量的高16位和低16位拆分,高16位记录读状态,低16位记录写状态即可。当然这种设计模式也决定了ReentrantReadWriteLock最多提供65535个重入写锁、65535个读锁。ReentrantReadWriteLock使用了位运算来对state变量进行加一、减一的操作。
state值操作示例:
// 获取写状态值
state & 0x0000FFFF
// 获取读状态值
state >>> 16
// 写状态值加1
state + 1
// 读状态值加1
state << 16
// 写状态值减1
state - 1
// 读状态值减1
state - (1 << 16)
2.1 锁降级
锁降级:对于一个线程T,先获取写锁,再获取读锁,最后释放写锁。
JDK官方示例:
/**
* ReentrantReadWriteLock类注释中的锁降级示例
*/
public class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
// 在获取写锁之前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
// 重新检查状态,因为其他线程可能在我们之前已经写锁并更改了状态。
if (!cacheValid) {
// 修改data(写操作)
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
// ① 通过在释放写锁之前获取读锁来降级
rwl.readLock().lock();
} finally {
// Unlock write, still hold read
// 释放写锁,持有读锁
rwl.writeLock().unlock();
}
}
try {
// 使用data(读操作)
use(data);
} finally {
// 释放降级读锁
rwl.readLock().unlock();
}
}
}
processCachedData方法通过cacheValid变量值兼顾了读锁和写锁的功能:若cacheValid为true,为读锁;若cacheValid为false为写锁(兼读锁)。
- 为什么要进行锁降级
可重入读写锁可以说是对可重入锁的一种改进,其目的之一就是为了提高并发性能。假如因为use(data)方法耗时较长而导致写锁不释放,那么其他的读锁将无法使用data。所以使用锁降级,也能够有效的提高并发性能。 - 为什么必须在释放写锁之前获取读锁(对应代码标记①)
- 假线程T1没有获取读锁,直接释放写锁,在调用use(data)方法前,线程T2获取了写锁并修改了data,随后T1开始调用use(data)方法,那么线程T1就会读取到脏数据,所以这一步是必须的,其目的就是为了解决数据可见性的问题。
- 如果线程T1先获取读锁,再释放写锁。那么线程T2将会在获取写锁时阻塞,待T1释放了读锁之后,T2才能获取写锁并操作data,从而保证了数据可见性。
接下来开始分析读写锁的源码,注意,以下代码若无特别声明,均为非公平模式。
2.2 读锁
结合上文的示例,开始分析读锁的加锁、解锁过程。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- 调用tryAcquireShared尝试直接获取读锁。
- 若未能获取到读锁,调用doAcquireShared将当前现场构造成Node节点,入队阻塞,直至获取到读锁。
2.2.1 tryAcquireShared
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// ①
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// ②
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// ③
return fullTryAcquireShared(current);
}
变量解释:
- firstReader,记录第一个获取读锁的线程。
- firstReaderHoldCount,记录第一个获取读锁的线程的重入次数。
- cachedHoldCounter,成功获取readLock的最后一个线程的持有计数。在通常情况下,下一个要释放的线程是最后一个要获取的线程,这样可以节省ThreadLocal查找。
- readHolds,当前线程持有的可重入读锁的数量。继承自ThreadLocal类。持有HoldCounter,记录了线程id、线程数。
tryAcquireShared分为3步:
- 持有写锁的线程非当前线程。返回-1,即失败。从这里也可以看出,同一个线程持有写锁的同时,可以再次获取读锁。
- 如果读锁不应被阻塞,且读锁未饱和,且成功更新了同步状态,接下来还要做三个判断:
- 读锁数为0,设置firstReader和firstReaderHoldCount,用来记录第一个获取读锁的线程及其重入次数。
- firstReader与当前线程相同,代表读锁重入,将firstReaderHoldCount加1。
- 上述两个条件均不满足,说明已经有其他线程获取读锁。则将当前线程缓存至readHolds(该变量为ThreadLocal类型)
- 上述两步都未满足,执行获取读锁的“全量版本”。
需要详细解释一下readerShouldBlock()方法:对于非公平锁,其目的是为了防止获取写锁的线程饥饿;而对于公平锁,其目的是为保证线程获取锁的公平性。
可以通过下面的代码来测试:
/**
* 测试readerShouldBlock()方法
*/
public class ReaderShouldBlock {
static ReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
// T1
new Thread(() -> {
rwl.writeLock().lock();
// ①
rwl.readLock().lock();
try {
} finally {
rwl.readLock().unlock();
rwl.writeLock().unlock();
}
}).start();
// T2
new Thread(() -> {
rwl.writeLock().lock();
try {
} finally {
rwl.writeLock().unlock();
}
}).start();
}
}
测试的时候,需要在代码①处和readerShouldBlock()方法同时断点,则readerShouldBlock()会返回true。对于非公平锁:判断AQS队列中是否有写锁等待获取线程;对于公平锁:判断AQS中是否有其他线程比当前线程更早的获取锁。
以全量版本获取读锁:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// ①
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
// ②
else if (readerShouldBlock()) {
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// ③
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// ④
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
fullTryAcquireShared通过”CAS“再次获取锁:
- 其他线程持有写锁,
- 当前线程非持有写锁的线程,返回-1,即失败。
- 当前线程是持有写锁的线程,阻塞,死锁。
- 需要阻塞读锁。这里阻塞的前提我认为应该是只发生在公平锁的情况下,不知道对不对。
- 读锁饱和,抛出”Maximum lock count exceeded“错误
- 获取锁成功。
2.2.2 doAcquireShared
如果tryAcquireShared方法未能获取到读锁,则通过doAcquireShared方法将当前线程构造成Node节点,入队并阻塞,忽略中断,直至获取到同步状态。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该过程与Lock接口的逻辑相似,且较简单,可参考前文。
2.3 写锁
// ReentrentReadWriteLock
public void lock() {
sync.acquire(1);
}
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
与Lock接口的实现类ReentrentLock一样,ReentrentReadWriteLock类也是先调用了AQS的acquire方法。但是具体的实现细节肯定是不尽相同的。
2.3.1 tryAcquire
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCou****nt(c);
// ①
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// ②
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// ③
setExclusiveOwnerThread(current);
return true;
}
- 同步状态不为0,说明已经有线程获取到锁(可能是读锁,也可能是写锁)
- 写锁数为0,则说明其他线程持有读锁,失败;写锁数不为0但持有锁的线程非当前线程,失败。
- 写锁数饱和,抛出“Maximum lock count exceeded”错误。
- 上述条件均不满足,则说明当前写锁为重入锁,更新同步状态。
- 同步状态为0
- 获取写锁的线程是否应被阻塞,对于非公平锁来说,总是返回true。该方法是实现公平锁和非公平锁的关键。其依然是通过hasQueuedPredecessors()方法来判断,可参考前文Lock接口。
- CAS设置同步状态失败。失败。
- 获取同步状态成功。
2.3.2 acquireQueued
acquireQueued方法与ReentrentLock一致,参考前文即可。
2.4 其他
关于tryLock、lockInterruptibly等方法与前文重入锁分析相似,不再赘述。