JUC并发包

并发包同步器的核心AQS-深入-条件队列

2021-04-06  本文已影响0人  于情于你

    ConditionObject主要是为并发编程中的同步提供了等待通知的实现方式,可以在不满足某个条件的时候挂起线程等待(await)。直到满足某个条件的时候在唤醒线程(signal/signalAll)。


结构

API层

将当前线程的锁释放,并加入条件等待队列

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
           // 用当前线程创建一个新节点,并加入等待队列
            Node node = addConditionWaiter();
          // 释放锁,并唤醒后面节点的线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
           // 当前节点不在同步队列的时候挂起当前线程
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);

                 // 线程被唤醒,是否被中断了,如果没中断了退出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 尝试获取锁,获取锁成功interruptMode改为REINTERRUPT
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清理状态是取消的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 根据interruptMode响应
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

REINTERRUPT,退出等待时再次中断。THROW_IE,退出等待时抛出InterruptedException

将等待时间最长的线程,从条件等待队列,加入等待锁的同步队列

  public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

内部方法调用层

往等待队列中添加节点

 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 如果最后一个节点是取消状态,则清空等待队列中所有的被取消的节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 新建一个节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 队列是空,则把当前节点当作firstWaiter,否则加到最后一个节点后面
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
节点加入等待队列

等待队列转移到同步队列

 private void doSignal(Node first) {

            // 从指定节点开始把节点转移到同步队列并删除节点,直到有一个节点转移成功或者等待队列空了
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

 final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        // 把节点的waitStatus更新成0,如果更新失败则退出
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // 将节点加入到同步队列,如果节点被取消操作了,或者更新节点状态失败,那么就唤醒当前节点的线程
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
image.png
上一篇下一篇

猜你喜欢

热点阅读