JUC并发相关

23. 并发终结之ReentrantReadWriteLock

2020-09-29  本文已影响0人  涣涣虚心0215

ReentrantReadWriteLock是可重入锁,支持锁的降级(Downgrade),即一个线程如果持有写锁的情况下,可以继续持有读锁;不支持锁的升级,即持有读锁的情况下,要先释放读锁,然后再申请写锁。
与ReentrantLock类似,ReentrantReadWriteLock底层也是基于AQS实现,且同样提供了公平Fair和非公平NonFair的实现,所以我们分析源码主要分析tryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared()这些由ReentrantReadWriteLock提供的实现,而AQS提供的acquireQueued(), release(), doAcquireShared(), releaseShared()我们分别在ReentrantLock和Semaphore文章里分析过。

Sync(AQS)源码分析

在分析之前,先看下ReentrantReadWriteLock的相关数据结构。

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

Sync作为AQS的实现在这里提供了ryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared()这些重要实现。

abstract static class Sync extends AbstractQueuedSynchronizer {
    /*
     * 使用低16位表示Exclusive Lock(Write Lock)独占锁
     * 高16位表示Shared Lock(Read Lock)共享锁
     */
    static final int SHARED_SHIFT   = 16;
     //共享锁范围从2^16次方开始
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 独占锁和共享锁总个数一样都是 2^16-1 = 65535个
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    //独占锁范围到2^16次方结束
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    /** 返回共享锁个数 c<65535,则返回0,c=65536,则返回1*/
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 返回独占锁个数 比如 c=65535,则返回65535,c>65536,则返回0 */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    /**
     * 一个共享锁计数器
     */
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }
    //针对每个线程的共享锁计数器
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    /**
     * 返回当前线程所持有的共享锁数量
     * 在共享锁数量降为0的时候移除.
     */
    private transient ThreadLocalHoldCounter readHolds;
    /**
     * 最后一个线程成功获得共享锁时共享锁的数量
     */
    private transient HoldCounter cachedHoldCounter;

    /**
     * firstReader 是第一个获取到共享锁的线程
     * firstReaderHoldCount 是第一个获取共享锁线程的持有共享锁数量。
     */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    //这两个方法是有Fair和NonFair分别提供的实现
    abstract boolean readerShouldBlock();
    abstract boolean writerShouldBlock();

最后这两个shouldBlock方法就是Fair和NonFair的区别所在

static final class NonfairSync extends Sync {
    //非公平实现里,写锁的获取不需要判断Sync Queue是不是有在等待的节点
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        /* 非公平实现里,读锁的获取要看Sync Queue里head的后继节点是共享锁还是独占锁,是独占锁则需要等待。
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    //看head的后继节点是不是共享锁,不是共享锁则return false
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}  
/**
 * Fair version of Sync
 */
static final class FairSync extends Sync {
    //公平实现里,写锁和读锁的获取都需要排队,不允许插队。
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
tryAcquire分析

独占锁尝试获取锁流程:
1.如果读锁不为0或者写锁不为0且独占线程不是当前线程,则尝试获取锁失败。
2.如果锁数量饱和,也不能继续申请锁。

protected final boolean tryAcquire(int acquires) {
    /*
     *  Walkthrough:
     * 1. If read count nonzero or write count nonzero and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if  it is either a reentrant acquire or
     *    queue policy allows it. If so, update state and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        //c!=0,w=0表示有共享锁,则不能申请独占锁
        //或者w!=0,表示有独占锁,如果持有线程不是当前线程(另一个线程正在写),则也不能申请。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
       //如果锁的数量饱和了,也不能申请锁
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire,能走到这就是同一个线程继续申请锁,重入,则直接更新state值
        setState(c + acquires);
        return true;
    }
    //writerShouldBlock就看是Fair还是NonFair的实现,然后CAS更新state,更新失败return false;
    //否则更新成功,将当前线程设置为独占锁线程
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
tryAcquireShared分析

上面分析完tryAcquire申请独占锁的代码,现在分析下申请共享锁的代码tryAcquireShared。

  1. 如果写锁被其他线程持有,则获取共享锁失败。这里意味着如果当前线程持有写锁,那么也是可以申请读锁的,即锁降级。
    2.如果tryAcquireShared没有检查可重入的部分,而是在readerShouldBlock允许读之后就CAS设置state的值,并且更新firstReaderHoldCount的值+1。
    3.如果2里面CAS更新失败或者readerShouldBlock不允许又或者锁的数量饱和了导致2失败,那么就需要fullTryAcquireShared来进行循环尝试获取共享锁。
protected final boolean tryAcquireShared(int acquires) {
     /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for lock wrt state, 
     *    so ask if it should block because of queue policy.
     *    If not, try to grant by CASing state and updating count
     *    Note that step does not check reentrant acquires, which postponed to full version 
     *    to avoid having to check hold count in the more typical non-reentrant case.
     * 3. If step 2 fails either because thread apparently not eligible or CAS fails or 
     *    count saturated. chain to version with full retry loop.
     */

    Thread current = Thread.currentThread();
    int c = getState();
    //如果共享锁count不为0,且当前线程不是独占锁线程,则return -1
    //这里如果当前线程持有写锁,那么是可以继续申请读锁的,即锁的降级。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
     //拿到共享锁个数
    int r = sharedCount(c);
    //如果readerShouldBlock不需要block,且共享锁个数不饱和,就进行CAS更新state
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //更新state成功,如果开始共享锁为0,则设置当前线程为firstReader和更新firstReaderHoldCount+1
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如果共享锁不为0,且firstReader是当前线程,则firstReaderHoldCount+1
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //这里主要更新readHolds的值
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //如果上面readerShouldBlock为true,即要block,或者共享锁个数不饱和,又或者CAS更新失败,走下面方法
    return fullTryAcquireShared(current);
}
// 死循环获取读锁。包含锁降级策略(代码逻辑与tryAcquireShared类似)
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}   
tryRelease分析

独占锁的释放流程tryRelease:
1.如果不是当前线程持有独占锁,则抛出IllegalMonitorStateException异常。
2.查看state释放之后的值是否为0,如果是0,则将独占锁线程设为null。
3.更新state的值。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

tryReleaseShared分析

共享锁释放流程:
1.判断firstReader是不是当前线程,如果是,就将firstReaderHoldCount - 1(如果等于1则将firstReader设为null)。
2.如果不是当前线程持有读锁,则从cachedHoldCounter拿到上一次获取读锁的HoldCounter。主要还是对count进行-1

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        //如果当前线程是第一次获取读锁的线程,更新firstReaderHoldCount的值
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            //如果firstReaderHoldCount = 1,将firstReader设为null
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //如果当前线程不是第一次获取共享锁的线程,
        //则主要从ThreadLocalHoldCounter获取对应的HolderCounter,进行-1操作。
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }  
    //for循环尝试CAS更新state
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
tryWriteLock分析

类似于ReentrantLock提供的tryLock()方法,写锁也提供tryWriteLock()方法,在尝试获取写锁失败,则直接返回false。
大体逻辑与tryAcquire()类似只是不需要判断writerShouldBlock。

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        //w==0表示是共享锁
        //current != getExclusiveOwnerThread())表示不是当前线程持有写锁
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}   
tryReadLock分析

读锁也提供tryReadLock()方法,在尝试获取读锁失败,则直接返回false。
大体逻辑同样与tryAcquireShared()方法类似。

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读