ReentrantLock condition 源码分析

2020-11-27  本文已影响0人  想起个帅气的头像

本篇主要介绍ReentrantLock 中 condition的await/signal方法的实现原理。

想忽略整个分析过程可以直接跳到结尾看总结。

使用说明

public void foo() throws InterruptedException {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        reentrantLock.lock();
            condition.await();
            //....
            condition.signal();
        reentrantLock.unlock();
    }

当前线程在获取到锁后,通过await来让自己进入park阻塞状态、加入等待队列,并释放锁。
signal方法将其他在等待队列中,处于park状态下的线程唤醒,并尝试竞争锁。

源码分析

await() #1

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 第一部分
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

await方法的代码比较多,可以拆分成两部分。第一部分如何让当前线程park。第二部分是线程被unpark后的实现。

第一部分:

addConditionWaiter()

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

主要目的是将线程构建成Conditon模式下的Node,加入到队列中。
首先,队列为空,firstWaiter和lastWaiter都为null。当第一个node创建成功后,firstWaiter和lastWaiter都指向这个node。后续再来节点,则让node.next 指向新节点,lastWaiter也指向新节点。如此构建一个带有头尾指针的单向链表。

再看方法里第二行的if判断,因为进入到condition队列的node一定都是condition(-2)状态,如果不是,则说明当前node所属线程已经处理了其他的逻辑。一般是cancel状态。此时要从链表中去掉cancel态的节点。

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

unlinkCancelledWaiters

/**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

总而言之做了一件事,将非condition状态的node从链表中去掉。此时lastWaiter一定是condition状态,赋值给t。

fullyRelease()

/**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

savedState表示的是重入的次数,可能1次,也可能多次,这里一次性全部释放掉,将全局的state=0,exclusiveOwnerThread=null。并且通过unparkSuccessor获取同步队列中的下一个node。具体过程已经在ReentrantLock源码分析中做了说明。
简而言之就是当前线程释放锁,让同步队列的下一个node开始抢占。

isOnSyncQueue()

/**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此时判断node是不是已经在sync队列中,判断的标准是waitStatus、prev和next,以及从tail倒序查找。
这里关于倒序查询有很大一段注释,大意是说单纯判断node.prev是not null,并不能代表在node已经在sync队列中。需要从sync队列中的tail倒序查询,并且说明了node大概率在tail附近,不会有太多性能损耗。

cas在替换prev时可能失败,也就是我下面贴的入队的方法实现。因为prev是volatile的,会直接可见,但是compareAndSetTail可能会失败,从而导致没有成功入队。


如果node并没有在sync队列中,则被park。

      while (!isOnSyncQueue(node)) {
          LockSupport.park(this);
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }

至此第一部分说明完成,当前占有锁的线程被添加到了condition queue中,释放锁被处于park状态。

第二部分:

既然线程已经被park了,就先说明是如何被unpark的。一般来说我们都是配置signal(signalAll)一起使用。先分析下signal().

signal()

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

如果condition队列里有node,则开始唤醒。

doSignal()

 /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

如果first.nextWaiter是null,则说明condition队列中只有这个node,firstWaiter、lastWaiter、nextWaiter都是null。
如果后续还有节点,将nextWaiter指向firstWaiter,并断开first.nextWaiter。
重点看下transferForSignal。

transferForSignal()

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

顾名思义,这个方法的目的就是将node从condition队列转到sync队列。
转移前的状态如果不是condition,说明是cancel,就不再执行。成功则继续向后执行,此时当前node的waitState=0
将node节点enq到sync队列中,返回前一个node。
如果前一个node已经被取消,或者在cas成signal的过程中失败(也就是可能在设置过程中cancel),那就通过unpark将当前节点唤醒(相当于被提前唤醒)。

此时,当前线程完成signal方法的调用,如果调用了unpark,则这个线程也被唤醒。两个线程同时在执行。

doSignalAll()

/**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

这个all表示把condition队列中的所有node全部transfer到sync队列。

至此,signal(signalAll)执行完成,transfer或者unpark condition队列中的node。

await() #2

无论以什么样的方式唤醒,await内的park线程终究还是会被唤醒,继续向后执行。

      public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 第一部分
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

先检查在等待过程中是否中断过,如果是,看中断时机。
在signal信号前被中断返回THROW_IE,已经在sync队列中返回REINTERRUPT。

/**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

      /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //如果收到了中断信号,且当前node还在condition队列中,则入队到sync队列。
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

acquireQueued()

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

中断标记记录后,尝试获取锁,如果没有达到条件,则再次进入park状态。

            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

再次被唤醒或者抢占到锁后,清理一波cancel的condition队列。根据不同的中断标记向上抛出异常或者返回中断标记。

至此,await() 方法也执行完成。

总结

condition的各种await、signal的处理结合了lock和unlock的状态。内部的很多操作都是需要在获得锁的状态下执行。这也就是为什么await、signal需要写到lock和unlock块中。

这四个方法需要整体看。

重点说明

ReentrantLock 内部分为了两个队列(sync和condition), 两种模式(EXCLUSIVE、SHARED),五种状态(SINGAL, CONDITION, CANCELLED, PROPAGATE, 0)

sync 队列是带有头尾指针的双向链表,节点字段是

    private transient volatile Node head;
    private transient volatile Node tail;
    volatile Node prev;
    volatile Node next;

condition队列是带有头尾指针的单链表,节点字段是

        private transient Node firstWaiter;
        private transient Node lastWaiter;
        Node nextWaiter;

lock()方法本质是将未获得锁的node加入到sync队列
unlock方法本质是将sync队列的node依次唤醒执行。
await()方法是将node加入到condition队列中。
signal()方法是将condition队列中的head node(signalAll是全部node)从condition转到sync队列。

上一篇下一篇

猜你喜欢

热点阅读