Java一些收藏

Java并发编程——ReentrantReadWriteLock

2021-12-08  本文已影响0人  小波同学

一、读写锁

有这样一种场景:

Java中的ReentrantReadWriteLock正是为这种场景提供的锁。该类里面包括了读锁和写锁。

1.1、可获取读锁的情况

1.2、可获取写锁的情况

1.3、读写锁特点

读写锁可以增加更新不频繁而读取频繁的共享数据结构的吞吐量。

二、ReentrantReadWriteLock读写锁

ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。

读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

ReentrantReadWriteLock支持以下功能:

三、ReentrantReadWriteLock使用

3.1 更新缓存

public class CachedData {

    private Map<String,String> cacheData = new HashMap<>();

    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public String queryCachedData(String key) {
        //获取读锁
        lock.readLock().lock();
        try{
            //如果缓存有效, 直接使用data
            String data = cacheData.get(key);
            if(!StringUtils.isEmpty(data)){
                return data;
            }
        }finally {
            //释放读锁
            lock.readLock().unlock();
        }

        //获取写锁
        lock.writeLock().lock();
        try{
            //如果缓存无效,更新cache;
            String data = loadCachedData(key);
            cacheData.put(key,data);
            return data;
        }finally {
            //释放写锁
            lock.writeLock().unlock();
        }
    }
}

3.2 支持并发读写的ArrayList

public class ReadWriteList<E> {

    private List<E> list = new ArrayList<>();
    
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
    private final Lock readLock = lock.readLock();    //读锁
    
    private final Lock writeLock = lock.writeLock();    //写锁


    public ReadWriteList(E... initialElements) {
        list.addAll(Arrays.asList(initialElements));
    }

    public void add(E element) {
        writeLock.lock();
        try {
            list.add(element);
        } finally {
            writeLock.unlock();
        }
    }

    public E get(int index) {
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

    public int size() {
        readLock.lock();
        try {
            return list.size();
        } finally {
            readLock.unlock();
        }
    }
}

四、实现原理

ReentrantReadWriteLock是可重入读写锁的实现。我们先来看看涉及到的类:


可以看到,ReentrantReadWriteLock中也具有非公平锁NonfairSync和公平锁FairSync的实现。同时ReentrantReadWriteLock组合了两把锁:写锁WriteLock和读锁ReadLock。

看看具体的构造函数:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {

    }
}   

可以发现,ReentrantReadWriteLock默认是非公平锁,可以通过参数fair控制是创建非公平锁还是公平锁。同时ReentrantReadWriteLock持有了写锁和读锁。

而本质上,读锁和写锁都是通过持有ReentrantReadWriteLock.sync来进行加锁和释放锁的,用的是同一个AQS,Sync类提供

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    public static class ReadLock implements Lock, java.io.Serializable {

        private final Sync sync;

        protected ReadLock(ReentrantReadWriteLock lock) {
            // 引用的是ReentrantReadWriteLock的sync实例
            sync = lock.sync;
        }
    }

    public static class WriteLock implements Lock, java.io.Serializable {

        private final Sync sync;

        protected WriteLock(ReentrantReadWriteLock lock) {
            // 引用的是ReentrantReadWriteLock的sync实例
            sync = lock.sync;
        }
    }       
}   

基于对AQS原理的理解,知道sync是读写锁实现的关键,而aqs中核心是state字段和双端等待队列。下面我们来看看具体的实现。

4.1 提前了解的内容

在查看ReentrantReadWriteLock之前,您需要了解以下内容:

4.1.1、Sync.HoldCounter类

读锁计数器类,为每个获取读锁的线程进行计数。Sync类中有一个cachedHoldCounter字段,该字段主要是缓存上一个线程的读锁计数器,节省ThreadLocal查找次数。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final class HoldCounter {
            // 某个读线程的重入次数
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            // 某个线程的tid字段
            final long tid = getThreadId(Thread.currentThread());
        }
    }   
}   

4.1.2、Sync.ThreadLocalHoldCounter类

当前线程持有的可重入读锁的数量,当数量下降到0的时候进行删除。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    }   
}   

4.1.3、Sync类的属性

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 高16位为读锁,低16位为写锁
        static final int SHARED_SHIFT   = 16;
        // 读锁单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 读锁最大数量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写锁最大数量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        // 本地线程计数器
        private transient ThreadLocalHoldCounter readHolds;
        // 缓存的计数器
        private transient HoldCounter cachedHoldCounter;
        // 第一个读线程
        private transient Thread firstReader = null;
        // 第一个读线程的计数
        private transient int firstReaderHoldCount;     
    }
}

该属性中包括了读锁、写锁线程的最大量。本地线程计数器等。

4.1.4、Sync类的构造函数

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync() {
            // 本地线程计数器
            readHolds = new ThreadLocalHoldCounter();
            // 设置AQS的状态
            setState(getState()); // ensures visibility of readHolds
        }
    }
}

在Sync的构造函数中设置了本地线程计数器和AQS的状态state。

4.1.5、读写锁中AQS的state状态设计

AQS中的state为了能够同时记录读锁和写锁的状态,把32位变量分为了两部分:

如上图,高16位存储读状态,读锁是共享锁,这里记录持有读锁的线程数;低16位是写状态,写锁是排他锁,这里0表示没有线程持有,大于0表示持有线程对锁的重入次数。

假设当前同步状态值为S,get和set的操作如下:

在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。

4.1.6、关于读写锁的数据结构

虽然读写锁看起来有两把锁,但是底层用的都是同一个state,同一个等待队列。只不过是通过ReadLock和WriteLock分别提供了读锁和写锁的API,底层还是用同一个AQS。如下图:

公平和非公平是针对等待队列中的线程节点的处理来说的:

  • 公平模式一般都是从队列头开始处理,并且如果等待队列还有待处理节点,新的线程全部都入等待队列;
  • 非公平模式一般不管等待队列里面有没有待处理节点,都会先尝试竞争获取锁;特殊情况:如果等待队列中有写锁线程,那么新来的读锁线程必须排队让写锁线程先进行处理。

其实关于读写锁的原理就差不多是这么多了。

4.2 ReadLock实现原理

4.2.1、ReadLock.lock()

查看ReadLock的lock相关方法,调用的是AQS的acquireShared方法,该方法会以共享模式获取锁:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    public static class ReadLock implements Lock, java.io.Serializable {

        private final Sync sync;

        public void lock() {
            sync.acquireShared(1);
        }
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    public final void acquireShared(int arg) {
        // 尝试获取锁
        if (tryAcquireShared(arg) < 0)
            // 如果获取锁失败了,那么会进入ASQ的等待队列,等待被唤醒后重新尝试获取锁
            doAcquireShared(arg);
    }
}

下面看看关键获取锁的tryAcquireShared方法,该方法主要处理逻辑:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final int tryAcquireShared(int unused) {
            //获取当前线程
            Thread current = Thread.currentThread();
            //获取状态
            int c = getState();
             // 如果存在写锁,并且写锁不是当前线程,则直接失败让线程进入等待队列
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 读锁数量 
            int r = sharedCount(c);
            // 判断读锁是否应该被阻塞,公平模式下,先进入等待队列则先被处理;
            //非公平模式下写锁优先级比较高,如果头节点的下一个节点不是共享模式,
            //即是尝试获取写锁的线程,读锁需要让步
            if (!readerShouldBlock() &&
                // 读锁是否已到达获取上线
                r < MAX_COUNT &&
                // CAS修改读锁状态,+1
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 获取读锁成功
                if (r == 0) {
                    // 如果是第一个获取读锁的线程,也就是把读锁状态从0变到1的那个线程,那么存入firstReader中
                    firstReader = current;
                    // firstReader持有锁=1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // firstReader已经是当前线程,则firstReaderHoldCount++
                    firstReaderHoldCount++;
                } else {// 读锁数量不为0,并且第一个读线程不为当前线程
                    // 获取缓存读锁计数器
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        // 缓存读锁计数器为空或者计数器不是当前线程的,则尝试通过ThreadLocal获取当前线程对应的计数器
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 以上执行失败,则进入该逻辑
            return fullTryAcquireShared(current);
        }
    
    }
}

让我们接着看fullTryAcquireShared方法,这个方法可知,只有其他线程持有写锁,或者使用的是公平锁并且头节点后面还有其他等待的线程,或者头节点后面的节点不是共享模式,或者读锁计数器达到了上限,则阻塞,否则一直会循环尝试获取锁:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                // 如果存在写锁,并且写锁不是当前线程,则返回false
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                // 不存在写锁,继续判断是否应该阻塞:如果是公平锁并且头节点后有其他等待的线程,则阻塞,
                // 如果是非公平锁,判断头节点后面的节点是否共享模式,如果不是则阻塞
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    // 如果当前线程是firstReader,说明是重入
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        // 进入该分支,说明没有读写锁冲突,并且不是重入,当前线程也不是firstReader
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            // 判断上一个获取到锁的线程是否当前线程,不是则进入AQS等待队列
                            // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        // rh.count == 0 表示rh是刚新获取到的,直接返回,进入等待队列
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 读锁数量为最大值,抛出异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 比较并且设置成功 
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    // 读线程数量为0
                    if (sharedCount(c) == 0) {
                        // 设置第一个读线程
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
    }
}

最后我们来看看doAcquireShared方法:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private void doAcquireShared(int arg) {
        // 添加一个共享等待节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 判断新增的节点的前一个节点是否头节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 是头节点,那么在此尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取成功,把当前节点变为新的head节点,
                        //并且检查后续节点是否可以在共享模式下等待,
                        //并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 阻塞节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // 取消获取锁
                cancelAcquire(node);
        }
    }
}

4.2.2、ReadLock.unlock()

接下来我们看看释放锁的代码。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    public static class ReadLock implements Lock, java.io.Serializable {

        private final Sync sync;

        public void unlock() {
            sync.releaseShared(1);
        }
    }
}

AbstractQueuedSynchronizer.releaseShared()

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

主要处理方法是tryReleaseShared,该方法主要是清理ThreadLocal中的锁计数器,然后CAS修改读锁个数减1:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryReleaseShared(int unused) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 当前线程为第一个读线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                // 读线程占用的资源数为1
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    // 减少占用的资源
                    firstReaderHoldCount--;
            } else {// 当前线程不为第一个读线程
                // 获取缓存的计数器
                HoldCounter rh = cachedHoldCounter;
                // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                if (rh == null || rh.tid != getThreadId(current))
                    // 获取当前线程对应的计数器
                    rh = readHolds.get();
                // 获取计数 
                int count = rh.count;
                if (count <= 1) {// 计数小于等于1
                    // 移除
                    readHolds.remove();
                    if (count <= 0)
                        // 计数小于等于0,抛出异常
                        throw unmatchedUnlockException();
                }
                // 减少计数
                --rh.count;
            }
            //自旋CAS,减去1<<16
            for (;;) {// 无限循环
                // 获取状态
                int c = getState();
                // 获取状态
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))// 比较并进行设置
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
    }
}

4.3 WriteLock实现原理

4.3.1、WriteLock.lock()

查看WriteLock的lock锁相关方法,调用的是sync.acquire方法,该方法直接继承了ASQ的acquire()方法的实现:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    public static class WriteLock implements Lock, java.io.Serializable {

        private final Sync sync;

        public void lock() {
            sync.acquire(1);
        }
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

与ReentrantLock的实现区别在具体的tryAcquire()方法的实现,我们来看看ReentrantReadWriteLock.Sync中该方法的实现,主要做了以下事情:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            Thread current = Thread.currentThread();
            //获取状态
            int c = getState();
            //写线程数量(即获取独占锁的重入数)
            int w = exclusiveCount(c);
            //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // 不存在写锁,或者当前线程不是写锁持有的线程,那么直接失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                    
                // 写锁超多最大数量限制,也直接失败
                //判断同一线程获取写锁是否超过最大次数(65535),支持可重入               
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 写锁持有的线程重入,直接修改state即可
                setState(c + acquires);
                return true;
            }
            //到这里说明此时c=0,读锁和写锁都没有被获取
            // 判断是否应该阻塞:非公平模式,无需阻塞,公平模式如果前面有其他节点则需要排队阻塞
            if (writerShouldBlock() ||
                // 尝试获取写锁
                !compareAndSetState(c, c + acquires))
                return false;
            //设置锁为当前线程所有    
            setExclusiveOwnerThread(current);
            return true;
        }
    }
}

4.3.2、WriteLock.unlock()

查看WriteLock的unlock相关方法,调用的是sync.release方法,该方法直接继承了AQS的release实现

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    public static class WriteLock implements Lock, java.io.Serializable {

        private final Sync sync;

        public void unlock() {
            sync.release(1);
        }
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            // 释放锁成功,则唤醒队列中头节点后的一个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
        
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryRelease(int releases) {
            // 如果当前线程没有获取写锁,则释放直接抛异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //写锁的新线程数   
            int nextc = getState() - releases;
            //如果独占模式重入数为0了,说明独占模式被释放
            boolean free = exclusiveCount(nextc) == 0;
            // 如果当前线程完全释放了写锁,则去除独占标识
            if (free)
                //若写锁的新线程数为0,则将锁的持有者设置为null
                setExclusiveOwnerThread(null);
            // 修改state
            /不管独占模式是否被释放,更新独占重入数
            setState(nextc);
            return free;
        }
    }
}

写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。

说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。其方法流程图如下。

总结

相比于ReentrantLock,读写锁的实现复杂一些,里面有很多的点很巧妙,比如下面几点:

参考:
https://www.itzhai.com/articles/introduction-and-use-of-reentrantreadwritelock.html

https://www.cnblogs.com/xiaoxi/p/9140541.html

https://www.cnblogs.com/zaizhoumo/p/7782941.html

https://www.cnblogs.com/gunduzi/p/13635002.html

上一篇 下一篇

猜你喜欢

热点阅读