JAVA并发(11)——ReentrantReadWriteLo
ReentrantReadWriteLock的使用
ReentrantReadWriteLock是JDK提供的读写锁机制,写锁是排他锁,读锁是共享锁。
//创建一个读写锁
ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
//根据读写锁对象分别创建写锁和读锁
Lock w = reentrantLock.writeLock();
Lock r = reentrantLock.readLock();
ReentrantReadWriteLock把AQS表示锁状态的字段state逻辑上分为了两个部分:高16位是同一个锁被获取共享锁(读锁)的次数,低16位是同一个锁被获取排他锁(写锁)的次数。
默认情况下,创建ReentrantReadWriteLock对象也是按照非公平策略:
public ReentrantReadWriteLock(boolean fair) {
//sync 的实现与ReentrantLock一样
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
由下面代码可以知道,ReadLock和WriteLock使用的相同的sync变量:
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
写锁加锁
执行写锁w.lock()方法,来获取写锁,w.lock方法的实现最终会调用Sync类的tryAcquire方法
写锁的tryAcquire方法实现如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//获取锁状态
int c = getState();
//获取排他锁的个数,也就是写锁的个数
int w = exclusiveCount(c);
//锁状态个数不为0,且写锁为0或者持有锁的线程不是当前线程,那么失败
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 如果是当前线程之前已经获取该锁
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
主要逻辑:
1.判断锁状态
2.计算获取写锁的次数
(1)当前锁状态为0,执行 (2),否则执行下面
a 如果当前锁状态是读锁或者 b当前锁状态是写锁,但是持有锁的线程不是当前线程,那么获取失败,返回false
(2)设置锁的装状态,如果成功这设置获取锁的线程为当前线程,返回true。否则获取锁失败,返回false
如果写锁的tryAcquire方法返回true
那么会把当前线程封装为一个Node节点对象,把该node节点对象加入AQS的阻塞队列中。
写锁释放
写锁的释放,会调用ReentrantReadWriteLock内部静态类Sync的tryRelease方法,下面是tryRelease具体实现:
protected final boolean tryRelease(int releases) {
//判断当前申请释放锁的线程是不是持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//判断排它锁值状态是不是0
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
当tryRelease返回true之后,此时没有线程持有锁,接着就可以从AQS的阻塞队列中释放一个线程去执行:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //从阻塞对列中获取一个线程
return true;
}
return false;
}
读锁获取
读锁也被称为共享锁,获取共享锁的方法tryAcquireShared实现如下
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//排它锁不等于0并且拥有排他锁的线程不是当前线程,那么返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取共享锁数量
int r = sharedCount(c);
//这里有个readerShouldBlock()是为了避免在非公平策略下写锁一直处于饥饿状态,
//readerShouldBlock实现主要是判断如果当阻塞队列的第一个等待锁的线程是写操作
//那么就要阻塞当前读线程,使得让写线程能有机会获取到写锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//如果当前锁是读锁,但是持有锁的线程不是当前线程,执行下面的操作
//给获取一个存储在ThreadLocal对象中的缓存对象HoldCounter,该值主要是存储
//每个线程获取读锁的个数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();//readHolds 是Threadlocal对象
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//下面的方法是处理CAS失败或者是在tryAcquireShared没有处理的重入读锁
return fullTryAcquireShared(current);
}
线程从tryAcquireShared方法中返回大于0表示成功获取到共享锁,如果返回小于0表示获取共享锁失败,那么就需要执行doAcquireShared()方法,把当前线程加入到等待队列中。
读锁释放
读锁释放的主要逻辑:
1.判断获取读锁的第一个线程是不是当前线程,如果是修改锁数量,然后执行3,如果不是当前线程,执行2
2.获取当前线程自己的HoldCounter 对象,修改HoldCounter 中读锁数量值,执行3
3.使用CAS修改AQS的锁状态信息,并返回AQS的缩状态
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}