ReentrantReadWriteLock代码浅析

2019-07-25  本文已影响0人  有个点丶

介绍

除了重入锁ReentrantLock以外,Doug Lea大神还顺带实现了读写重入锁ReentrantReadWriteLock,依旧支持重入特性、公平与非公平模式,分出了读锁和写锁。

读锁

当读锁被持有后,会阻止其他线程获取写锁,但读锁不排斥其他线程持有读锁。

写锁

当写锁被持有后,既排斥其他线程申请读锁,还排斥其他线程申请写锁。

结构

具体的结构关系,直接上图:


流程

通过ReentrantReadWriteLock的readLock()获取读锁,writeLock()获取写锁,读锁和写锁都有lock()、lockInterruptibly()、tryLock()、tryLock(long timeout, TimeUnit unit),unlock()。

lock()方法,遵循初始化时的公平锁或非公平锁模式请求锁,请求锁的过程不会被中断,直到持有锁为止。

lockInterruptibly()方法,如果线程调用了interrupted(),请求锁的过程将会中断,并且抛出InterruptedException异常。如果线程没有被中断,那么会坚持到持有锁为止。

tryLock()方法,无视初始化时公平与非公平模式,直接尝试一次请求锁的CAS操作,如果成功,返回true,失败,返回false。

tryLock(long timeout, TimeUnit unit)方法,类似lockInterruptibly(),但是比lockInterruptibly()多了超时检查操作,当超时时,会中断请求锁过程。
unlock()方法,释放锁对象当前持有的锁一次。

申请读锁

因为请求写锁锁的流程与普通重入锁大同小异,所以这里只展示读锁的lock()方法的请求流程,lock()方法中通过sync调用acquireShared()方法,acquireShared()方法的实现在AbstractQueuedSynchronizer中:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

所有读锁会进行一次tryAcquireShared(int arg)doAcquireShared(int arg)。获取写锁则调用acquire(int arg)方法,方法中if条件中调用tryAcquire(arg)acquireQueued(final Node node, int arg),与读锁获取的流程有些小差别。

回到正题,tryAcquireShared(int arg)方法被Sync实现,再看ReentrantReadWriteLock.Sync中:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

这个方法中有两个比较重要的流程,都会在特定条件下返回,先看看进入这两个流程的if条件。
第一个if条件,exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current),先检查写锁重入数,如果不为0,再检查当前线程是否已经持有写锁,如果未持有写锁则返回-1表示失败,那么可以看出,同一个线程是可以同时持有写锁和读锁的。

第二个if条件,!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)
首先如果是公平锁,那么在readerShouldBlock()中会进入阻塞队列等待,如果非公平锁,就调用apparentlyFirstQueuedIsExclusive()方法检查队列第一个节点是否在等待写锁,如果不是,再来检查读锁持有是否超过最大值,如果未超过,再来CAS修改读锁持有数量,CAS操作成功之后,这个if流程中会修改部分引用和缓存信息,这个具体作用后面再说。

在公平锁模式阻塞重新被唤醒后或者已有其他线程持有读锁,又或者读锁当前持有数现在已经到了MAX_COUNT时,都会跳过第二个if流程直接进入fullTryAcquireShared(Thread current)方法中:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            //这里注释写了一种造成读写锁死锁的情况
            //当前线程持有写锁,而队列中有其他线程正在等待写锁释放时,这个时候再来请求读锁时,就会导致死锁
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                //省略部分代码
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            //省略部分代码
            }
            return 1;
        }
    }
}

fullTryAcquireShared(Thread current)方法中的请求锁的过程其实和tryAcquireShared(int unused)中的过程十分相似,最重要的还是CAS操作的这个方法。只有compareAndSetState()成功之后才会结束fullTryAcquireShared(Thread current)方法中for循环,结束这次读锁请求操作。

exclusiveCount(int c)与sharedCount(int c)

fullTryAcquireShared(Thread current)方法与tryAcquireShared(int unused)方法中中,变量c,就是持有锁的个数,通过getState()获取到值后,先exclusiveCount(int c),再经过sharedCount(int c)后赋值给r,找到和它们有关的代码:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 65536
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 65535
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

exclusiveCount(int c)操作会清除整型变量c高16位的所有数值,保留低16位数值。
sharedCount(int c) 操作清除整型变量c低16位,保留高16位数值。
所以r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)就是在验证写锁持有数量,当c = 0c + SHARED_UNIT = 65536,经过sharedCount(int c)计算后得1,c + SHARED_UNIT相当于对 c = (c >>> 16 + 1) << 16,对c高16位数值加1。
也就是说state值的高位保存着读锁数量,低位保存写锁数量,读锁与读锁最大持有数都是65535。

关于读写锁等待队列

不管是公平锁还是非公平锁,未能在其他线程释放锁时的特殊时刻正好获取到锁,都会进入等待锁队列,而这个队列被读锁与写锁共用,具体逻辑很多与重入锁高度重合,有兴趣可以看我写的这篇博客ReentrantLock源码通读(一)

关于HoldCounter

HoldCounter遍布在所有读锁的请求和释放过程当中,并且为了缓存这个读锁持有数,代码占据了这些流程代码量大概两成左右,所以HoldCounter的重要性不言而喻,那么具体是什么作用呢?
在ReentrantReadWriteLock中对外提供了一个方法:

/**
 * Queries the number of reentrant read holds on this lock by the
 * current thread.  A reader thread has a hold on a lock for
 * each lock action that is not matched by an unlock action.
 *
 * @return the number of holds on the read lock by the current thread,
 *         or zero if the read lock is not held by the current thread
 * @since 1.6
 */
public int getReadHoldCount() {
    return sync.getReadHoldCount();
}

//Sync中getReadHoldCount()
final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;

    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;

    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;

    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}

注释翻译过来就是,查询当前线程重入读锁的次数。不是可以通过Sync中的state全局变量获取重入次数吗,读锁与写锁是通过int的高低位来区分的,十分方便,为什么不能用这个state来获取读锁的重入次数呢?
事实上写锁的确是利用state的来获取写锁的重入次数的,但是写锁是一个线程独占的,它会排斥其他线程获取写锁以及读锁。
读锁,因为其可被共享的特殊性质,导致state中高位存储的是多个线程重入后的总数,所以state无法用来查询当前线程重入读锁的次数,当前线程无法知晓自己重入次数,这在某些复杂场景或者使用不规范的情况下,可能造成读锁无法被正常释放,而且排查起来也异常困难。
所以HoldCounter出现了,与HoldCounter配合当然少不了ThreadLocal,ReentrantReadWriteLock.Sync中包含有HoldCounter静态内部类和ThreadLocalHoldCounter静态内部类。
HoldCounter记录了重入计数与线程的tid。
ThreadLocalHoldCounter继承ThreadLocal,重载了initialValue()方法。
并且为了更快的获取重入数,还额外赠送了两个全局变量,cachedHoldCounter和firstReaderHoldCount。
cachedHoldCounter保存上个请求读锁线程的HoldCounter。
firstReaderHoldCount保存第一个获取读锁线程的重入数。

所以Sync实现的getReadHoldCount()方法根据return,分为四个分支,通过各种全局缓存变量,尽量高效的返回结果。
第一个分支,读取state的高位,如果为0返回。
第二个分支,检查当前线程是否等于firstReader,如果是,返回firstReaderHoldCount
第三个分支,检查cachedHoldCounter中的tid是否等于当前线程的tid,如果是,返回cachedHoldCounter.count
第四个分支,就是遍历ThreadLocal中的threadMap找到当前线程的HoldCounter返回count

思考

一个线程是否可以同时获取到写锁与读锁?
肯定可以,不然也不会在注释里说明会产生死锁的场景了。
根据写锁的tryAcquire(int arg)源码:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

先判断了state != 0,再判断w==0 || current != getExclusiveOwnerThread()直接堵死线程获取读锁在获取写锁的可能性。
所以能同时获取读锁和写锁的操作只能是先获取写锁,然后获取读锁。
如何能成功同时获取读锁和写锁呢?
在非公平模式下,线程持有写锁后,去请求读锁,而此时等待线程队列中第一个节点不是请求写锁的线程,即可成功获取读锁与写锁,但如果是请求写锁的线程,会返回失败状态值-1,之后进入等待线程队列,线程在持有写锁的情况下休眠并且等待通知被唤醒,由于唤醒超时线程的触发点实在有线程释放锁之后,所以这时无论是否设置过超时参数,都是无效的,依然会导致死锁,所以在使用时需要格外注意。

总结

读锁可以与其他线程共享,但排斥其他线程获取写锁。
写锁排斥其他线程获取锁,当前持有写锁后,仍然可以去获取读锁,完全独占,但是写锁竞争激烈时,完全独占的操作也很容易导致死锁。
读写锁的持有或重入上限均是65535。
当前线程重入计数,读写锁是分开保存的,state中保存的是读锁重入的总数,写锁不受影响。
当前线程读锁重入次数是使用ThreadLocal保存的,每个线程单独维护一个HoldCounter。使用的三个全局变量缓存特殊状态的线程重入计数变量,尽量快的返回结果。

上一篇下一篇

猜你喜欢

热点阅读