JUC原理之ReentrantReadWriteLock

2020-03-18  本文已影响0人  AlienPaul

什么是ReentrantReadWriteLock

ReentrantReadWriteLockReentrantLock同样是重入锁,支持公平方式和非公平方式获取锁。

ReentrantReadWriteLock同时支持共享锁和独占锁。对于共享资源的读写操作而言,我们允许多个线程同时读取,但是同一时刻仅允许一个线程写入,并且写入操作进行的时候其他线程无法读取。ReentrantReadWriteLock正是这样的一种锁,内部维护了两个锁:读锁和写锁。如下所示:

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

其中读锁是共享锁,写锁是独占锁。

实现方式

ReentrantReadWriteLock使用同一个AQS队列来处理共享锁和独占锁的请求。

通过上一篇JUC原理之ReentrantLock我们知道AQS的state表示某一线程的重入深度。但是在ReentrantReadWriteLock中仍然这样记录是不能满足要求的,原因如下:

下面将在代码的分析中逐个说明如何解决上述的两个问题。

为了解决同时存放读锁和写锁个数的问题,ReentrantReadWriteLock的同步状态(state)这个整型变量分为两部分:

如下方代码所示(摘自Sync类):

// 共享数量偏移量,使用state的高16位存放,自然偏移16
static final int SHARED_SHIFT   = 16;
// 共享锁数量的1在state中对应着是几,当然是高16位的最低位+1了
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 共享锁或独占锁的最大持有数量,state变量分成了2部分使用,每种所的持有最大量变为2的16次方-1
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 返回一个低16位全为1的整型
// 和子网掩码用法类似,state和EXCLUSIVE_MASK按位与,可以得到当前独占锁持有的数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
// 获取当前持有共享锁总数
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
// 获取当前持有的独占锁数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

获取共享锁的过程

注意一种特殊情况:允许同一个线程先获取独占锁,没有释放的时候又获取共享锁。

我们分析下tryAcquireShared方法的实现:

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    
    // 获取当前线程
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果有线程持有独占锁,并且独占线程不是当前线程,获取锁失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
        
    // 获取共享锁数量
    int r = sharedCount(c);
    // 判断是否可以获取读锁(共享锁),这个方法在公平锁和非公平锁中的实现不同,稍后分析
    // r值必须小于MAX_COUNT,否则会溢出
    // CAS尝试读锁个数增加1(高16位)
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果目前没有线程得到共享锁
        if (r == 0) {
            // 设置firstReader(第一个读线程)为当前线程
            firstReader = current;
            // 设置firstReader的重入数量为1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 这个分支含义为第一个读线程为当前线程,并且它再次尝试获得锁(重入)
            firstReaderHoldCount++;
        } else {
            // 这里要更新线程持有的读锁数目(线程的重入深度)
            // HoldCounter是一个线程持有锁数量的计数器,保存了线程id和锁计数两个变量
            // cachedHoldCounter是最后一个获取共享锁的线程对应的HoldCounter,通常来说释放锁的线程就是上一个获取锁的线程。把这个线程的HoldCounter缓存可以减少ThreadLocal查找的时间
            HoldCounter rh = cachedHoldCounter;
            // 如果rh为空,或者rh的线程ID不是当前的线程ID,更新cachedHoldCounter为当前线程对应的HoldCounter
            // readHolds变量在Sync构造函数中创建,线程第一次访问的时候返当前线程对应的HoldCounter,count值为0
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
                
            // 如果rh为当前线程的cache,并且count为0(线程第一次获取共享锁)
            // 记录当前线程的HoldCounter到ThreadLocal
            else if (rh.count == 0)
                readHolds.set(rh);
            
            当前线程的共享锁数量加1
            rh.count++;
        }
        // 此处获取读锁成功,返回1
        return 1;
    }
    // 如果上面的if条件没有满足,需要走完整的获取共享锁流程
    // 上面的步骤只是获取共享锁的快速方法
    return fullTryAcquireShared(current);
}

注意:这里有三个变量:firstReader, firstReaderHoldCountcachedHoldCounter。他们的作用如下:

HoldCounter类用于保存每个线程持有的共享锁数量,需要在ThreadLocal中使用。代码如下:

/**
 * A counter for per-thread read hold counts.
 * Maintained as a ThreadLocal; cached in cachedHoldCounter
 */
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

readHolds保存了每一个线程的共享锁数量,它是ThreadLocalHolderCounter类型。它继承了ThreadLocal类,代码如下:

/**
 * ThreadLocal subclass. Easiest to explicitly define for sake
 * of deserialization mechanics.
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

Sync初始化的时候readHolds变量也一同被初始化。线程第一次调用readHolds.get()时候,自动调用initialValue方法,创建并返回一个当前线程对应的HoldCounter

接下来分析完整版本的获取共享锁方法fullTryAcquireShared,多用于处理CompareAndSetState失败时候的逻辑(tryAcquireSharedCompareAndSetState失败后会执行此方法)。

/**
 * Full version of acquire for reads, that handles CAS misses
 * and reentrant reads not dealt with in tryAcquireShared.
 */
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    // 自旋
    for (;;) {
        // 获取当前状态
        int c = getState();
        // 如果独占锁的持有数量不是0
        if (exclusiveCount(c) != 0) {
            // 检查持有独占锁的线程是不是当前线程,如果不是返回-1,获取锁失败
            // 此处如果持有独占锁的线程是当前线程,方法不会返回
            // 意思是允许同一个线程获取独占锁之后又获取共享锁
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // 如果不允许获取共享锁
            // 字面上是这个意思,但逻辑上的含义为如果readerShouldBlock()返回true,只允许重入,不允许新线程获得锁
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 如果firstReader不是当前线程
                if (rh == null) {
                    rh = cachedHoldCounter;
                    // 如果cachedHoldCounter为空或者是cachedHoldCounter的HoldCounter和当前线程不对应
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 获取当前线程的HoldCounter
                        rh = readHolds.get();
                        // 如果count为0,从readHolds清除
                        // 该线程已不持有共享锁,不需要再记录该线程的共享锁数量
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 如果rh的count为0,说明当前线程没有获得共享锁,获取失败返回-1
                // 如果rh的count不为0,说明当前线程持有共享锁,继续向下进行,允许再次获取共享锁
                if (rh.count == 0)
                    return -1;
            }
        }
        // 如果共享锁数量为最大值,抛出异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS增加共享锁持有数量
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果成功,后面的逻辑和tryAcquireShared方法一致,不再赘述
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

获取独占锁的过程

获取独占锁的入口为tryAcquire方法,代码如下:

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
     
    // 获取当前线程
    Thread current = Thread.currentThread();
    int c = getState();
    // 获取独占锁数量
    int w = exclusiveCount(c);
    // 如果有独占锁或者共享锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 如果独占锁数量为0,说明有共享锁,获取独占锁失败,返回false
        // 如果独占锁数量不为0,持有锁的线程不是当前线程,说明其他线程持有独占锁。本线程获取独占锁失败,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 如果获取独占锁之后,独占锁计数大于MAX_COUNT,说明独占锁计数溢出
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 更新独占锁数量
        setState(c + acquires);
        // 获取独占锁成功
        return true;
    }
    // 如果没有任何锁
    // 允许当前线程获取独占锁
    // 如果CAS设置state失败,代表获取锁失败,返回false
    // 如果不允许当前线程获取独占锁,直接返回false
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 如果成功,设置独占线程为当前线程,返回true
    setExclusiveOwnerThread(current);
    return true;
}

释放共享锁的过程

释放共享锁的方法为tryReleaseShared,分析如下:

protected final boolean tryReleaseShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 如果当前线程是firstReader
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // 如果firstReader持有锁数量为1,设置firstReader为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            // 否则firstReader持有锁数量减1
            firstReaderHoldCount--;
    } else {
        // 获得当前线程对应的HoldCounter
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        // 如果count <=0,需要从readHolds变量中移除此HoldCounter
        // 意味着该线程不再持有共享锁,无需再记录重入次数
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 重入数量减1
        --rh.count;
    }
    // 死循环
    for (;;) {
        // 反复尝试CAS将state共享锁数量减1,成功后返回
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            // 返回结果为共享锁是否全部释放
            return nextc == 0;
    }
}

释放独占锁的方法

释放独占锁的方法为tryRelease,分析如下:

protected final boolean tryRelease(int releases) {
    // 如果持有独占锁的线程不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 计算新的同步状态
    int nextc = getState() - releases;
    // 判断独占锁是否全部释放
    boolean free = exclusiveCount(nextc) == 0;
    // 如果释放完后不再有独占锁,设置独占线程为null
    if (free)
        setExclusiveOwnerThread(null);
    // 设置新的同步状态
    setState(nextc);
    // 返回独占锁是否全部释放
    return free;
}

FairSync

ReentrantLock一样,ReentrantReadWriteLock也有一个Sync对象。它也有两个子类FairSyncNonFairSync,分别代表了共享锁和独占锁的实现。

对于FairSync而言,设想一种场景:队列中有线程等待独占锁排队,后面跟随多个等待共享锁的线程排队。这时如果有队列外线程需要获取独占锁,必须在这些等待共享锁的线程后排队。

FairSync的代码如下:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        // 和ReentrantLock的公平独占锁一样
        return hasQueuedPredecessors();
    }
    // 是否允许没有共享锁的线程获取共享锁(无论返回true或false,已经有共享锁的线程都允许再次获得共享锁,即允许重入)
    // 没有独占锁的时候,共享锁是可以直接获取的,所以此时共享锁不会入队列
    // 如果有需要获取独占锁的线程排队,或者是获取到独占锁的线程正在执行,后续需要共享锁的线程必须依次排队
    // 有一种情况,如果一个线程持有独占锁尚未释放,后续没有其他线程排队,此时hasQueuedPredecessors也会返回false
    // 感觉像是允许获取共享锁。但是tryAcquireShared方法先保证了没有独占锁的情况下才会获取共享锁,因此这个问题不存在
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

NonFairSync

如果队列中有等待独占锁的线程排队,后面跟随多个等待共享锁的线程排队。队列中第一个线程会和队列外线程抢占独占锁。如果队列外线程抢占失败,进入队列排队(位于等待共享锁的线程之后)。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        // 永远允许获取独占锁
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        // 这个方法用来防止等待获取独占锁的线程饥饿
        // 后面详细介绍
        return apparentlyFirstQueuedIsExclusive();
    }
}

问题:apparentlyFirstQueuedIsExclusivehasQueuedPredecessors在行为上会带来什么区别?
比方说AQS队列中的node为:
独占,共享,共享,独占。

第一个独占锁释放后,紧随的两个共享锁未来的及唤醒的时候,如果又有队列外线程要求获得共享锁,会抢在队列末尾的独占锁之前(apparentlyFirstQueuedIsExclusive返回false,因为这时候队列head之后的node为共享)。这点和公平锁的行为是不同的(hasQueuedPredecessors方法在这种情况下返回的是true)。

apparentlyFirstQueuedIsExclusive方法的代码如下:

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 如果head节点的下一个节点是独占类型节点,并且不是空节点(s.thread != null)返回true
    // 对于共享锁,可以直接获取,无需进入等待队列
    // 所以说,如果有线程要求独占锁进入队列排队,它一定是紧随head之后的节点
    // 如果返回true,说明有线程需要获取独占锁
    // 为了需要独占锁的线程避免线程饥饿,在此之后需要获取共享锁的线程,不允许直接获取到锁,必须排队等待之前排队的独占锁获取并释放之后才能恢复运行
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。

上一篇下一篇

猜你喜欢

热点阅读