5. StampedLock

2019-03-15  本文已影响0人  shallowinggg

StampedLock是Java 8新增的一个读写锁,它是对ReentrantReadWriteLock的改进。StampedLock的同步状态包含了一个版本和模式,获取锁的方法返回一个stamp表示这个锁的状态;而这些方法的 "try" 版本返回一个特殊值0表示获取锁失败。锁释放和转换的方法需要stamp作为参数,如果stamp不符合锁的同步状态就会失败。StampedLock提供了三种模式的控制:

  1. 独占写模式。writeLock方法可能会在获取共享状态时阻塞,如果成功获取锁,返回一个stamp,它可以作为参数被用在unlockWrite方法中以释放写锁。tryWriteLock的超时与非超时版本都被提供使用。当写锁被获取,那么没有读锁能够被获取并且所有的乐观读锁验证都会失败。

  2. 悲观读模式。readLock方法可能会在获取共享状态时阻塞,如果成功获取锁,返回一个stamp,它可以作为参数被用在unlockRead方法中以释放读锁。tryReadLock的超时与非超时版本都被提供使用。

  3. 乐观读模式。tryOptimisticRead方法只有当写锁没有被获取时会返回一个非0的stamp。在获取这个stamp后直到调用validate方法这段时间,如果写锁没有被获取,那么validate方法将会返回true。这个模式可以被认为是读锁的一个弱化版本,因为它的状态可能随时被写锁破坏。这个乐观模式的主要是为一些很短的只读代码块的使用设计,它可以降低竞争并且提高吞吐量。但是,它的使用本质上是很脆弱的。乐观读的代码区域应当只读取共享数据并将它们储存在局部变量中以待后来使用,当然在使用前要先验证这些数据是否过期,这可以使用前面提到的validate方法。在乐观读模式下的数据读取可能是非常不一致的过程,因此只有当你对数据的表示很熟悉并且重复调用validate方法来检查数据的一致性时使用此模式。例如,当先读取一个对象或者数组引用,然后访问它的字段、元素或者方法之一时上面的步骤都是需要的。

这个类还提供了在三种模式之间转换的辅助方法。例如,tryConvertToWriteLock方法尝试"提升"一个模式,如果已经获取了读锁并且此时没有其他线程获取读锁,那么这个方法返回一个合法的写stamp。这些方法被设计来帮助减少以“重试为主”设计时发生的代码代码膨胀。

示例

下面的类中描述了一些StampedLock的常用用法,它主要操作一个简单的二维点。这个示例在没有异常会抛出的情况下依然沿用使用try-catch块的惯例。

class Point {

    // 成员变量
    private double x, y;

    // 锁实例
    private final StampedLock sl = new StampedLock();

    // 排它锁-写锁(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 一个只读方法
    // 其中存在乐观读锁到悲观读锁的转换
    double distanceFromOrigin() {

        // 尝试获取乐观读锁
        long stamp = sl.tryOptimisticRead();
        // 将全部变量拷贝到方法体栈内
        double currentX = x, currentY = y;
        // 检查在获取到读锁stamp后,锁有没被其他写线程抢占
        if (!sl.validate(stamp)) {
            // 如果被抢占则获取一个共享读锁(悲观获取)
            stamp = sl.readLock();
            try {
                // 将全部变量拷贝到方法体栈内
                currentX = x;
                currentY = y;
            } finally {
                // 释放共享读锁
                sl.unlockRead(stamp);
            }
        }
        // 返回计算结果
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 获取读锁,并尝试转换为写锁
    void moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.tryOptimisticRead();
        try {
            // 如果当前点在原点则移动
            while (x == 0.0 && y == 0.0) {
                // 尝试将获取的读锁升级为写锁
                long ws = sl.tryConvertToWriteLock(stamp);
                // 升级成功,则更新stamp,并设置坐标值,然后退出循环
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // 读锁升级写锁失败则释放读锁,显示获取独占写锁,然后循环重试
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

介绍了StampedLock的基本用法后,下面开始进行源码分析。

同步节点

StampedLock使用 long 作为同步状态的类型,它使用一个小的有限数作为读锁被获取的2进制位数(目前为7),所以当reader的数量到达上限时,使用一个额外的溢出字来表示溢出。我们通过将最大的reader数量(RBITS)视作一个自旋锁来保护同步状态的溢出更新。

在StampedLock中使用的同步节点与AQS的同步节点有一点不同,下面先看它的常量代表的意义。

> line: 352
    /** 处理器的数量,控制自旋次数 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** 被增加到同步队列前最大的重试次数,至少为1 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1;

    /** 在头节点处被阻塞前的最大重试次数 */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1;

    /** 在再次被阻塞前的最大重试次数 */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1;

    /** The period for yielding when waiting for overflow spinlock */
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

    /** 读锁被获取的次数的二进制位数 */
    private static final int LG_READERS = 7;

    // Values for lock state and stamp operations
    private static final long RUNIT = 1L;                  // 类似读写锁的RUNIT,意思是每次获取读锁时
                                                           // 同步状态应当增加1
    private static final long WBIT  = 1L << LG_READERS;    // 写状态        10000000
    private static final long RBITS = WBIT - 1L;           // 溢出保护      01111111
    private static final long RFULL = RBITS - 1L;          // 最大reader    01111110
    private static final long ABITS = RBITS | WBIT;        // 掩码          11111111
    private static final long SBITS = ~RBITS;              // 掩码     24(1)10000000

    /*
     * 3种模式可以通过检查区分 (m = stamp & ABITS):
     * 写模式: m == WBIT
     * 乐观读模式: m == 0L (即使读锁已经被持有)
     * 悲观读模式: m > 0L && m <= RFULL (同步状态的拷贝,,但是stamp中的
     * read hold count除了用来决定是哪个模式以外不会被使用)
     *
     * This differs slightly from the encoding of state:
     * (state & ABITS) == 0L 表示锁没有被获取
     * (state & ABITS) == RBITS 这是一个特殊值,表示操作读者bit位的自旋锁溢出
     */

    /** 锁状态的初始值 */
    private static final long ORIGIN = WBIT << 1;          // 1 00000000

    // Special value from cancelled acquire methods so caller can throw IE
    private static final long INTERRUPTED = 1L;

    // 节点状态值; order matters
    private static final int WAITING   = -1;
    private static final int CANCELLED =  1;

    // 节点模式 (使用int而不是boolean以允许运算)
    private static final int RMODE = 0;
    private static final int WMODE = 1;

其中ABITS和SBITS是作为掩码使用的,来快速检查当前锁的状态,在后面读写锁的获取中可以看到它们的使用。使用ORIGIN作为初始值也是与此相关,我们在后面讨论。而读状态正常最多只可以被获取126(RFULL)次,如果超出这个上限,那么其他读线程获取锁时需要在readreaderOverflow记录。因为readreaderOverflow不是个原子变量,所以为了保证它的同步性,需要进行同步处理。

了解了常量值的含义后,开始对同步节点的分析:

> line: 406
static final class WNode {
    volatile WNode prev;      // 前驱节点
    volatile WNode next;      // 后继节点
    volatile WNode cowait;    // 读线程链表
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** 队列头节点 */
private transient volatile WNode whead;
/** 队列尾节点 */
private transient volatile WNode wtail;

StampedLock中的同步节点和AQS的几乎一样,只多加了一个cowait字段,同时状态略有不同,还多了个判定是读还是写的mode字段。关于状态只有WAITINGCANCELLED两种,阅读过AQS相信对此不会有疑惑,而cowait的的出现是对AQS的优化。在StampedLock中,读节点不像AQS那样每个读线程都会构造一个自己的节点并加入到同步队列中,而是将许多连续的读节点挂载在一个读节点上,此时同步队列中就不会出现多个连续的读节点,当此读节点获取到锁时,会唤醒在其上挂载的所有读线程,此时其他需要增加到同步队列中的线程无论读写都会帮助头节点唤醒,如此就大大加快了读线程的唤醒速度,具体实现会在后面进行讲解。

写锁的获取与释放

> line: 459
public long writeLock() {
    long next;
    // 当没有悲观读锁或者写锁已经被获取时,能够获取到写锁
    return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L);
}

> line: 471
public long tryWriteLock() {
    long s;
    // 进行掩码运算
    return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L;
}

> line: 442
private long tryWriteLock(long s) {
    // assert (s & ABITS) == 0L;
    long next;
    if (casState(s, next = s | WBIT)) {
        VarHandle.storeStoreFence();
        return next;
    }
    return 0L;
}

我们先明确一件事,同步状态的初始值为100000000(二进制),如果获取到悲观读锁,那么同步状态会加一,ABITS的值为11111111,所以如果 state & ABITS不为0,就表示有线程获取了悲观读锁或者由线程已经获取了写锁,而如果state & ABITS为0,则只可能有线程获取到了乐观读锁,此时线程可以无视乐观读锁然后获取写锁。

下面是获取读锁的超时版本:

> line: 489
public long tryWriteLock(long time, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(time);
    // 如果在尝试获取锁前线程已经被中断,那么直接抛出异常
    if (!Thread.interrupted()) {
        long next, deadline;
        // 成功获取到写锁,直接返回一个 stamp
        if ((next = tryWriteLock()) != 0L)
            return next;
        // 指定超时时间 <=0,直接返回
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

> line: 1231
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 如果当前没有线程获取了悲观读锁或写锁,那么尝试获取写锁
        if ((m = (s = state) & ABITS) == 0L) {
            if ((ns = tryWriteLock(s)) != 0L)
                return ns;
        }
        else if (spins < 0)
            // 如果有线程已经获取了写锁并且队列还未被初始化或者为空,设置自旋次数
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        // 进行忙等待
        else if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        // 如果自旋结束还未获取到锁
        else if ((p = wtail) == null) { // 初始化队列
            // 构建一个节点,并设为头节点以及尾节点
            WNode hd = new WNode(WMODE, null);
            if (WHEAD.weakCompareAndSet(this, null, hd))
                wtail = hd;
        }
        // 如果队列已经被初始化,将自己增加到同步队列中
        else if (node == null)
            node = new WNode(WMODE, p);
        // 入队过程中如果有其他线程已经将自己设置为尾节点,则重新插入到尾端
        else if (node.prev != p)
            node.prev = p;
        // 尝试将自己设置为队列尾节点,成功则跳出循环
        else if (WTAIL.weakCompareAndSet(this, p, node)) {
            p.next = node;
            break;
        }
    }

    boolean wasInterrupted = false;
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果前驱节点是头节点
        if ((h = whead) == p) {
            // 设置自旋次数
            if (spins < 0)
                spins = HEAD_SPINS;
            // 如果小于最大头节点自旋次数,则将自选次数扩大为两倍
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; k > 0; --k) { // 在队列头部一直自旋
                long s, ns;
                // 检查当前是否有线程获取了写锁或者悲观读锁
                if (((s = state) & ABITS) == 0L) {
                    // 如果没有,则尝试获取写锁
                    if ((ns = tryWriteLock(s)) != 0L) {
                        // 获取成功,将自己设为头节点
                        whead = node;
                        node.prev = null;
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return ns;
                    }
                }
                // 如果当前锁已被占有,那么就自旋
                else
                    Thread.onSpinWait();
            }
        }
        // 如果头节点不为空且不是当前节点的前驱节点
        else if (h != null) { 
            WNode c; Thread w;
            // 如果头节点的cowait!=null,即头节点是一个读节点,那么便帮助它释放在此处
            // 积聚的读线程
            while ((c = h.cowait) != null) {
                if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null)
                    LockSupport.unpark(w);
            }
        }
        // 如果在执行上面操作时头节点没有被改变
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 将前驱节点设置为WAITING
            else if ((ps = p.status) == 0)
                WSTATUS.compareAndSet(p, 0, WAITING);
            // 如果前驱节点被取消了,则将此节点移除同步队列,与更前面的节点建立联系
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                // 如果超时时间已经到达,取消此节点
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                node.thread = wt;
                // 阻塞当前线程
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p) {
                    if (time == 0L)
                        LockSupport.park(this);
                    else
                        LockSupport.parkNanos(this, time);
                }
                node.thread = null;
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, node, true);
                    wasInterrupted = true;
                }
            }
        }
    }
}

上面获取写状态的方法很复杂,我们将其逐个块分析。首先此方法包含了两个死循环,第一个循环主要的操作就是将自己加入到同步队列中,而第二个循环的主要目的是阻塞自己。

下面先说第一个循环,在这个循环中执行时,一直都在伴随着自旋,在将自己加入到同步队列时,也在尝试能否获取到同步状态。

  1. 第一个if分支 if ((m = (s = state) & ABITS) == 0L)。 每次都会检查同步状态以查看是否有资格去获取写锁,而失败的原因只会是与其他线程竞争写锁时失败了,自此以后它就没有资格去获取了,直到锁再次为空,这就是第一个if分支的用处。
  2. 第二个if分支 spins < 0。 如果第一个if判断失败,那么证明锁已经被其他线程持有了,不管是读锁还是写锁。此时设置自旋次数,准备自旋获取锁,这也是StampedLock相对于过去的ReentrantLock做出的改变,因为很多时候在自旋的这段时间内其他线程会释放锁,所以此时就能更快的获取到锁,而非通过等待/通知机制等待唤醒。当然,如果是其他线程获取了悲观读锁,那么就将自旋次数设置为0,以让它们能够有时间读取共享数;或者当前同步队列中已经有不止一个节点了,那么根据FIFO原则,需要让前面的线程先获取写锁,所以自旋次数也为0.
  3. 第三个if分支 spins > 0。 在第二个if分支设置了自旋次数后,线程便可以开始自旋了,每自旋一次spins减1。经过这两步之后,主要的自旋过程已经完成,后面在加入到同步队列中时,只会每次循环时再尝试一次,相当于多几次尝试的机会。
  4. 第四个if分支 (p = wtail) == null。 当自旋次数用完后线程还没有获取到写锁,那么就尝试将自己增加到同步队列中。如果队列还没有被初始化,就构造一个节点并将其设置为头节点以及尾节点。
  5. 第五个if分支 node == null。 如果同步队列已经被初始化了,那么就构造一个写节点,前驱节点指向当前的尾节点。
  6. 第六个if分支 node.prev != p。在第五步后,这个节点并没有被真正增加到同步队列中,因为前驱节点的next字段还没有指向它。在此期间,可能有其他节点在它之前成功插入到了同步队列中,此时队列的尾节点已经被改变,而不是在线程堆栈中保存的尾节点p,所以将前驱节点指向新的尾节点,避免队列混乱。
  7. 第七个分支 WTAIL.weakCompareAndSet(this, p, node)。 如果尾节点并未发生改变,那么便尝试CAS将自己设置为队列的尾节点并使前驱节点的next字段指向自己,如果成功了,就跳出此循环,否则就重新设置前驱节点然后再次尝试。至此,第一个循环结束。

第一个循环完成之后,当前线程已经成功插入到了同步队列中,第二个循环可能依然会自旋获取同步状态,如果不就将自己阻塞。

  1. 第一个if分支 (h = whead) == p。 此时p保存的还是在第一个循环中记录的上一个尾节点,即当前节点的前驱结点。如果前驱结点为头节点,那么便可以再次尝试获取同步状态,因为此时有很大几率能够成功。下面的if分支建立在此if分支的基础上。

在设置了自旋次数后,开始尝试获取同步状态。此处的for循环的唯一功能就是获取同步状态,如果锁目前被其他线程占有,那么进入忙等待,否则执行tryWriteLock方法进行CAS竞争。

  1. 第二个if分支 h != null。 如果前驱节点不是头节点,并且头节点是读节点,那么就帮助唤醒积聚的等待线程。注意,如果读线程获取锁失败,当同步队列的尾节点是读节点时,它便不会将自己插入到同步队列中,而是直接挂载在那个尾节点,当那个节点成为头节点时,就会唤醒在其上挂载的所有读线程,此处就是帮助读节点更快唤醒那些线程。

在完成了上面两个分支的工作后(不管做的是哪一个),如果头节点没有发生变化,那么就执行下面的操作,否则进入下一个循环重新开始。

虽然此方法看起来比较复杂,但是逻辑清晰,还是比较容易理解。其复杂性主要就是自旋优化导致的。

下面是此方法的流程图:

loop1 loop2

下面是几种写节点加入同步队列的常见情况:

下面是获取锁的响应中断版本:

> line: 517
public long writeLockInterruptibly() throws InterruptedException {
    long next;
    if (!Thread.interrupted() &&
        (next = acquireWrite(true, 0L)) != INTERRUPTED)
        return next;
    throw new InterruptedException();
}

整体上一样,只是多了个响应中断。

锁释放时需要将加锁时返回的stamp作为参数。

> line: 678
public void unlockWrite(long stamp) {
    // 如果同步状态和stamp不相符,抛出异常
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    unlockWriteInternal(stamp);
}

> line: 661
private long unlockWriteInternal(long s) {
    long next; WNode h;
    // 更改同步状态
    STATE.setVolatile(this, next = unlockWriteState(s));
    // 如果同步队列中有节点等待,并且自旋获取同步状态失败被阻塞(status!=0),释放它
    if ((h = whead) != null && h.status != 0)
        release(h);
    return next;
}

> line: 657
// 返回一个未加锁状态,增加一个版本并避免同步状态为 0
private static long unlockWriteState(long s) {
    return ((s += WBIT) == 0L) ? ORIGIN : s;
}

> line: 1208
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        WSTATUS.compareAndSet(h, WAITING, 0);
        // 如果第一个等待节点为null或者被取消了,那么从队尾开始逆向寻找
        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            LockSupport.unpark(w);
    }
}

从写锁释放的源码中可以看出,写锁释放的时候,同步状态会增加WBIT,相当于记录写锁总共被获取的次数,当其增加到上限溢出时,重置同步状态为ORIGIN

读锁的获取与释放

> line: 532
public long readLock() {
    long s, next;
    // 如果同步队列为空并且没有线程获取了写锁并且CAS成功,返回stamp
    return (whead == wtail
            && ((s = state) & ABITS) < RFULL
            && casState(s, next = s + RUNIT))
        ? next
        : acquireRead(false, 0L);
}

> line: 1339
private long acquireRead(boolean interruptible, long deadline) {
    boolean wasInterrupted = false;
    WNode node = null, p;
    for (int spins = -1;;) {
        WNode h;
        // 如果同步队列为空或者只有一个头节点
        if ((h = whead) == (p = wtail)) {
            for (long m, s, ns;;) {
                // 如果没有线程获得写锁并且读锁数量未达到上限,进行CAS竞争
                // 如果读锁数量达到上限,尝试增加读锁数量溢出,如果成功,返回
                if ((m = (s = state) & ABITS) < RFULL ?
                    casState(s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    if (wasInterrupted)
                        Thread.currentThread().interrupt();
                    return ns;
                }
                // 如果有线程获取了写锁
                else if (m >= WBIT) {
                    // 如果自旋次数大于0,进行忙等待
                    if (spins > 0) {
                        --spins;
                        Thread.onSpinWait();
                    }
                    else {
                        // 如果自旋次数用完,并且同步队列未发生变化,跳出循环
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 设置自旋次数
                        spins = SPINS;
                    }
                }
            }
        }
        if (p == null) { // 初始化队列
            WNode hd = new WNode(WMODE, null);
            if (WHEAD.weakCompareAndSet(this, null, hd))
                wtail = hd;
        }
        // 如果队列已经初始化,创建一个读节点,插入到队列尾部
        else if (node == null)
            node = new WNode(RMODE, p);
        // 如果队列只有一个节点或者尾节点为写节点
        else if (h == p || p.mode != RMODE) {
            // 如果在自旋期间有其他节点成功插入到队列尾部,将prev字段设置为新的尾节点
            if (node.prev != p)
                node.prev = p;
            // 否则,设置当前节点为尾节点,并使前驱节点的next字段指向自己,跳出循环
            else if (WTAIL.weakCompareAndSet(this, p, node)) {
                p.next = node;
                break;
            }
        }
        // 如果尾节点为读节点,将其cowait设置为当前节点,注意这是一个类似链表插入的操作
        else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node))
            node.cowait = null;
        // 如果上一步CAS成功
        else {
            for (;;) {
                WNode pp, c; Thread w;
                // 如果头节点为读节点,尝试释放此节点上挂载的所有节点
                if ((h = whead) != null && (c = h.cowait) != null &&
                    WCOWAIT.compareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    LockSupport.unpark(w);
                // 响应中断
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, p, true);
                    wasInterrupted = true;
                }
                // 如果挂载节点的前驱是头节点或者挂载节点为头节点,即现在可以尝试获取读锁
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            casState(s, ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L)) {
                            if (wasInterrupted)
                                Thread.currentThread().interrupt();
                            return ns;
                        }
                    } while (m < WBIT);
                }
                // 如果在前面执行完队列头节点未发生变化
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 计算超时时间,并阻塞自己
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L) {
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return cancelWaiter(node, p, false);
                    }
                    Thread wt = Thread.currentThread();
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp) {
                        if (time == 0L)
                            LockSupport.park(this);
                        else
                            LockSupport.parkNanos(this, time);
                    }
                    node.thread = null;
                }
            }
        }
    }

    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果前驱节点是头节点
        if ((h = whead) == p) {
            // 设置自旋数
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins;;) { // 在头部自旋,尝试获取锁
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?
                    casState(s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    while ((c = node.cowait) != null) {
                        if (WCOWAIT.compareAndSet(node, c, c.cowait) &&
                            (w = c.thread) != null)
                            LockSupport.unpark(w);
                    }
                    if (wasInterrupted)
                        Thread.currentThread().interrupt();
                    return ns;
                }
                else if (m >= WBIT && --k <= 0)
                    break;
                else
                    Thread.onSpinWait();
            }
        }
        // 如果头节点不为空并且前驱节点不为头节点,如果头节点是读节点
        // 帮助唤醒挂载的读线程
        else if (h != null) {
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (WCOWAIT.compareAndSet(h, c, c.cowait) &&
                    (w = c.thread) != null)
                    LockSupport.unpark(w);
            }
        }
        // 如果头节点未发生改变
        if (whead == h) {
            // 如果前驱节点发生了改变,更新节点引用
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 如果前驱节点状态是初始状态,将其设置为WAITING
            else if ((ps = p.status) == 0)
                WSTATUS.compareAndSet(p, 0, WAITING);
            // 如果前驱节点被取消,移除此节点并更新节点引用
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            // 否则,阻塞自己
            else {
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p) {
                        if (time == 0L)
                            LockSupport.park(this);
                        else
                            LockSupport.parkNanos(this, time);
                }
                node.thread = null;
                if (Thread.interrupted()) {
                    if (interruptible)
                        return cancelWaiter(node, node, true);
                    wasInterrupted = true;
                }
            }
        }
    }
}

> line: 1157
private long tryIncReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    // 如果刚好到达上限,则将同步状态的最后七位设置为RBITS,用作屏蔽其他读线程以原子更新readOverflow
    if ((s & ABITS) == RFULL) {
        if (casState(s, s | RBITS)) {
            // readerOverflow记录当读线程数量饱和时的溢出数
            ++readerOverflow;
            // 更新完readerOverflow后将state重置回RFULL
            // 此时可以让下一个线程来更新readerOverflow
            STATE.setVolatile(this, s);
            return s;
        }
    }
    // 否则,如果随机数&111==0,调度线程
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    // 否则进行忙等待
    else
        Thread.onSpinWait();
    return 0L;
}

读状态的获取相比写状态更加复杂,主要原因便是StampedLock的读节点不像ReentrantReadWriteLock那样每个线程构造一个节点,而是如果当前尾节点是写节点,那么就构造一个读节点加入队列尾端,如果尾节点是读节点,就不会再将新构造的读节点加入到队列尾部了,而是直接挂载在前面那个读节点上,当那个节点成为头节点时,唤醒挂载在上面的所有读线程。

整个方法依然被分为两个大循环,不过这两个循环的功能交错,下面一个一个进行分析,首先说第一个大循环。

  1. 第一个if (h = whead) == (p = wtail)。 如果队列还没有被初始化或者只有一个头节点,那么尝试获取读状态。因为此时很有可能获取到读锁,所以即使现在读线程数量已经到达上限,依然继续尝试。但如果已经有线程获取了写锁,那么就自旋 1<<6 == 64 次进行尝试。如果自旋结束还没有获取到锁,那么退出获取锁的循环。
  2. 第一个if分支 p == null。 如果当前队列没有被初始化,那么就初始化队列,此步写锁获取中已经说过,不再赘述,后面与写锁获取类似的功能都不在赘述。
  3. 第二个if分支 node == null。 构造一个读节点,设置prev字段为队尾。
  4. 第三个if分支 h == p || p.mode != RMODE。 如果前驱节点为头节点或者前驱节点的写模式,如果当前节点的prev字段被改变,那么就重新设置prev字段,否则将此节点设置为队尾,并跳出第一个大循环。
  5. 第四个if分支 !WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)。 如果前驱结点不是头节点并且是一个读节点,那么尝试将自己挂载到此节点上。
  6. 第五个if分支。 如果第五步CAS成功,则进入最后一个分支,否则循环CAS设置。这个if分支由多个if语句组成,下面逐个分析。

第二个大循环:

  1. (h = whead) == p。 如果前驱节点是头节点,那么再次自旋尝试获取同步状态。如果获取成功,将自己设置为头节点,并唤醒挂载节点。
  2. h != null。 如果头节点不为空且是读节点,帮助唤醒线程。
  3. whead == h。如果经过上面的步骤后头节点未发生变化,那么就将自己阻塞。

下面是此方法的流程图:

loop 1 loop 2

下面展示一下读线程插入到同步队列的几种常见情况,注意读节点与写节点的内部构造是一样的,此处省略写节点的cowait字段。

StampedLock-read1.jpg StampedLock-read2.jpg StampedLock-read3.jpg

关于读锁获取的其他版本大体差不多,不再赘述,此处只贴出源码。

> line: 549
public long tryReadLock() {
    long s, m, next;
    while ((m = (s = state) & ABITS) != WBIT) {
        if (m < RFULL) {
            if (casState(s, next = s + RUNIT))
                return next;
        }
        else if ((next = tryIncReaderOverflow(s)) != 0L)
            return next;
    }
    return 0L;
}

> line: 576
public long tryReadLock(long time, TimeUnit unit)
    throws InterruptedException {
    long s, m, next, deadline;
    long nanos = unit.toNanos(time);
    if (!Thread.interrupted()) {
        if ((m = (s = state) & ABITS) != WBIT) {
            if (m < RFULL) {
                if (casState(s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireRead(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

> line: 610
public long readLockInterruptibly() throws InterruptedException {
    long s, next;
    if (!Thread.interrupted()
        // bypass acquireRead on common uncontended case
        && ((whead == wtail
             && ((s = state) & ABITS) < RFULL
             && casState(s, next = s + RUNIT))
            ||
            (next = acquireRead(true, 0L)) != INTERRUPTED))
        return next;
    throw new InterruptedException();
}

读锁释放代码如下:

> line: 693
public void unlockRead(long stamp) {
    long s, m; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)
           && (stamp & RBITS) > 0L
           && ((m = s & RBITS) > 0L)) {
        if (m < RFULL) {
            if (casState(s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            return;
    }
    throw new IllegalMonitorStateException();
}

> line: 1179
private long tryDecReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (casState(s, s | RBITS)) {
            int r; long next;
            // 优先减少溢出记录
            if ((r = readerOverflow) > 0) {
                readerOverflow = r - 1;
                next = s;
            }
            else
                next = s - RUNIT;
            STATE.setVolatile(this, next);
            return next;
        }
    }
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    else
        Thread.onSpinWait();
    return 0L;
}

乐观读锁的获取

乐观读锁只需要获取,不需要释放。只要没有线程获取写锁,那么就能获取到乐观读锁。当然,当使用由乐观读锁读取的共享数据时,需要先调用validate方法验证数据是否过期,只需要判断从获取数据到现在这段时间内是否有线程获取读锁即可。从此可以看出,乐观读锁可以很好的提高吞吐量,但是它的使用也非常不稳定,如果对数据的了解与控制不清晰,那么很容易出现脏读问题。

> line: 629
public long tryOptimisticRead() {
    long s;
    // 只要没有线程获取写锁,那么就能成功获取乐观读锁
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

> line: 646
public boolean validate(long stamp) {
    VarHandle.acquireFence();
    // 验证是否有线程获取到写锁
    return (stamp & SBITS) == (state & SBITS);
}

读写锁之间的转换

除了提供了乐观读锁外,StampedLock相比ReentrantReadWriteLock还提供了读写锁之间的转换,而ReentrantReadWriteLock只提供了写锁到读锁的锁降级特性。

  1. 尝试转换为写锁

此时有三种情况,乐观读锁转换为读锁,悲观读锁转换为读锁,写锁转换为写锁。

>line: 739
public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    // 验证stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 当前没有悲观读锁或写锁
        if ((m = s & ABITS) == 0L) {
            // 但是提供的stamp记录不相符,失败
            if (a != 0L)
                break;
            // 尝试获取写锁,成功则返回
            if ((next = tryWriteLock(s)) != 0L)
                return next;
        }
        // 如果当前线程是已经获取了写锁的线程
        else if (m == WBIT) {
            if (a != m)
                break;
            return stamp;
        }
        // 如果此时只有自己一个读线程获取了悲观读锁
        else if (m == RUNIT && a != 0L) {
            if (casState(s, next = s - RUNIT + WBIT)) {
                VarHandle.storeStoreFence();
                return next;
            }
        }
        else
            break;
    }
    return 0L;
}

  1. 尝试转换为悲观读锁

尝试转换为悲观读锁依然有三种情况。

> line: 776
public long tryConvertToReadLock(long stamp) {
    long a, s, next; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        else if (a == 0L) {
            // optimistic read stamp
            if ((s & ABITS) < RFULL) {
                if (casState(s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        else {
            // already a read stamp
            if ((s & ABITS) == 0L)
                break;
            return stamp;
        }
    }
    return 0L;
}
  1. 尝试转换为乐观读锁。
> line: 817
public long tryConvertToOptimisticRead(long stamp) {
    long a, m, s, next; WNode h;
    VarHandle.acquireFence();
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            return unlockWriteInternal(s);
        }
        else if (a == 0L)
            // already an optimistic read stamp
            return stamp;
        else if ((m = s & ABITS) == 0L) // invalid read stamp
            break;
        else if (m < RFULL) {
            if (casState(s, next = s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return next & SBITS;
            }
        }
        else if ((next = tryDecReaderOverflow(s)) != 0L)
            return next & SBITS;
    }
    return 0L;
}
上一篇下一篇

猜你喜欢

热点阅读