Java 并发程序员

【Java 并发笔记】ReentrantReadWriteLoc

2018-12-23  本文已影响12人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. ReentrantReadWriteLock 概述

公平选择性

public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
}

重入性

读写锁的状态低 16 位为写锁,高 16 位为读锁

锁降级

condition 支持

锁中断

1.1 ReadWriteLock 接口

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading.
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing.
     */
    Lock writeLock();
}
方法 说明
readLock() 用来获取读锁。
writeLock() 用来获取写锁。

1.2 ReentrantReadWriteLock 实现原理

ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.readLock().lock();
System.out.println("get readLock.");
rtLock.writeLock().lock();
System.out.println("blocking");
ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.writeLock().lock();
System.out.println("writeLock");
rtLock.readLock().lock();
System.out.println("get read lock");=

1.2.1 写锁的获取

//获取写锁
public void lock() {
    sync.acquire(1);
}

//AQS 实现的独占式获取同步状态方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//自定义重写的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
    //当前线程
    Thread current = Thread.currentThread();
    //获取状态
    int c = getState();
    //写线程数量(即获取独占锁的重入数)
    int w = exclusiveCount(c);
    
    //当前同步状态 state != 0,说明已经有其他线程获取了读锁或写锁
    if (c != 0) {
        // 当前 state 不为 0,此时:如果写锁状态为 0 说明读锁此时被占用返回 false。
        // 如果写锁状态不为 0 且写锁没有被当前线程持有返回 false。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        
        //判断同一线程获取写锁是否超过最大次数(65535),支持可重入。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新状态
        //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
        setState(c + acquires);
        return true;
    }
    
    //到这里说明此时 c=0,读锁和写锁都没有被获取。
    //writerShouldBlock 表示是否阻塞
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    
    //设置锁为当前线程所有
    setExclusiveOwnerThread(current);
    return true;
}
static final int SHARED_SHIFT   = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
1 & 1111111111111111 = 1
获取写锁的步骤
//FairSync 中需要判断是否有前驱节点,如果有则返回 false,否则返回 true。遵循 FIFO。
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

//NonfairSync 中直接返回 false,可插队。
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

1.2.2 写锁的释放

//写锁释放
public void unlock() {
    sync.release(1);
}

//AQS 提供独占式释放同步状态的方法
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//自定义重写的 tryRelease 方法
protected final boolean tryRelease(int releases) {
    //若锁的持有者不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //写锁的新线程数
    int nextc = getState() - releases;
    //如果独占模式重入数为 0 了,说明独占模式被释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        //若写锁的新线程数为 0,则将锁的持有者设置为null
        setExclusiveOwnerThread(null);
    //设置写锁的新线程数
    //不管独占模式是否被释放,更新独占重入数
    setState(nextc);
    return free;
}
释放写锁的步骤

1.2.3 读锁的获取

public void lock() {
    sync.acquireShared(1);
}

//使用 AQS 提供的共享式获取同步状态的方法
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

//自定义重写的 tryAcquireShared 方法,参数是 unused,因为读锁的重入计数是内部维护的。
protected final int tryAcquireShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取状态
    int c = getState();
    
    //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 读锁数量
    int r = sharedCount(c);
    /*
     * readerShouldBlock():读锁是否需要等待(公平锁原则)
     * r < MAX_COUNT:持有线程小于最大数(65535)
     * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
     */
     // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //r == 0,表示第一个读锁线程,第一个读锁 firstRead 是不会加入到 readHolds 中。
        if (r == 0) { // 读锁数量为 0
            // 设置第一个读线程
            firstReader = current;
            // 读线程占用的资源数为 1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
            // 占用资源数加 1
            firstReaderHoldCount++;
        } else { // 读锁数量不为 0 并且不为当前线程
            // 获取计数器
            HoldCounter rh = cachedHoldCounter;
            // 计数器为空或者计数器的 tid 不为当前正在运行的线程的 tid。
            if (rh == null || rh.tid != getThreadId(current)) 
                // 获取当前线程对应的计数器
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0) // 计数为 0
                //加入到 readHolds 中
                readHolds.set(rh);
            //计数+1
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
读锁的获取步骤
//FairSync 中需要判断是否有前驱节点,如果有则返回 false,否则返回 true。遵循 FIFO。
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
//当 head 节点不为 null 且 head 节点的下一个节点 s 不为 null 且 s 是独占模式(写线程)且 s 的线程不为 null 时,返回 true。
//目的是不应该让写锁始终等待。作为一个启发式方法用于避免可能的写线程饥饿,这只是一种概率性的作用,因为如果有一个等待的写线程在其他尚未从队列中出队的读线程后面等待,那么新的读线程将不会被阻塞。
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
final int fullTryAcquireShared(Thread current) {

    HoldCounter rh = null;
    for (;;) { // 无限循环
        // 获取状态
        int c = getState();
        if (exclusiveCount(c) != 0) { // 写线程数量不为0
            if (getExclusiveOwnerThread() != current) // 不为当前线程
                return -1;
        } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) { // 当前线程为第一个读线程
                // assert firstReaderHoldCount > 0;
            } else { // 当前线程不为第一个读线程
                if (rh == null) { // 计数器不为空
                    // 
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功
            if (sharedCount(c) == 0) { // 读线程数量为0
                // 设置第一个读线程
                firstReader = current;
                // 
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

1.2.4 读锁的释放

public  void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    if (firstReader == current) { // 当前线程为第一个读线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
            firstReader = null;
        else // 减少占用的资源
            firstReaderHoldCount--;
    } else { // 当前线程不为第一个读线程
        // 获取缓存的计数器
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
            // 获取当前线程对应的计数器
            rh = readHolds.get();
        // 获取计数
        int count = rh.count;
        if (count <= 1) { // 计数小于等于1
            // 移除
            readHolds.remove();
            if (count <= 0) // 计数小于等于0,抛出异常
                throw unmatchedUnlockException();
        }
        // 减少计数
        --rh.count;
    }
    for (;;) { // 无限循环
        // 获取状态
        int c = getState();
        // 获取状态
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc)) // 比较并进行设置
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
读锁释放的步骤
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

static final class HoldCounter {
    int count = 0;
    final long tid = Thread.currentThread().getId();
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

1.2.5 锁降级

class CachedData {
   Object data;
     //保证状态可见性
   volatile boolean cacheValid;
   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     //读锁获取
     rwl.readLock().lock();
     if (!cacheValid) {
        // 在获取写锁前必须释放读锁
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        //再次检查其他线程是否已经抢到  
        if (!cacheValid) {
           //获取数据
          data = ...
          cacheValid = true;
        }
        //获取读锁。在写锁持有期间获取读锁
        //此处获取读锁,是为了防止,当释放写锁后,又有一个线程T获取锁,对数据进行改变,而当前线程下面对改变的数据无法感知。
        //如果获取了读锁,则线程T则被阻塞,直到当前线程释放了读锁,那个T线程才有可能获取写锁。
        rwl.readLock().lock();
        //释放写锁,保持读锁
        rwl.writeLock().unlock();
     }
     //锁降级完成
     try {
               //使用数据的流程
               use(data);
     } finally{
                //释放读锁
                readLock.unlock();
     }
   }
}

1.3 总结

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }

参考资料

https://www.cnblogs.com/xiaoxi/p/9140541.html

上一篇 下一篇

猜你喜欢

热点阅读