java多线程-7-ReentrantReadWriteLock

2019-09-30  本文已影响0人  宠辱不惊的咸鱼

概述

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable

/**
 * A pair of associated locks exposed through two accessors:
 * one for reading ({@link #readLock()}) and one for writing ({@link #writeLock()}).
 */
public interface ReadWriteLock {
    /** Returns the lock used for reading. */
    Lock readLock();

    /** Returns the lock used for writing. */
    Lock writeLock();
}

公平与非公平

非公平

/**
 * Non-fair policy: a writer is never told to block, and a reader is told to
 * block only when the first queued waiter appears to be an exclusive-mode node.
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge; presumably this also helps avoid writer starvation (author's guess, unconfirmed)
    }
    // Readers defer only when the first queued node looks like a waiting writer.
    final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); }
}

/**
 * Reports whether the node right after {@code head} exists, is not in shared
 * mode, and still has a thread attached, i.e. the first queued waiter appears
 * to be waiting in exclusive mode.
 *
 * @return {@code true} if the first queued node looks like an exclusive waiter
 */
final boolean apparentlyFirstQueuedIsExclusive() {
    final Node first = head;
    if (first == null) {
        return false; // queue not initialized
    }
    final Node successor = first.next;
    if (successor == null) {
        return false; // nobody queued behind head
    }
    // An exclusive-mode node that still carries a thread.
    return !successor.isShared() && successor.thread != null;
}

公平

/**
 * Fair policy: both writers and readers are told to block whenever
 * {@code hasQueuedPredecessors()} reports that another thread is queued ahead.
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    // Writers and readers use the same rule: block if someone is queued ahead of us.
    final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
    final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
}

/**
 * Reports whether the queue is non-empty (head differs from tail) and the
 * node right after head is either missing or belongs to a thread other than
 * the caller. Callers in the fair policy treat {@code true} as "must block".
 *
 * @return {@code true} if some other thread appears to be queued ahead of the current one
 */
public final boolean hasQueuedPredecessors() {
    // Read fields in reverse initialization order: tail first, then head.
    final Node last = tail;
    final Node first = head;
    if (first == last) {
        return false; // head == tail: nothing is queued
    }
    final Node successor = first.next;
    if (successor == null) {
        return true; // queue is non-empty but head.next is not visible
    }
    return successor.thread != Thread.currentThread();
}

构造函数

/**
 * Creates a lock with the non-fair policy; equivalent to
 * {@code new ReentrantReadWriteLock(false)}.
 */
public ReentrantReadWriteLock() {
    this(false);
}

/**
 * Creates a lock with the requested ordering policy and builds the read and
 * write lock views on top of it.
 *
 * @param fair {@code true} to use {@code FairSync}, {@code false} to use {@code NonfairSync}
 */
public ReentrantReadWriteLock(boolean fair) {
    // Select the synchronizer implementation first; both lock views are built from this instance.
    if (fair) {
        sync = new FairSync();
    } else {
        sync = new NonfairSync();
    }
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

public static class ReadLock implements Lock, java.io.Serializable
public static class WriteLock implements Lock, java.io.Serializable

state属性

// Layout of the int state: the high 16 bits hold the shared (read) count, the low 16 bits the exclusive (write) count.
static final int SHARED_SHIFT   = 16;                      // shift that separates the read count from the write count
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);     // amount added to / subtracted from state per read hold
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // upper bound (65535) checked for both read holds and write reentrancy
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // mask of sixteen 1-bits selecting the write count
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // read hold count: high bits of state c
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // write reentrancy count: low bits of state c

线程持锁信息

// Per-thread read hold counters; readHolds.get() yields the calling thread's HoldCounter.
private transient ThreadLocalHoldCounter readHolds;
// HoldCounter (count, tid) of the last thread that went through the non-first-reader path.
private transient HoldCounter cachedHoldCounter;
// Thread that acquired the read lock while the shared count was 0.
private transient Thread firstReader = null;
// Read hold (reentrancy) count of firstReader.
private transient int firstReaderHoldCount;

// Bookkeeping performed after a read hold has been added to state.
if (r == 0) { // r is the read count, sharedCount(state), observed before the increment
    firstReader = current; // this thread becomes the first reader
    firstReaderHoldCount = 1; // hold count of the first reader
} else if (firstReader == current) {
    firstReaderHoldCount++;
} else {
    HoldCounter rh = cachedHoldCounter; // hold info (count, tid) of the most recently cached reader
    if (rh == null || rh.tid != getThreadId(current))
        cachedHoldCounter = rh = readHolds.get(); // cache belongs to another thread: replace it with this thread's counter
    else if (rh.count == 0)
        readHolds.set(rh); // cache is this thread's counter but its count is 0: re-install it in the thread-local
    rh.count++;
}

ReadLock

lock()

/** Acquires the read lock by delegating to the synchronizer's shared-mode acquire. */
public void lock() {
    sync.acquireShared(1);
}

/**
 * Acquires in shared mode: tries once via {@code tryAcquireShared} and, if
 * that reports failure (a negative value), falls back to {@code doAcquireShared}.
 *
 * @param arg the acquire argument passed through to both helpers
 */
public final void acquireShared(int arg) {
    final int firstAttempt = tryAcquireShared(arg);
    if (firstAttempt >= 0) {
        return; // acquired on the first attempt
    }
    doAcquireShared(arg);
}

/**
 * Fast-path attempt to take one read hold.
 *
 * @param unused ignored; every call adds exactly one SHARED_UNIT
 * @return -1 if another thread holds the write lock, 1 if the read hold was
 *         taken here, otherwise the result of {@code fullTryAcquireShared}
 */
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1; // another thread holds the write lock: fail
    // Reaching here: either the write lock is free, or the current thread holds it
    int r = sharedCount(c);
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        // Reader need not block, the read count is below the limit, and the CAS on state succeeded
        if (r == 0) {
            // Read count was 0: record this thread as the first reader
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get(); // cache is not ours: switch it to this thread's counter
            else if (rh.count == 0)
                readHolds.set(rh); // our cached counter has count 0: re-install it in the thread-local
            rh.count++;
        }
        return 1;
    }
    // Fast path did not apply (reader should block, count at limit, or CAS lost): take the full retry loop
    return fullTryAcquireShared(current);
}
// NonfairSync
// Non-fair rule for readers: block only if the first queued waiter appears to be exclusive.
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

/**
 * Checks the node directly behind {@code head}: returns {@code true} only if
 * it exists, is not a shared-mode node, and still has a thread attached.
 *
 * @return {@code true} if the first queued node looks like an exclusive waiter
 */
final boolean apparentlyFirstQueuedIsExclusive() {
    final Node front = head;
    if (front == null) {
        return false;
    }
    final Node firstWaiter = front.next;
    if (firstWaiter == null) {
        return false;
    }
    if (firstWaiter.isShared()) {
        return false; // a shared-mode waiter does not make readers block
    }
    return firstWaiter.thread != null;
}
/**
 * Full version of the read acquire: loops until it either fails definitively
 * (returns -1) or a CAS adding one SHARED_UNIT to state succeeds (returns 1).
 *
 * @param current the calling thread
 * @return -1 on failure, 1 on success
 * @throws Error if the read count is already MAX_COUNT
 */
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1; // write lock is held by another thread: fail
            // else we hold the exclusive lock; blocking here would cause deadlock.
        } else if (readerShouldBlock()) {
            // Reaching here: the write lock is free and the policy says readers should block
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) { // obviously reentrant: we are the first reader
                // assert firstReaderHoldCount > 0;
            } else { // look up this thread's hold count to detect a less obvious reentrant acquire
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove(); // no holds: drop the thread-local entry created by get()
                    }
                }
                if (rh.count == 0)
                    return -1; // not reentrant: fail so the caller can queue
            }
        }
        // Reaching here: the current thread holds the write lock, or readers need not block,
        // or this is a reentrant read acquire
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded"); // read count is saturated; no further hold can be added
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    // The release path calls readHolds.remove() when the count drops to 0, so the cached
                    // object may no longer be the thread-local value; re-install it here.
                    // NOTE(review): ThreadLocalHoldCounter is not shown in this excerpt - confirm.
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh;
            }
            return 1;
        }
        // CAS lost a race: loop and re-read state
    }
}
/**
 * Queued shared-mode acquire: enqueues a SHARED node, then loops, retrying
 * {@code tryAcquireShared} whenever this node's predecessor is head.
 * If the loop exits without succeeding, the node is cancelled.
 *
 * @param arg the acquire argument passed to {@code tryAcquireShared}
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) { // only the node directly behind head may try to acquire
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // Judging by the helper names, this parks the thread and reports whether it was interrupted - TODO confirm
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Makes {@code node} the new head and, when propagation is indicated,
 * calls {@code doReleaseShared()} so that a following waiter can be woken.
 *
 * @param node      the node that just acquired in shared mode
 * @param propagate the value returned by {@code tryAcquireShared}
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    
    // Propagate if the caller asked for it, or if the old or new head is null or has a negative waitStatus.
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared(); // successor is either not visible (null) or a shared-mode waiter, i.e. not an exclusive one
    }
}

/**
 * Shared-mode release action: if the queue has a waiter behind head, either
 * resets a SIGNAL head to 0 and unparks its successor, or marks a 0-status
 * head as PROPAGATE. Repeats until head is unchanged across one iteration.
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // the queue contains at least one node besides head
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

/**
 * Wakes the thread of the successor of {@code node}, if there is one.
 * Normally that is {@code node.next}; if it is null or has a positive
 * waitStatus, the queue is scanned backwards from tail and the node closest
 * to {@code node} with waitStatus <= 0 is chosen instead. In the shared-mode
 * acquire path the woken thread goes on to call setHeadAndPropagate, which
 * passes the wake-up along.
 *
 * @param node the node whose successor should be unparked
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // clear the negative status; the CAS result is deliberately ignored

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk from tail towards node; the last match kept is the front-most eligible node.
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unlock

/** Releases one read hold by delegating to the synchronizer's shared-mode release. */
public void unlock() {
    sync.releaseShared(1);
}

/**
 * Releases in shared mode. When {@code tryReleaseShared} reports success,
 * runs {@code doReleaseShared()} and returns {@code true}.
 *
 * @param arg the release argument passed to {@code tryReleaseShared}
 * @return the value returned by {@code tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    final boolean released = tryReleaseShared(arg);
    if (!released) {
        return false;
    }
    doReleaseShared();
    return true;
}

/**
 * Drops one read hold of the current thread: first updates the per-thread
 * bookkeeping, then subtracts SHARED_UNIT from state in a CAS loop.
 *
 * @param unused ignored; exactly one hold is released per call
 * @return {@code true} only if the resulting state is 0, i.e. neither read nor write holds remain
 */
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) { // current thread is the first reader: update the firstReader fields
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else { // not the first reader: use the cached counter if it is ours, otherwise the thread-local one
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove(); // this thread holds no read lock after this release: drop the thread-local entry
            if (count <= 0)
                throw unmatchedUnlockException(); // unlock without a matching lock
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0; // true only when the whole state is 0; the caller then runs doReleaseShared()
    }
}

/**
 * Shared-mode release action, looping until head stays the same for one pass:
 * a SIGNAL head is reset to 0 and its successor unparked; a head with
 * status 0 is marked PROPAGATE.
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // there is at least one queued node behind head
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

WriteLock

lock()

/** Acquires the write lock by delegating to the synchronizer's exclusive-mode acquire. */
public void lock() {
    sync.acquire(1);
}

/**
 * Acquires in exclusive mode: tries once via {@code tryAcquire}; on failure
 * enqueues an EXCLUSIVE node and waits in {@code acquireQueued}. If
 * {@code acquireQueued} returns {@code true}, {@code selfInterrupt()} is called.
 *
 * @param arg the acquire argument passed to {@code tryAcquire} and {@code acquireQueued}
 */
public final void acquire(int arg) {
    if (tryAcquire(arg)) {
        return; // acquired without queuing
    }
    final boolean needsSelfInterrupt = acquireQueued(addWaiter(Node.EXCLUSIVE), arg);
    if (needsSelfInterrupt) {
        selfInterrupt();
    }
}

/**
 * Attempts to take the write lock.
 *
 * @param acquires number of write holds to add to state
 * @return {@code true} on a reentrant or a fresh acquisition; {@code false} if
 *         any read hold exists, another thread owns the write lock, the policy
 *         says the writer should block, or the CAS on state fails
 * @throws Error if the write reentrancy count would exceed MAX_COUNT
 */
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires); // the current thread already owns the write lock, so a plain write suffices (no CAS)
        return true;
    }
    // State is 0: nobody holds either lock, so race for it with a CAS unless the policy says to block
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

unlock()

/** Releases one write hold by delegating to the synchronizer's exclusive-mode release. */
public void unlock() {
    sync.release(1);
}

/**
 * Releases in exclusive mode. When {@code tryRelease} reports success and
 * head exists with a non-zero waitStatus, the successor of head is unparked.
 *
 * @param arg the release argument passed to {@code tryRelease}
 * @return the value returned by {@code tryRelease}
 */
public final boolean release(int arg) {
    if (!tryRelease(arg)) {
        return false;
    }
    final Node front = head;
    final boolean hasWaiterToWake = front != null && front.waitStatus != 0;
    if (hasWaiterToWake) {
        unparkSuccessor(front);
    }
    return true;
}
上一篇 | 下一篇

猜你喜欢

热点阅读