AQS源码浅析(5)——锁方法

2022-04-07  本文已影响0人  墨_0b54

acquireQueued条件等待方法及独占模式获取

final boolean acquireQueued(final Node node, int arg) { //自旋获得锁,会阻塞当前线程
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//前驱节点
            if (p == head && tryAcquire(arg)) {//前驱节点是头节点就获取锁
                setHead(node); //成功获取锁(其他线程失败,都会阻塞),当前node出队
                p.next = null; // help GC
                failed = false;
                return interrupted; //如果在竞争锁的过程中发生了中断,返回true
            }
            //线程获取锁失败后的操作
            if (shouldParkAfterFailedAcquire(p, node) &&//判断线程获取锁失败后,是否应该阻塞
                parkAndCheckInterrupt()) //阻塞线程,并在被唤醒后检查是否被中断
                interrupted = true;
        }
    } finally {
        if (failed) // tryAcquire发生未知的错误导致线程退出自旋,shouldParkAfterFailedAcquire、parkAndCheckInterrupt应该不会出错
            cancelAcquire(node); //取消当前线程
    }
}

shouldParkAfterFailedAcquire检查和更新获取锁失败的节点的状态

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //pred节点是SIGNAL状态表示node应该被唤醒,当前node可以安全的阻塞,返回true
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) { //跳过CANCELLED状态的前驱节点
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {//pred的waitStatus一定是0 或者 PROPAGATE,说明当前节点不需要阻塞,调用者需要重试
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //前驱节点状态设置为SIGNAL
    }
    return false;
}

cancelAcquire取消节点尝试acquire

取消当前节点,如果节点在队列的中间就从队列中去掉,否则直接唤醒一个有效的后继节点

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;//清除线程对象
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0) //跳过已取消的前驱节点
        node.prev = pred = pred.prev;
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;//不会有其他线程干扰,这一步后其他线程会跳过
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { //如果node已经在队尾,设置尾节点
        compareAndSetNext(pred, predNext, null);//更新pred的next指针为null
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head && //确定node前驱节点不是head,说明现在node已经不在队首了
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //前驱状态小于0,或者将未取消的前驱设置为SIGNAL状态(标记后继节点需要唤醒)
            pred.thread != null) {//前驱拥有线程对象
            Node next = node.next;//if判断前驱节点不是虚节点
            if (next != null && next.waitStatus <= 0) //判断后继有效
                compareAndSetNext(pred, predNext, next);//连接node的前驱和后继
        } else {//如果node的前驱是虚节点,唤醒一个后继
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

cancelAcquire与shouldParkAfterFailedAcquire一样,都有一段跳过CANCELLED状态的代码如下:

Node pred = node.prev;
while (pred.waitStatus > 0) //跳过已取消的前驱节点
    node.prev = pred = pred.prev;

为什么这里不需要用CAS呢?

这也是cancelAcquire的逻辑:

doAcquireNanos和doAcquireInterruptibly

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared共享不可中断模式获取锁

共享模式的acquireQueued,doAcquireShared与acquireQueued的区别是会传播唤醒后继节点

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//共享节点入队
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {//代表获取成功
                    setHeadAndPropagate(node, r);//node变成头节点,传播唤醒后继
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly和doAcquireSharedNanos

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读