Condition源码解读

2020-05-05  本文已影响0人  7d972d5e05e8

Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的这两个方法,需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁。只不过Object的锁是synchronized,而Condition是Lock锁。这里先声明两个概念:

  1. 等待队列:Condition上用来等待这个条件的队列,它是个单向队列
  2. 同步队列:就是AQS等待拿锁的CLH队列,它是个双向队列。
    下文涉及这两个概念,一定要注意区分。

一、解读Condition的await方法

当然是先上源码:

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

1.1 第一步是addConditionWaiter方法。

这个方法是为了构造等待Condition条件的等待队列。这个队列是单向链表,有别于等待Lock锁的同步队列(CLH队列,它是双向队列)。 这里一定要区别好这两个队列,虽然队列的元素都是Node实现的,但是两者的用途完全不一样。好了,我们看下这个方法:

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

其实这个方法很简单,看下最后的等待元素node是否已经取消了。如果取消了,那么就从队列里面清除。这里有个疑问:
1. 为啥要判断最后一个node,而不是第一个,或第二个,第三个等等?难道最后一个元素,它特别香吗
2. 为啥最后一个node取消了,unlinkCancelledWaiters要遍历所有等待队列,清除所有已取消元素?

1.2 第二步是fullyRelease方法

这个方法其实就是模拟Object.wait方法,用来释放锁的。如果重入过,要把它释放干净。所以看名字fullyRelease,起的挺好的很直观。源码我们也来看一下:

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease是通过getState,然后全部release掉。

1.3 第三步是isOnSyncQueue方法

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

这个方法,乍一看莫名其妙。刚刚上面不是说了,node是单向队列吗,怎么用node.prev来判空。WT?单向队列的prev不是必空吗。其实这里的Node节点,已经被偷换了。 这个方法的Node,如果是第一次调用,那么确实还是Condition等待队列上node,它是单向队列,且它的waitStqatus = -2。这个方法在第一行的判断逻辑一定是true的。但是第2次,第3次调用,这个Node就不是等待队列的Node的,而是同步队列的Node,它是双向的,它有prev和next指针。具体是怎么悄悄就被转换了,第二节的signal方法会解释清楚的。
咱们继续看,循环调用isOnSyncQueue方法,是怎么回事?

      while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

第一次调用,一定会进入进来(上面已经解释了,不在赘述)。这里的park会阻塞自己,等待其他线程的唤醒。现在假如其他线程唤醒了该线程,它首先检查自己在阻塞期间,是否被中断过。如果没有中断过,继续执行isOnSyncQueue方法。上面也说了,第二次调用的时候,这个Node元素就变成了同步队列元素。再回头先下isOnSyncQueue方法,node.waitStatus一定不再等于-2了,且node.prev必不为null。其实这里为啥要判断node.prev == null 我也还没搞懂。然后第二个判断条件,node.next != null,大概率不为空。
总结下:这个方法主要作用就是用来判断该node节点,没有加入到同步队列。为啥要判断有没有加入到同步队列呢?因为await的时候,你先释放了锁,现在唤醒了,你不是要重新获取锁嘛?和Object的wait效果一致。学习了!

1.4 第四步是acquireQueued方法

这个方法,其实就是被唤醒后,用来获取锁。其和Lock的流程就一模一样了,可以看以前的文章,有讲过,不在赘述。有个细节acquireQueued(node, savedState)的savedState,是之前fullyRealese留下来的,这里要全部拿回。释放了我多少,我就要拿回来多少。拿锁那一套,走Lock流程。

该方法后面又来一次unlinkCancelledWaiters,咋这么喜欢清理等待队列呢?

二、解读Condition的signal方法

上源码,不上源码我怎么讲。

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

里面就一个核心方法doSignal。

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

里面也是一个核心方法transferForSignal

 final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

这个方法的作用就是,把等待队列的Node元素,转换为同步队列的node元素。然后修改node的waitStatus 由 -2 变为0。
compareAndSetWaitStatus(node, Node.CONDITION, 0) 这个动作就要要告诉node节点,你已经等到condition啦,你可以安全的去竞争锁了。然后就把node 执行enq方法。这个enq方法是不是很熟悉,就是把node加入到CLH锁的同步队列里面。所以这里就是转换的关键,也是上文提到的Node在第一次和非第一次执行时的不同。转换结束后,看下p节点(p是node的前置节点)。如果p节点已经取消了,或者p节点修改失败,就唤醒当前node节点的线程。搞不懂为啥要这么做,还没想明白?因为同步队列里面,可能还有有其他很多很多线程等待着锁呢,这里仅仅根据node的前置节点状态去判断,就要唤醒node线程,唤醒了又能怎么样呢?醒了后,发现自己不是head的next节点,还是要继续阻塞等待,还不如不唤醒。

总结:Condition所有的操作,就是为了实现Object的wait和notify的功能,实现线程间更优雅的通信。

吃饭去了。。。

上一篇下一篇

猜你喜欢

热点阅读