【Java并发编程】ReentrantReadWriteLock

2019-10-29  本文已影响0人  长大后简单很幸福_f63e

概述

在并发编程中,为了解决线程安全问题,我们会高频率的使用独占式锁,一般我们使用java提供的Synchronized关键字或java.util.concurrent中的ReentrantLock,他们都是排他锁,这些锁在同一时刻只允许一个线程进行访问。而接下来要学习的读写锁,同一时刻可以允许多个读线程访问,当时在写线程访问的时候,所有的读线程和其它的写线程都会被阻塞。读写锁维护了一对锁,一个读锁一个写锁,通过分离读锁和写锁,是的并发性相比于一般的排他锁有了很大的提升。

在一些实际的业务场景中,大部分时间只需要获取读锁,只有少部分需要获取写锁,但是写操作在完成之后,需要对后续的读操作可见。在Java5之前,要实现上述的业务场景,只能使用Java的等待通知机制,就是当写操作开始时,所有晚于当前写操作的读写操作均会进入等待状态,只有当前写操作完成并进行通知之后,所有等待的读操作才能继续进行(写操作可以依靠Synchronized来交互),这样是为了使读操作能够读到正确的数据,避免脏读。而在Java5之后,要实现上述场景,只需要在读操作的时候获取读锁,写操作的时候获取写锁即可。

一般情况下,读写锁的性能要比排他锁的性能好,因为大多数场景都是读多余写,在这种情况下,读写锁能够提供更好的并发性和吞吐量。java.util.concurrent包下的ReentrantReadWriteLock就是对读写锁的实现,它提供的特性可以总结如下表:

特性 描述
公平性选择 支持非公平(默认)和公平锁的不同获取方式,吞吐量还是非公平锁优于公平锁
重入性 该锁支持重进入,读线程在获取到读锁之后,还可以获取读锁;而写线程在获取到写锁之后,还可以获取读锁或写锁。
锁降级 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级为读锁

我们想要完全理解这三个特性的实现原理,则需要更加深入的去了解ReentrantReadWriteLock的源码,接下来我们带着这几个问题去看源码:

  1. 读写锁是如何实现读写状态的记录的?
  2. 读锁和写锁分别是怎么样获取和释放的?
  3. 是如何实现写锁排他,读锁共享,读锁排斥写锁的?
  4. 锁降级是如何完成的?

写锁详解

下面我们先写个例子看如何运用ReentrantReadWriteLock:

import java.time.LocalDateTime;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author xiongchenyang
 * @Date 2019/10/28
 **/
public class ReentrantReadWriteLockTest {

    public static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    public static Lock readLock = readWriteLock.readLock();
    public static Lock writeLock = readWriteLock.writeLock();

    public static void main(String[] args) {
        for(int i = 0;i < 20;i++){
            if(i%2 == 0){
                Thread readThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "准备获取读锁!");
                            readLock.lock();
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "获取到读锁了");
                            Thread.sleep(1000*5);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "释放了读锁");
                            readLock.unlock();
                        }
                    }
                });
                readThread.start();

            }else{
                Thread writeThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "准备获取写锁");
                            writeLock.lock();
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "获取到写锁了");
                            Thread.sleep(1000*5);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            System.out.println(LocalDateTime.now() + ":"+Thread.currentThread().getName() + "释放了写锁");
                            writeLock.unlock();
                        }
                    }
                });
                writeThread.start();
            }
        }
    }
}

从上面的demo,我们可以看到,读锁通过readWriteLock.readLock()来创建,写锁通过readWriteLock.writeLock()来创建。而读写锁的获取和释放锁都是通过lock和unlock方法。下面我们具体看下写锁的获取和释放源码

读写锁的获取与释放

写锁的获取

写锁在同一时刻只能被一个线程所获取,很显然,写锁是独占式锁,该锁的同步语义是通过AQS中的tryAcquire实现的。源码如下;

protected final boolean tryAcquire(int acquires) {
        /*
         * Walkthrough:
         * 1. If read count nonzero or write count nonzero
         *    and owner is a different thread, fail.
         * 2. If count would saturate, fail. (This can only
         *    happen if count is already nonzero.)
         * 3. Otherwise, this thread is eligible for lock if
         *    it is either a reentrant acquire or
         *    queue policy allows it. If so, update state
         *    and set owner.
         */
        Thread current = Thread.currentThread();
        //获取同步状态
        int c = getState();
        //获取独占保留数也就是写锁获取的次数
        int w = exclusiveCount(c);
        //如果同步状态不为0
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            //如果写锁获取数为0,或独占线程不是当前线程,那么返回false,加锁失败
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //如果重入锁次数大于最大限定次数,那么抛出异常,最大限定次数为2^16 = 65536
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            //更新同步状态,加锁成功返回true
            setState(c + acquires);
            return true;
        }
        //同步状态为0,那么调用CAS方法尝试修改同步状态,失败则返回false,成功则返回true
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

整段代码的具体实现逻辑可以看代码中的注释,而在这段代码中比较重要的一个方法就是exclusiveCount(int c),我们看下它相关的一小段源码:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

 /** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

可以看出其中EXCLUSIVE_MASK为1左移16位减1,即低16为全部为1,高16为全为0。exclusiveCount方法是将同步状态与EXCLUSIVE_MASK做与运算,即只取同步状态的低16位。这是我们可以看出同步状态的低16位,表示写锁的获取次数

写锁的释放

写锁的释放,通过重写tryRelease(int arg)方法,源码如下

protected final boolean tryRelease(int releases) {
    //如果占有写锁的线程不是当前线程,那么抛出非法异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //得到新的同步状态
    int nextc = getState() - releases;
    //得到新同步状态的低16位,判断是否为0
    boolean free = exclusiveCount(nextc) == 0;
    //为0,则说明写锁全部释放,将该锁的占有线程置为null
    if (free)
        setExclusiveOwnerThread(null);
    //更新同步状态,返回释放情况
    setState(nextc);
    return free;
}

这里的代码逻辑就比较简单了,大体上与ReentrantLock的实现方法差不多,只需要注意int nextc = getState() - releases;这里可以直接用同步状态减去写状态是因为同步状态的低16位是用来表示写操作的获取次数的。

读锁的获取

读锁不是独占式锁,同一时刻该锁允许被多个读线程获取到,也就是一种共享式锁。按照之前对AQS的源码分析,这里肯定需要通过tryAcquireShared和tryReleaseShared方法实现,具体获取方法源码如下:

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            如果同步状态低16位,也就是写锁获取次数不为0,且占有线程不为当前线程,那么返回-1,加锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //获取同步状态高16位
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                //当前线程获取读锁
                compareAndSetState(c, c + SHARED_UNIT)) {
                //如果读锁获取次数为0,那么要记录第一次获取读锁的线程
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                //如果第一次获取读锁的线程就是当前线程,那么获取读锁次数加一
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    //记录当前线程获取读锁的次数
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //如果CAS获取读锁失败,那么自旋尝试
            return fullTryAcquireShared(current);
        }

如上代码,在获取读锁时,会优先判断是否有其他线程已经获取了写锁,如果有,那么该线程获取读锁失败,返回-1,进入等待状态;如果没有,那么尝试获取读锁,成功则返回1,失败则自旋。另外这里需要注意的是compareAndSetState(c, c + SHARED_UNIT),尝试获取同步状态,需要加上SHARED_UNIT(1 << 16),是因为同步状态中高16位表示读锁的获取次数。下面是读锁和写锁被获取时,同步状态的修改示意图:

读写锁同步状态修改示意图.png

读锁的释放

读锁的释放,主要调用tryReleaseShared方法,下面我们看下该方法的源码:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //这里是实现了getReadHoldCount等新功能
    //如果当前线程是第一个获取读锁的线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //那么如果第一个读锁线程获取次数是否为1,那么将firstReader置为null,否则firstReaderHoldCount减1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //如果当前线程不是第一个获取读锁的线程,那么从该线程的ThreadLocal中获取到该线程的获取次数,如果获取次数<=1,那么从ThreadLocal里面移除,否则获取次数减1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    
    for (;;) {
        int c = getState();
        //获取锁状态,将同步状态,减去读状态
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

以上代码比较简单,这里就不赘述了,代码逻辑可以看注释。

锁降级

锁降级指的是写锁降级为读锁。如果当前线程拥有了写锁,然后将其释放,再获取读锁,这个过程不能称之为锁降级。锁降级指的是持有当前获取的写锁,然后获取到读锁,再释放之前持有的写锁的过程。下面看下ReentrantReadWriteLock中锁降级的相关代码:

class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // Must release read lock before acquiring write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // Recheck state because another thread might have
                // acquired write lock and changed state before we did.
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // Downgrade by acquiring read lock before releasing write lock
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }

        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

这段代码在ReentrantReadWriteLock的注释中,讲述了锁降级的使用过程。在已经获取写锁的线程中,获取读锁,然后释放写锁的过程。当数据发生变化后,被volatile修饰的布尔类型cacheValid被设置为false,此时所有访问processCacheData方法的线程都能够感知到变化,但是只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。

锁降级中读锁的获取是不是必须的呢?答案是肯定的。这是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,那么此时可能有另一个线程T获取到了写锁,并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取到读锁,再释放写锁,那么线程T将会被阻塞,知道当前线程释放读锁之后,线程T才能获取写锁修改数据。同理,为了保证数据可见性,ReentrantReadWriteLock是不支持锁升级的,假设多个线程获取到读锁,如果能够锁升级,其中某个线程获取到了写锁,修改数据,那么其余线程是不可见的。

上一篇下一篇

猜你喜欢

热点阅读