一些收藏收藏

ReentrantReadWriteLock之一

2022-04-01  本文已影响0人  程序员札记

ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,实际上独占锁是一种相对比较保守的锁策略,在这种情况下任何“读/读”、“读/写”、“写/写”操作都不能同时发生,这在一定程度上降低了吞吐量。然而读操作之间不存在数据竞争问题,如果"读/读"操作能够以共享锁的方式进行,那会进一步提升性能。因此引入了ReentrantReadWriteLock,顾名思义,ReentrantReadWriteLock是Reentrant(可重入)Read(读)Write(写)Lock(锁),我们下面称它为读写锁。

读写锁内部又分为读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而提升程序性能,读写锁主要应用于读多写少的场景。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景是读多于写的。在读多写少的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量,读写锁主要具有以下特性:

而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
读写锁中同样依赖队列同步器Sync(AQS)实现同步功能,而读写状态就是其同步器的同步状态。下面从例子中来说明:读读共享,读写互斥,写写互斥。

共享 互斥
同线程共享,不同线程互斥 互斥

下面是个cache 的例子

package com.concurreny.aqs;

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import javax.xml.crypto.Data;
//读写锁ReentrantReadWriteLock:读读共享,读写互斥,写写互斥!
public class ReentrantWriteReadLockTest {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReadLock readLock = lock.readLock();
    WriteLock writeLock = lock.writeLock();
    
    public void read(){
        try {
            readLock.lock();
            System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
            Thread.sleep(3000);
            System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            readLock.unlock();
        }
    }
    
    public void write(){
        try {
            writeLock.lock();
            System.out.println("线程"+Thread.currentThread().getName()+"进入。。。");
            Thread.sleep(3000);
            System.out.println("线程"+Thread.currentThread().getName()+"退出。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            writeLock.unlock();
        }
    }
    
 
    public static void main(String[] args) {
        final ReentrantWriteReadLockTest wr = new ReentrantWriteReadLockTest();
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                wr.read();
            }
        }, "t1");
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                wr.read();
            }
        }, "t2");
        Thread t3 = new Thread(new Runnable() {
            public void run() {
                wr.write();
            }
        }, "t3");
        Thread t4 = new Thread(new Runnable() {
            public void run() {
                wr.write();
            }
        }, "t4");
        
        t1.start();
        t2.start();
        //t3.start();
        //t4.start();
    }

}
class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock();
        try { return (String[]) m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }

源码解读

image.png

我们先来看下 ReentrantReadWriteLock 类的整体结构:

    /** 读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;    
    /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }
    /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    /** 返回用于写入操作的锁 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }    
    /** 返回用于读取操作的锁 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    static final class NonfairSync extends Sync {}
    static final class FairSync extends Sync {}
    public static class ReadLock implements Lock, java.io.Serializable {}
    public static class WriteLock implements Lock, java.io.Serializable {}

类的继承关系

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
说明:可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。

类的内部类

ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。内部类的关系如下图所示。


image.png

说明:如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口。

Sync类:
(1)类的继承关系
abstract static class Sync extends AbstractQueuedSynchronizer {}
说明:Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持。
(2)类的内部类
Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,其中,HoldCounter源码如下。

// 计数器
static final class HoldCounter {
    // 计数
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    // 获取当前线程的TID属性的值
    final long tid = getThreadId(Thread.currentThread());
}

说明:HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下

// 本地线程计数器
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

(3)类的属性

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 版本序列号
    private static final long serialVersionUID = 6317671515068378041L;        
    // 高16位为读锁,低16位为写锁
    static final int SHARED_SHIFT   = 16;
    // 读锁单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 读锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁最大数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地线程计数器
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存的计数器
    private transient HoldCounter cachedHoldCounter;
    // 第一个读线程
    private transient Thread firstReader = null;
    // 第一个读线程的计数
    private transient int firstReaderHoldCount;
}

说明:该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。
(4)类的构造函数

// 构造函数
Sync() {
    // 本地线程计数器
    readHolds = new ThreadLocalHoldCounter();
    // 设置AQS的状态
    setState(getState()); // ensures visibility of readHolds
}

说明:在Sync的构造函数中设置了本地线程计数器和AQS的状态state。

读写状态的设计

同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。

image.png

假设当前同步状态值为S,get和set的操作如下:
(1)获取写状态:
S&0x0000FFFF:将高16位全部抹去
(2)获取读状态:
S>>>16:无符号补0,右移16位
(3)写状态加1:
S+1
(4)读状态加1:
  S+(1<<16)即S + 0x00010000
在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。

写锁的获取与释放

看下WriteLock类中的lock和unlock方法:

public void lock() {
    sync.acquire(1);
}
 
public void unlock() {
    sync.release(1);
}

可以看到就是调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
写锁的获取,看下tryAcquire:

protected final boolean tryAcquire(int acquires) {
    //当前线程
    Thread current = Thread.currentThread();
    //获取状态
    int c = getState();
    //写线程数量(即获取独占锁的重入数)
    int w = exclusiveCount(c);
    
    //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
    if (c != 0) {
        // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
        // 如果写锁状态不为0且写锁没有被当前线程持有返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;        
        //判断同一线程获取写锁是否超过最大次数(65535),支持可重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新状态
        //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
        setState(c + acquires);
        return true;
    }    
    //到这里说明此时c=0,读锁和写锁都没有被获取
    //writerShouldBlock表示是否阻塞
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;    
    //设置锁为当前线程所有
    setExclusiveOwnerThread(current);
    return true;
}

其中exclusiveCount方法表示占有写锁的线程数量,源码如下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程图如下:


image.png
上一篇下一篇

猜你喜欢

热点阅读