ReentrantReadWriteLock之一
ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何“读/读”、“读/写”、“写/写”操作都不能同时发生,这在一定程度上降低了吞吐量。然而读操作之间不存在数据竞争问题,如果"读/读"操作能够以共享锁的方式进行,那会进一步提升性能。因此引入了ReentrantReadWriteLock,顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。
读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能,读写锁主要应用于读多写少的场景。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景是读多于写的。在读多写少的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量,读写锁主要具有以下特性:
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
读写锁中同样依赖队列同步器Sync(AQS)实现同步功能,而读写状态就是其同步器的同步状态。下面从例子中来说明:读读共享,读写互斥,写写互斥。
读 | 写 | |
---|---|---|
读 | 共享 | 互斥 |
写 | 同线程共享,不同线程互斥 | 互斥 |
下面是个cache 的例子
package com.concurreny.aqs;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.xml.crypto.Data;
//读写锁ReentrantReadWriteLock:读读共享,读写互斥,写写互斥!
public class ReentrantWriteReadLockTest {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReadLock readLock = lock.readLock();
WriteLock writeLock = lock.writeLock();
public void read(){
try {
readLock.lock();
System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
Thread.sleep(3000);
System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
readLock.unlock();
}
}
public void write(){
try {
writeLock.lock();
System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
Thread.sleep(3000);
System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
writeLock.unlock();
}
}
public static void main(String[] args) {
final ReentrantWriteReadLockTest wr = new ReentrantWriteReadLockTest();
Thread t1 = new Thread(new Runnable() {
public void run() {
wr.read();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
public void run() {
wr.read();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
public void run() {
wr.write();
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
public void run() {
wr.write();
}
}, "t4");
t1.start();
t2.start();
//t3.start();
//t4.start();
}
}
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return (String[]) m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
源码解读
image.png我们先来看下 ReentrantReadWriteLock 类的整体结构:
/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}
类的继承关系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
说明:可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。
类的内部类
ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。内部类的关系如下图所示。
image.png
说明:如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口。
Sync类:
(1)类的继承关系
abstract static class Sync extends AbstractQueuedSynchronizer {}
说明:Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持。
(2)类的内部类
Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,其中,HoldCounter源码如下。
// 计数器
static final class HoldCounter {
// 计数
int count = 0;
// Use id, not reference, to avoid garbage retention
// 获取当前线程的TID属性的值
final long tid = getThreadId(Thread.currentThread());
}
说明:HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下
// 本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}
说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。
(3)类的属性
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本序列号
private static final long serialVersionUID = 6317671515068378041L;
// 高16位为读锁,低16位为写锁
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地线程计数器
private transient ThreadLocalHoldCounter readHolds;
// 缓存的计数器
private transient HoldCounter cachedHoldCounter;
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
}
说明:该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。
(4)类的构造函数
// 构造函数
Sync() {
// 本地线程计数器
readHolds = new ThreadLocalHoldCounter();
// 设置AQS的状态
setState(getState()); // ensures visibility of readHolds
}
说明:在Sync的构造函数中设置了本地线程计数器和AQS的状态state。
读写状态的设计
同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。
假设当前同步状态值为S,get和set的操作如下:
(1)获取写状态:
S&0x0000FFFF:将高16位全部抹去
(2)获取读状态:
S>>>16:无符号补0,右移16位
(3)写状态加1:
S+1
(4)读状态加1:
S+(1<<16)即S + 0x00010000
在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
写锁的获取与释放
看下WriteLock类中的lock和unlock方法:
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
可以看到就是调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
写锁的获取,看下tryAcquire:
protected final boolean tryAcquire(int acquires) {
//当前线程
Thread current = Thread.currentThread();
//获取状态
int c = getState();
//写线程数量(即获取独占锁的重入数)
int w = exclusiveCount(c);
//当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
if (c != 0) {
// 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
// 如果写锁状态不为0且写锁没有被当前线程持有返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//判断同一线程获取写锁是否超过最大次数(65535),支持可重入
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//更新状态
//此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
setState(c + acquires);
return true;
}
//到这里说明此时c=0,读锁和写锁都没有被获取
//writerShouldBlock表示是否阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置锁为当前线程所有
setExclusiveOwnerThread(current);
return true;
}
其中exclusiveCount方法表示占有写锁的线程数量,源码如下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程图如下:
image.png