并发编程之锁(四)--ReentrantReadWriteLoc

2019-05-07  本文已影响0人  夏目手札

前言

上一篇中已经分析了ReentrantLock,下面我们来看一下读写锁ReentrantReadWriteLock。
在这之前,先看一下其结构图:


ReadLock/WriteLock

//使用ReentrantReadWriteLock的Sync对象
protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

public void lock() {
    sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
    return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void unlock() {
    sync.releaseShared(1);
}
//不支持条件变量
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

public String toString() {
    int r = sync.getReadLockCount();
    return super.toString() +
        "[Read locks = " + r + "]";
}

protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

public void lock() {
    sync.acquire(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
    sync.release(1);
}

public Condition newCondition() {
    return sync.newCondition();
}

public String toString() {
    Thread o = sync.getOwner();
    return super.toString() + ((o == null) ?
                               "[Unlocked]" :
                               "[Locked by thread " + o.getName() + "]");
}

public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}

public int getHoldCount() {
    return sync.getWriteHoldCount();
}

WriteLock的代码,类似ReadLock的代码,差别在于独占式获取同步状态。

Sync抽象类

Sync是ReentrantReadWriteLock的静态内部类,继承自AbstractQueuedSynchronizer的抽象类。它使用AQS的state字来表示当前锁的持有数量,其中state的高16位表示读状态,即获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数。

static final int SHARED_SHIFT   = 16; // 位数
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // 每个锁的最大重入次数,65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//写锁的标记,用来计算写锁的重入次数

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//第一个获得读锁的线程
private transient Thread firstReader = null;
//firstReader的重入次数
private transient int firstReaderHoldCount;
//当前线程持有的可重入数量
private transient ThreadLocalHoldCounter readHolds;
//成功获取读锁的的最后一个线程的计数器
private transient HoldCounter cachedHoldCounter;
/**
  * 计数器,主要存储线程id和重入次数
  **/
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    // 避免 HoldCounter 和 ThreadLocal 互相绑定而导致 GC 难以释放它们
    final long tid = getThreadId(Thread.currentThread());
}
static final long getThreadId(Thread thread) {
    return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
Sync() {
    //ThreadLocal子类,表示当前线程持有的可重入读锁的数量
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}

abstract boolean readerShouldBlock();

abstract boolean writerShouldBlock();
//写锁释放
protected final boolean tryRelease(int releases) {
    //1. 如果释放的线程不为锁的持有者,直接抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 写锁的重入次数,如果为0,释放持有的线程
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
//写锁获取。针对写锁,因为要确保写锁的操作对读锁是可见的。
//如果在存在读锁的情况下允许获取写锁,那么那些已经获取读锁的其他线程可能就无法感知当前写线程的操作。
//因此只有等读锁完全释放后,写锁才能够被当前线程所获取,一旦写锁获取了,所有其他读、写线程均会被阻塞。
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    //1. 计算写锁的重入次数
    int c = getState();
    int w = exclusiveCount(c);
    //2. 如果存在锁
    if (c != 0) {
        //w == 0 表示仅存在读锁不存在写锁
        //2.1  如果只存在读锁或者存在写锁但是当前线程不是已经获取写锁的线程,直接返回失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //2.2. 如果写锁可重入次数超出最大范围,抛出异常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //2.3. 设置状态,获取成功
        setState(c + acquires);
        return true;
    }
    //3. 使用CAS设置状态,如果失败返回失败(#writerShouldBlock()在非公平锁下永远是false,
    //在公平锁下需要判断是否有前驱节点,上一篇中提到过 )
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
//读锁释放
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //1. 如果想要释放锁的线程为第一个获取读锁的线程
    if (firstReader == current) {
        // 将计数器至空或者减1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //2. 判断最后一次获取到读锁的线程是否是当前线程,如果不是从readholds中获取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //3. 自旋,设置状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

private IllegalMonitorStateException unmatchedUnlockException() {
    return new IllegalMonitorStateException(
        "attempt to unlock read lock, not locked by current thread");
}
//读锁获取
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果存在写锁,且锁的持有者不是当前线程,直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //2. 获取读锁重入次数
    int r = sharedCount(c);
    /*
     * readerShouldBlock():读锁是否需要等待(公平锁原则)
     * r < MAX_COUNT:持有线程小于最大数(65535)
     * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //3. 如果重入次数为0,firstReader = current;如果firstReader == current, firstReaderHoldCount++;
        //否则从缓存中获取
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
//获取读锁的完整版本,处理CAS未命中和tryAcquireShared中未处理的重入读取
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            //有写锁的前提下,线程持有者不是当前线程
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        // 读锁需要阻塞,判断是否当前线程已经获取到读锁
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //读锁超出最大范围
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS设置读锁成功
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
//尝试获取写锁
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        //写锁的数量
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
//尝试获取读锁
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

final Thread getOwner() {
    return ((exclusiveCount(getState()) == 0) ?
            null :
            getExclusiveOwnerThread());
}

final int getReadLockCount() {
    return sharedCount(getState());
}

final boolean isWriteLocked() {
    return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;

    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;

    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;

    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}
//自定义序列化逻辑
private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();
    readHolds = new ThreadLocalHoldCounter();
    setState(0); // reset to unlocked state
}

final int getCount() { return getState(); }

Sync的实现类

final boolean writerShouldBlock() {
    return false; // writers can always barge
}
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    //头节点不为空且下一个节点也不为空且是独占锁且下一个节点的线程也不为空
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

由于写锁是独占排它锁,所以在非公平锁的情况下,需要调用 AQS 的 #apparentlyFirstQueuedIsExclusive()方法,判断是否当前写锁已经被获取。

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

ReentrantReadWriteLock

private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;

public ReentrantReadWriteLock() {
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

ReentrantReadWriteLock的默认构造方法初始化了非公平锁的实现,同时也初始化了readerLock,writerLock。

总结

1. Java中tid指的是什么?
线程的tid,这里不直接使用#Thread.getId()是因为#getId()方法不是final,容易被子类覆盖。
2. 什么是锁降级?
锁降级就意味着写锁是可以降级为读锁的,但是需要遵循先获取写锁、获取读锁再释放写锁的次序。注意如果当前线程先获取写锁,然后释放写锁,再获取读锁这个过程不能称之为锁降级,锁降级一定要遵循那个次序。
锁降级中读锁的获取释放为必要?
肯定是必要的。假如当前线程 A 不获取读锁而是直接释放了写锁,这个时候另外一个线程 B 获取了写锁,那么这个线程 B 对数据的修改是不会对当前线程 A 可见的。如果获取了读锁,则线程B在获取写锁过程中判断如果有读锁还没有释放则会被阻塞,只有当前线程 A 释放读锁后,线程 B 才会获取写锁成功。

上一篇下一篇

猜你喜欢

热点阅读