深入理解ReentrantReadWriteLock
为什么会有ReentrantReadWriteLock?
我们了解到synchronized和ReentrantLock都是独占式锁
。也就是同一时刻只有一个线程获取到锁。但是在一些业务场景中,大部分是读数据,写数据很少,如果仅仅是读数据的话并不会影响数据正确性,如果在读数据很频繁情况下,依然使用独占式锁
,会大大降低性能。所以在这种读多写少的情况下,Java提供另一个lock接口的实现子类ReentrantReadWriteLock
(可重入读写锁)。
读写锁是允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程与其他的写线程均会阻塞。
那么写线程获取到锁 的前提是:没有任何读写线程获取到锁。
同一时刻可以有多个读线程获取到锁,但并不意味着读锁是无锁,比如当写线程获取到锁,读线程不可以再获取到锁,如果是无锁,读线程可以直接工作,这显然是错误的。
一个例子理解读写锁:
比如说在办黑板报时:小米很开心的办黑板报,但是由于她很害羞,不希望在创作的过程被打扰,也就是她在黑板上写的时候,不能有其他同学在黑板上写,更不能有同学欣赏她的作品,这个过程相当于写锁;而当小米创作结束后,大家都来欣赏漂亮的板报,在大家欣赏的时候,小米没有创作,这个过程相当于读锁。
public interface ReadWriteLock {
/**
* 返回读锁
*/
Lock readLock();
/**
* 返回写锁
*/
Lock writeLock();
}
写锁的获取
写锁的lock方法如下:
public void lock() {
sync.acquire(1);
}
AQS的acquire方法如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
从源码可以看见,写锁使用的是AQS的独占模式
。首先尝试首先尝试获取锁,如果获取失败,那么将会把该线程加入到等待队列中。 (同之前ReentrantLock)
Sync实现了tryAcquire
方法用于尝试获取锁,源码如下:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
//得到调用lock方法的当前线程
Thread current = Thread.currentThread();
int c = getState();
//得到写锁的个数
int w = exclusiveCount(c);
//如果当前有写锁或者读锁
if (c != 0) {
// 如果同步状态不为0,但是写锁为0或者当前线程不是独占线程(不符合重入),返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果写锁的个数超过了最大值,抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入,返回true
setState(c + acquires);
return true;
}
//如果当前没有写锁或者读锁,如果写线程应该阻塞或者CAS失败,返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//否则将当前线程置为获得写锁的线程,返回true
setExclusiveOwnerThread(current);
return true;
}
写锁获取源码中需要注意一个方法:exclusiveCount( c ),源码如下:
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其 中 EXCLUSIVE_MASK 为 : static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; EXCLUSIVE
_MASK为1左移16位然后减1,即为0x0000FFFF。而exclusiveCount方法是将同步状态(state为int类型)与
0x0000FFFF相与,即取同步状态的低16位。因为同步状态的低16位用来表示写锁的获取次数。
而对于读锁来说,同步状态的高16位用来表示读锁的获取次数。
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT;
}
该方法是获取读锁被获取的次数,是将同步状态(int c)右移16次,即取同步状态的高16位,现在我们可以得出另外一个结论同步状态的高16位用来表示读锁被获取的次数。
writerShouldBlock( )方法,FairSync的实现是如果等待队列中有等待线程,则返回false,说明公平模式下,只要队列中有线程在等待,那么后来的这个线程也是需要记入队列等待的;NonfairSync中的直接返回的直接是false,说明不需要阻塞。从上面的代码可以得出,当没有锁时,如果使用的非公平模式下的写锁的话,那么返回false,直接通过CAS就可以获得写锁。
获取写锁总结:
当读锁已经被读线程获取或者写锁已经被其他写线程获取,则写锁获取失败;否则,获取成功并支持重入,增加写状态。
写锁释放
WriteLock的unlock方法如下:
public void unlock() {
sync.release(1);
}
Sync的release方法使用的AQS中的,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease尝试释放锁,一旦释放成功了,先判断头结点是否有效,最后用unparkSuccessor()启动后续等待的线程。
Sync需要实现tryRelease方法,如下:
protected final boolean tryRelease(int releases) {
//如果没有线程持有写锁,但是仍要释放,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 同步状态减去写状态
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//当前写状态是否为0,为0则释放写锁
if (free)
setExclusiveOwnerThread(null);
//更新状态
setState(nextc);
return free;
}
读锁的获取
读锁是一种共享式锁。即同一时刻可以有多个线程获取读锁。
当需要使用读锁时,首先调用lock方法,源码如下:
public void lock() {
sync.acquireShared(1);
}
从代码可以看到,读锁使用的是AQS的共享模式,AQS的acquireShared方法如下:
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到同步队列中。
Sync实现tryAcquireShared()源码如下:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//如果当前有写线程并且本线程不是写线程,不符合重入,失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//得到读锁的个数
int r = sharedCount(c);
//如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果当前读锁为0
if (r == 0) {
//第一个读线程就是当前线程
firstReader = current;
firstReaderHoldCount = 1;
}
//如果当前线程重入了,记录firstReaderHoldCount
else if (firstReader == current) {
firstReaderHoldCount++;
}
//当前读线程和第一个读线程不同,记录每一个线程读的次数
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//3.否则(读线程应该被阻塞/读锁达到上限/CAS获取失败,循环尝试
return fullTryAcquireShared(current);
}
对于readerShouldBlock():对于公平锁,只要队列中有线程在等待,就会返回true,即读线程应该被阻塞;对于非公平锁,如果有线程获取写锁,返回true,否则返回false。如果读线程不被阻塞,那么该读线程将有机会获取到读锁。
读锁获取:
1.当写锁被其他线程获取后,读锁获取失败,也就是如果一个线程获取到写锁后,可以再次获取到读锁(写锁可重入)。
2.如果没有写线程或者该线程就是写线程,将有可能获取到读锁,接下来判断是否应该阻塞阻塞读线程并且读锁的个数是否小于最大值,并且CAS改变同步状态。
3.如果该读线程应该被阻塞或者读锁达到了上线或者CAS失败,有其他线程在并发更新state,那么会调动fullTryAcquireShared方法。
读锁释放
ReadLock的unlock方法如下:
public void unlock() {
sync.releaseShared(1);
}
调用了Sync的releaseShared方法,该方法在AQS中提供,如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
调用tryReleaseShared方法尝试释放锁,如果释放成功,调用doReleaseShared尝试唤醒下一个节点。
AQS的子类需要实现tryReleaseShared方法,Sync中的实现如下:
protected final boolean tryReleaseShared(int unused) {
//得到调用unlock的线程
Thread current = Thread.currentThread();
//如果是第一个获得读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//否则,是HoldCounter中计数-1
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//死循环
for (;;) {
int c = getState();
//释放一把读锁,读锁释放,将同步状态减去读状态即可
int nextc = c - SHARED_UNIT;
//如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
读锁的释放:只有当读锁状态为0时,才可以释放成功。
读写锁的应用场景:缓存的实现,代码如下:
package CODE.多线程;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWrite {
static Map<String,Object> map=new HashMap<>();
static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
static Lock readLock=rwl.readLock(); //读锁
static Lock writeLock=rwl.writeLock(); //写锁
/**
* 线程安全的根据一个key获取value
* @param key
* @return
*/
public static final Object get(String key)
{
readLock.lock();
try
{
return map.get(key);
}finally {
readLock.unlock();
}
}
/**
* 线程安全的根据key设置value,并返回旧的value
* @param key
* @param value
* @return
*/
public static final Object put(String key,Object value)
{
writeLock.lock();
try
{
return map.put(key,value);
}finally {
writeLock.unlock();
}
}
/**
* 线程安全的清空所有value
*/
public static final void clear()
{
writeLock.lock();
try
{
map.clear();
}finally {
writeLock.unlock();
}
}
}
读写锁降级
读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁,不支持锁升级, 关
于锁降级下面的示例代码摘自ReentrantWriteReadLock源码中:
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
总结:
1.公平性选择:支持费公平性(默认)和公平的锁获取方式。
2.重入性:支持重入,读锁获取后可以再次获取读锁,写锁获取之后能够再次获取写锁,同时当前也能获取读锁;
3.锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁。