ScheduledThreadPoolExecutor源码剖析

2022-12-04  本文已影响0人  王侦

1.字段与构造方法

    /**
     * 线程池状态成为 shutdown 时,线程从任务队列内获取到 “周期执行任务” 时,是否进行执行。
     * 默认值 false,不执行。
     * 如果设置为true,线程读取出来 周期执行任务 时 ,还会执行一次..
     *
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * 线程池状态成为 shutdown 时,线程从任务队列内获取到 “延迟任务” 时,是否进行执行。
     * 默认值 true,执行。
     *
     * False if should cancel non-periodic tasks on shutdown.
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;


    /**
     * 任务被取消时 是否 需要从“任务队列”内移除,默认false,不移除,等到线程拿到任务之后 抛弃..
     * 设置为 true 则 取消任务时,就主动的从 任务队列 内移除走了..
     *
     * True if ScheduledFutureTask.cancel should remove from queue
     */
    private volatile boolean removeOnCancel = false;

    /**
     * 生成任务序列号的一个字段
     * 为什么需要序列号呢?因为调度线程池 它 有独特的任务队列,该任务队列 是 优先级任务队列,
     * 优先级任务队列 内的 任务 都是需要实现 Compare 接口的,如果两个 任务 他们 的 compare 比较之后
     * 得到的 结果 是 0, 说明 这俩任务的 优先级一致,此时 就需要 再根据 序列号 进行 对比,看到底谁优先。
     *
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private static final AtomicLong sequencer = new AtomicLong();
     /* corePoolSize 核心线程数
     * threadFactory 线程工厂
     * handler 拒绝策略
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        // super -> ThreadPoolExecutor 构造方法
        // 参数2:maximumPoolSize,传的 int 最大值,非核心线程 不受限制
        // 参数3:keepAliveTime,0 说明非核心线程 一旦空闲 就立马销毁去..(线程是不分 核心 和 非核心的,只不过线程池会维护 一个 corePoolSize 内的线程数量)
        // 参数5:new DelayedWorkQueue() ,创建了一个 任务队列 ,这是一个 延迟任务队列,它是一个 优先级 任务队列,在优先级 基础之上提供了 hold 线程的事情。
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);

    }

ScheduledThreadPoolExecutor的主要特点:

2.提交任务的接口

2.1 提交延迟任务

    /**
     * 提交的任务被称为“延迟任务”,不是周期执行的任务
     * 延迟多久呢? delay 参数 和 unit 参数控制。
     * 注意:只执行一次
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

2.2 按照固定时间间隔执行的周期性任务

    /**
     * 提交的任务是“周期任务”,什么是周期任务?按照一个周期去反复执行的任务。
     * initialDelay:任务初次执行时的延迟时间。
     * period:周期时长
     *
     * 特点:该接口提交的任务,它不考虑执行的耗时。
     * 举个例子:有一个任务 10秒钟 执行一次,假设它每次执行耗时2秒。在 03:00:00 执行了一次,耗时了2秒,那它下一次的执行时间节点是 03:00:10.
     * 如果它执行耗时超过了 10 秒,那它下一次的执行时间 就是 立马执行。
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

2.3 按照固定延迟时间执行的周期性任务

    /**
     * 提交的任务是“周期任务”,什么是周期任务?按照一个周期去反复执行的任务。
     * initialDelay:任务初次执行时的延迟时间。
     * period:周期时长
     *
     * 特点:它考虑到执行耗时。
     * 例子:有一个任务 每10秒执行一次,假设它本次执行的时间点是 03:00:00 ,耗时了5秒钟,那它下一次的执行时间点 03:00:05 + 10s => 03:00:15 这个时间点。
     *
     * 任务执行【等待10s】任务执行【等待10s】任务执行【等待10s】...
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

3.ScheduledFutureTask

3.1 字段

        // 序列号,用于对比任务优先级,当任务的 time 字段 比较不出来 优先级时,就使用该字段进行比较。
        // 该字段一定不会一致。
        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;


        // 任务下一次执行的时间节点,交付时间。当任务执行结束之后,会再次将任务加入到 queue,加入到queue之前
        // 会修改该字段 为 下一次 执行的时间节点。
        /** The time the task is enabled to execute in nanoTime units */
        private long time;



        // 周期时间长度..
        // 1. period > 0 的情况,说明当前任务 是 由 scheduleAtFixedRate(..)接口提交的任务,该任务它不考虑执行耗时。
        // 2. period < 0 的情况,说明当前任务 是 由 scheduleWithFixedDelay(..)接口提交的任务,该任务考虑执行耗时,下次执行节点为 本次结束时间 + period 时间。
        // 3. perod == 0 的情况,说明当前任务 是 由 schedule(..)接口提交的任务,该任务是一个延迟执行的任务,只执行一次
        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
         */
        private final long period;


        // 指向自身..后面看源码再说..
        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;



        // 因为DelayedWorkQueue 底层使用的数据结构是 堆(最小堆),记录当前任务 在 堆 中的索引。
        // 其实就是 数组中的索引,堆是一个满二叉树,满二叉树 一般是使用 数组表示,看过 netty 内存管理的同学 应该清楚 满二叉树 数组表示..
        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;
        /**
         * 周期任务执行完成之后,再次提交到 DelayedWorkQueue 之前调用的。
         * 设置下一次 任务的执行时间节点。
         *
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            // p 表示 period
            long p = period;

            if (p > 0)
                // 该条件成立,说明 任务 是由 scheduleAtFixedRate(..)接口提交的任务,该任务它不考虑执行耗时。
                // 直接拿 上一次 任务的执行节点 + 执行周期 作为下一次 执行节点。
                time += p;
            else
                // 该条件成立,说明 任务 是由 scheduleWithFixedDelay(..)接口提交的任务,该任务考虑执行耗时 (period < 0 , -period 就是正数了 )
                // 拿系统当前时间 + 执行周期,作为下一次执行 时间节点。
                time = triggerTime(-p);
        }

        /**
         * 取消任务
         * @param mayInterruptIfRunning (当任务运行中时,是否需要先中断它..true 会先去中断任务,false 则直接设置任务状态为 取消状态)
         * @return cancelled bool值,表示是否取消成功
         */
        public boolean cancel(boolean mayInterruptIfRunning) {
            // super -> FutureTask 父类,cancelled 表示任务是否取消成功
            // 取消失败有哪些情况?1. 任务已经执行完成  2. 任务出现异常 3.任务被其它线程取消..
            boolean cancelled = super.cancel(mayInterruptIfRunning);

            // 条件1:cancelled ...
            // 条件2:removeOnCancel  设置为 true 则 取消任务时,就主动的从 任务队列 内移除走了..
            // 条件3:heapIndex > = 0 ,条件成立,说明 任务仍然在 堆 结构中,从 堆 结构中移除的任务 它的 heapIndex 会先设置为 -1
            if (cancelled && removeOnCancel && heapIndex >= 0)
                // remove 是线程池的方法,最终触发的是 延迟任务队列(DelayedWorkQueue.remove(..) 方法)
                remove(this);
            return cancelled;
        }
        /**
         * 线程池中的线程 从 queue 内获取到 task 之后,调用 task.run() 方法 执行任务逻辑。
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            // periodic 表示当前任务是否为 周期执行 的任务
            boolean periodic = isPeriodic();

            // 1. periodic == true 时,当前任务是周期任务,并且 continue...参数值为 true,则会执行当前任务,否则 continue.. 值为 false,则不执行周期任务。
            // 2. periodic == false 时,当前任务是 延迟任务,并且 executeExistingDelayedTasksAfterShutdown 参数值为 true(默认),则执行当前 延迟任务,
            // 否则 executeExistingDelayedTasksAfterShutdown 参数值为 false,则不执行 从queue内取出的 延迟任务。
            if (!canRunInCurrentRunState(periodic))
                // 取消任务
                cancel(false);


            else if (!periodic)// 条件成立,说明任务是 延迟任务,不是 周期任务..
                // 调用父类的run方法,即FutureTask.run() 逻辑,普通线程池没啥区别
                ScheduledFutureTask.super.run();


            // 执行到 else if 条件,就说明当前这个任务 肯定 是一个 “周期执行的任务”,此时调用 父类的 runAndReset() 方法去执行 任务逻辑。
            // 该方法一般情况下 返回 true
            else if (ScheduledFutureTask.super.runAndReset()) {
                // 上面 ScheduledFutureTask.super.runAndReset() 执行,代表 任务 周期性 执行了 一次,执行完一次之后 该干什么呢?
                // 1. 设置当前任务的 下一次执行时间
                // 2. 再加入到 延迟任务队列
                setNextRunTime();
                reExecutePeriodic(outerTask);

            }

        }

上面FutureTask#run和runAndReset区别是,runAndReset没有设置结果的步骤,因为是周期执行。

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            // 将任务再次加入到 任务队列
            super.getQueue().add(task);
            // 下面if 条件内的逻辑,是说 如果提交完任务之后,线程池状态变为了 shutdown 状态,并且 continue...参数值是 false的话,直接
            // 将上一步 提交的任务 从 队列 移除走
            if (!canRunInCurrentRunState(true) && remove(task))
                // 将任务设置为 取消状态
                task.cancel(false);
            else
                // 正常走这里..确保提交任务之后,线程池内有线程干活。
                ensurePrestart();
        }
    }

4.DelayedWorkQueue

使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue,其实现原理和DelayQueue基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。

最小堆:

假设“第一个元素” 在数组中的索引为 0 的话,则父结点和子结点的位置关系如下:

为什么要使用DelayedWorkQueue呢?

    /**
     * 先讲一个数据结构 “堆”,堆分为 最大堆 和 最小堆,堆这个结构是使用 满二叉树 表示的。
     * 满二叉树结构在程序内一般是使用 数组表示的,下面这个最小堆 使用数组表示:[1,3,5,6,7,8,9]
     * 公式:
     * 1. 查询父节点:floor((i-1) / 2)
     * 2. 查询左子节点: left = i * 2 + 1
     * 3. 查询右子节点: right = i * 2 + 2
     *
     * 最小堆:
     *           1 --- 索引 0
     *         /  \
     *索引1----3    5  ----索引 2
     *       / \  / \
     *      6  7 8   9
     *
     * 最大堆:
     *
     *          10
     *         /  \
     *        8    9
     *       / \  / \
     *      4  5 6   7
     *
     * 延迟队列采用的堆 是 最小堆,言外之意 最顶层的 元素 是优先级最高的 任务。
     *
     * 最小堆 插入元素的流程,咱们向下面这颗 最小堆 插入 元素 2 的流程:
     *
     *
     *           1                  |               1                   |               1
     *         /  \                 |              /  \                 |              /  \
     *        3    5               ->             3    5               ->             2    5
     *       / \  / \               |            / \  / \               |            / \  / \
     *      6  7 8   9              |           2  7 8   9              |           3   7 8  9
     *     /                                   /                        |          /
     *    2                                   6                         |         6
     *
     * 规则:
     * 1. 将新元素 放入数组的最后一个位置,[1,3,5,6,7,8,9] -> [1,3,5,6,7,8,9, “2”]
     * 2. 新元素 执行向上冒泡的逻辑,去和它的父节点 比较大小,(最小堆)父节点 值 大于 子节点,则交换它俩位置,此时相当于 向上冒泡了一层,然后持续这个过程
     * 直到 新元素 成为 顶层节点 或者 碰到 某个 父节点 优先级 比他高,则停止。
     *
     *
     * 最小堆 删除顶层元素的流程,假设删除 节点 1:
     *              1               |           9           |           3           |           3
     *             / \              |          / \          |          / \          |          / \
     *            3   5            ->         3   5        ->         9   5        ->         6   5
     *           / \  / \           |        / \  /         |        / \  /         |        / \  /
     *          6   7 8  9          |       6   7 8         |       6   7 8         |       9  7  8
     *
     *
     * 规则:
     * 1. 将数组最后一个节点 覆盖 数组[0] 这个位置,将数组最后一个节点 设置为null,反映到树上 就是 将最后一个 节点 提升到 树顶层位置。
     * 2. 上一步 提升的 这个节点,执行 向下 冒泡的逻辑。怎么向下冒泡?获取左右子节点 中 优先级 高的节点,去和优先级高的子节点比较 看 到底是 向下 冒泡节点的优先级高
     *  还是 子节点优先级高,如果子节点 优先级高,则交换位置,反映到图上 就是向下 坠落了一层。
    */

4.1 字段

leader线程的设计,是Leader-Follower模式的变种,旨在于为了不必要的时间等待。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。

        // 初始化数组时的大小 16
        private static final int INITIAL_CAPACITY = 16;

        // 这个数组就是 满二叉树 的数组表示,存储提交的 优先级任务
        // 这里其实存储的就是 ScheduledFutureTask 实例
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];


        // 可重入锁,添加任务 和 删除任务的时候 都需要加锁操作,确保 满二叉树 线程安全
        private final ReentrantLock lock = new ReentrantLock();

        // 表示延迟队列内存储的任务数量
        private int size = 0;

        /**
         * leader字段? 一种策略
         * 线程池内的某个线程 去 take() 获取任务时,如果 满二叉树 顶层节点不为null(队列内有任务),但是顶层节点 这个 任务 它还不到 交付时间,
         * 这个怎么办?线程就去检查 leader 字段是否被占用,如果未被占用,则当前线程占用该字段,将线程自己 保存 到 leader字段
         * 再下一步,当前线程 到 available 条件队列 指定超时时间(堆顶任务.time - now()) 的挂起。
         *
         * 如果 满二叉树顶层节点不为null(队列内有任务),但是顶层节点 这个 任务 它还不到 交付时间,
         * 线程检查 leader 字段是否被占用,如果被占用了,则当前线程怎么办?直接到available 条件队列 “不指定”超时时间的 挂起。
         *
         * 不指定超时时间挂起,那什么时候会被唤醒呢?..
         * leader 挂起时指定了 超时时间,其实 leader 在 available 条件队列 内 它是 首元素,它超时之后,会醒过来,
         * 然后再次将 堆顶元素 获取走..
         * 获取走之后,take()结束之前,会释放 lock,释放lock前 会做一件事,就是 available.signal() 唤醒下一个 条件队列 内的 等待者
         * 下一个等待者 收到信号之后,去哪了 ? 去到 AQS 队列了,做 acquireQueue(node) 逻辑去了。
         */
        private Thread leader = null;

        /**
         * 条件队列,很关键的一个东西..
         *
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         */
        private final Condition available = lock.newCondition();

4.2 添加任务

        /**
         * 添加任务的入口
         * @param x 任务(ScheduledFutureTask 实例)
         */
        public boolean offer(Runnable x) {

            if (x == null)
                throw new NullPointerException();

            // 转换成RunnableScheduledFuture类型,e表示
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;

            // 加锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            // 内部的代码 肯定是 串行的!
            try {
                // i,当前任务队列 任务数量
                int i = size;

                if (i >= queue.length)// 条件成立,说明 任务数组 所有的slot都已经占用完了,需要扩容了..
                    // 扩容..
                    grow();

                // size + 1,因为接下来就是存储任务 到 最小堆 的逻辑了,注意:i 还是原来的 size值。
                size = i + 1;


                if (i == 0) {// 条件成立,说明 当前 任务 是最小堆 的第一个节点,将它存放到 数组[0] 的这个位置,即可,不需要执行 向上冒泡的逻辑。
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    // 执行到这里,说明 最小堆 存在其它节点,新节点 需要执行向上冒泡的逻辑..

                    // i : 新插元素的下标
                    // e : 新提交的任务
                    siftUp(i, e);
                }



                if (queue[0] == e) {
                    // 两种情况:
                    // 1. 当前任务是 第一个加入到 queue 内的任务
                    // 2. 当前任务不是第一个加入到 queue 内的任务,但是当前任务 优先级 高啊,它上升成为了 堆顶 节点。

                    // 1. 当前任务是 第一个加入到 queue 内的任务
                    // 第一种情况:在当前任务 加入 到 queue 之前,take()线程会直接到 available 不设置超时的挂起。并不会去占用 leader 字段。
                    //  available.signal() 时,会唤醒一个线程 让它去消费...

                    // 2. 当前任务不是第一个加入到 queue 内的任务,但是当前任务 优先级 高啊,它上升成为了 堆顶 节点。
                    // 这种情况下,leader 可能是 有值的,因为原 堆顶 任务 可能还未到 交付时间,leader线程正在 设置超时的 在 available 挂起呢..
                    // 此时 需要将 leader 线程 唤醒,唤醒之后 它 会检查 堆顶,如果堆顶任务 可以被消费,则直接获取走了..否则 继续 成为 leader 等待 新堆顶..

                    leader = null;
                    available.signal();
                }

            } finally {
                // 释放锁
                lock.unlock();
            }
            // 返回true 表示添加任务成功..
            return true;
        }





        public void put(Runnable e) {
            offer(e);
        }

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
            return offer(e);
        }

扩容是1.5倍扩容:

        /**
         * 数组扩容的方法,每次扩容1.5倍
         * Resizes the heap array.  Call only when holding lock.
         */
        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

        / * k: 新插元素的下标
         * key: 新插元素
         */
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            // while 条件:k > 0, k 其实表示的是 新插元素的最终存储的下标,
            // 隐含结束循环的条件是 “k == 0”,因为最顶层元素的下标是0,到0之后没办法再冒泡了..
            while (k > 0) {
                // 获取父节点 下标
                int parent = (k - 1) >>> 1;
                // 获取父节点 任务对象
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    // 条件成立:说明 父节点 优先级 是高于 key 任务的,停止向上冒泡的逻辑。
                    break;

                // 执行到这里,说明 父节点 的优先级 是 低于 key 任务的,需要执行 冒泡逻辑..

                // 父节点 占用 当前key 对象的 位置 即可,即下坠一层。
                queue[k] = e;
                // 设置heapIndex
                setIndex(e, k);
                // k 赋值为 原父节点的 下标位置
                k = parent;
            }
            // 结束循环之后,说明当前key任务 找到 它的应许之地了
            // k 这个下标表示的位置,就是 key 对象的应许之地
            queue[k] = key;
            // 设置任务的heapIndex
            setIndex(key, k);
        }

4.3 取走任务

        // 线程池 线程数在 核心数量以内时(<= corePoolSize) 的时候,线程获取任务使用的 方法 是 take() 方法
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 获取锁,保证 队列线程安全
            lock.lockInterruptibly();
            try {
                // 自旋,从该循环返回时,一定是获取到 任务了,或者 线程收到了 中断异常..
                for (;;) {
                    // 获取堆顶任务(优先级最高的任务)
                    RunnableScheduledFuture<?> first = queue[0];

                    if (first == null)
                        // 该条件成立,说明队列是空队列,没办法,线程只能在 条件队列 available 不设置超时的挂起..
                        // 什么时候会唤醒呢? offer时 添加第一个任务时,会signal..
                        available.await();
                    else {
                        // 执行到该分支,说明 任务队列 内存在其它任务

                        // 获取堆顶任务 距离 执行的 时间长度 ,纳秒
                        long delay = first.getDelay(NANOSECONDS);

                        if (delay <= 0)// 该条件成立,说明 堆顶任务 已经到达 交付时间了,需要立马返回 给 线程 去执行了..

                            return finishPoll(first);



                        // 执行到这里,说明什么?
                        // 说明堆顶任务 还未到 交付时间..


                        first = null; // don't retain ref while waiting

                        if (leader != null)
                            // 当前线程不设置 超时时间的挂起..
                            // 不用担心 醒不来的事,leader 它醒来之后,会获取 堆顶元素,返回之前 会 唤醒下一个 等待者线程的..
                            available.await();
                        else {
                            // 因为持锁线程是 自己,没人能给你 抢 leader,所以 这里没有使用CAS

                            // 获取当前线程
                            Thread thisThread = Thread.currentThread();
                            // leader 设置为当前线程
                            leader = thisThread;
                            try {
                                // 在条件队列available使用 带超时的挂起 (堆顶任务.time - now() 纳秒值..)
                                // 当 到达 阻塞时间时,当前线程会从这里醒过来,或者 offer 了 优先级更高 的任务时,offer线程也会唤醒你..
                                // (能从这里醒来,那肯定已经拿到lock了..)
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    // 释放leader字段
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    // 队列内还有其它 任务,此时 唤醒 条件队列内 下一个 等待者,让它去尝试 获取 新的 堆顶节点。
                    available.signal();

                // 释放锁..
                lock.unlock();
            }
        }
        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            // --size ,堆顶任务 肯定是要返回走了,所以 减就行了。
            // 将 减1之后的 size 赋值给 s,s表示 堆结构 最后一个节点的下标
            int s = --size;

            // 获取 堆结构 最后一个节点
            RunnableScheduledFuture<?> x = queue[s];

            // 将堆结构 最后一个节点占用的slot设置为null,因为 该节点 要升级成 堆顶,然后执行 下坠的逻辑了..
            queue[s] = null;


            if (s != 0)// s == 0 说明 当前 堆结构 只有堆顶一个节点,此时不需要做任何的事情..
                // 执行if代码块内,说明 堆结构 的元素数量是 > 1 的。这个时候就需要执行 堆最后节点 下坠的逻辑了..
                // k: 堆最后节点 当前开始下坠的位置
                // x: 堆最后节点   (在这个时候,它已经认为自己是 堆顶了..)
                siftDown(0, x);


            // f 原堆顶,将它的 heapIndex 设置为 -1 ,表示从 堆内出去了..
            setIndex(f, -1);

            // 返回原堆顶任务..
            return f;
        }

        // k: 堆最后节点 当前开始下坠的位置
        // key: 堆最后节点   (在这个时候,它已经认为自己是 堆顶了..)
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            // 记住:size 在来到 siftDown 之前 已经-1过了.. k 值,最终要成为 key 对象 应该存储的 slot 下标.
            int half = size >>> 1;

            // half 干什么的呢?做为  while 循环条件的一个判断了..
            // half 表示满二叉树 最底层 ,最左侧的节点 下标值,当 k >= half 时,说明 key 对象已经下落到 最底层了,再往下 没法再下坠了..
            while (k < half) {
                // child 最终表示 key 的左右子节点 中 优先级 高的 子节点 下标
                // child 的初始值 是 key 的左子节点的 下标
                int child = (k << 1) + 1;

                // 获取左子节点任务对象
                RunnableScheduledFuture<?> c = queue[child];

                // 获取key的右子节点 下标
                int right = child + 1;

                // 条件1:right < size,成立 说明 key 存在右子节点
                // 条件2:(条件1成立,key在该层时,存在右子节点),c key的左子节点 比较 右子节点
                if (right < size && c.compareTo(queue[right]) > 0)
                    // 将优先级高的子节点 赋值给 c,优先级高的子节点的下标 赋值给 child
                    c = queue[child = right];


                if (key.compareTo(c) <= 0)
                    // 条件成立,说明 key 的优先级 是高于 下层子节点的,说明已经下坠到 该到的层级了..不再继续下坠了..break 跳出循环。
                    break;


                // 执行到这里,说明 key 优先级 是小于 下层子节点的 某个节点的。
                // 与子节点 交换位置..对应就是 下坠一层..(对应c节点 向上升一层)
                queue[k] = c;
                // 设置 c的heapIndex..
                setIndex(c, k);
                // key 下坠一层,占用child 下标..
                k = child;
            }

            // 执行完 while 循环 key对象 肯定已经找到它 自己的位置了..
            // 进入自己的坐席..
            queue[k] = key;
            // 设置好自己的 heapIndex
            setIndex(key, k);
        }

5.DelayedWorkQueue与DelayQueue、PriorityBlocking区别

5.1 DelayQueue、PriorityBlocking区别

DelayQueue 与 PriorityBlockingQueue 有相似之处,它们都具有优先级队列的特性,因为它们底层都使用了二叉树数组实现。但 DelayQueue 比起 PriorityBlockingQueue 还多了一个延迟属性,可以设置延迟到某个时间再出列。

需要实现Delayed接口同时需要实现getDelay方法和compareTo方法,getDelay方法用于计算出队列时间,一旦小于0就会出队列;compareTo方法用于按触发时间从小到大排序。

对于 DelayQueue 的实现,其实也并不复杂。其直接使用了 PriorityBlockingQueue 来实现数据的维护,使用 ReentrantLock 实现并发控制。因为其使用 PriorityBlockingQueue 实现,而本质上 PriorityBlockingQueue 是有最大值限制的,所以其并不是无界队列,而是有界队列。

在 DelayQueue 的具体实现上,其使用了 leader 属性保存了第一个等待获取元素的线程,从而避免了过多线程进行 CPU 自旋浪费资源。此外,还使用了 awaitNano 方法最大限度地避免 CPU 无效空转,这些都是非常好的设计思路。

5.2 DelayedWorkQueue与DelayQueue区别

DelayedWorkQueue 类其实一个特殊的 DelayQueue 类,其唯一的不同是:DelayQueue 内部用 PriorityQueue 来维护元素的二叉树结构。而 DelayedWorkQueue 因为是专为线程池设计,所以其内部用 RunnableScheduledFuture 数组重新实现了一遍二叉树结构。

DelayedWorkQueue是一个无界队列,在队列元素满了以后会自动扩容,它并没有像DelayQueue那样,将队列操作委托给PriorityQueue,而是自己重新实现了一遍堆的核心操作——上浮、下沉。

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        /*
         * A DelayedWorkQueue is based on a heap-based data structure
         * like those in DelayQueue and PriorityQueue, except that
         * every ScheduledFutureTask also records its index into the
         * heap array. This eliminates the need to find a task upon
         * cancellation, greatly speeding up removal (down from O(n)
         * to O(log n)), and reducing garbage retention that would
         * otherwise occur by waiting for the element to rise to top
         * before clearing. But because the queue may also hold
         * RunnableScheduledFutures that are not ScheduledFutureTasks,
         * we are not guaranteed to have such indices available, in
         * which case we fall back to linear search. (We expect that
         * most tasks will not be decorated, and that the faster cases
         * will be much more common.)
         *
         * All heap operations must record index changes -- mainly
         * within siftUp and siftDown. Upon removal, a task's
         * heapIndex is set to -1. Note that ScheduledFutureTasks can
         * appear at most once in the queue (this need not be true for
         * other kinds of tasks or work queues), so are uniquely
         * identified by heapIndex.
         */

DelayedWorkQueue类似于DelayQueue和PriorityQueue,是基于“堆”的一种数据结构。区别就在于ScheduledFutureTask记录了它在堆数组中的索引,这个索引的好处就在于:取消任务时不再需要从数组中查找任务,极大的加速了remove操作,时间复杂度从O(n)降低到了O(log n),同时不用等到元素上升至堆顶再清除从而降低了垃圾残留时间。但是由于DelayedWorkQueue持有的是RunnableScheduledFuture接口引用而不是ScheduledFutureTask的引用,所以不能保证索引可用,不可用时将会降级到线性查找算法(我们预测大多数任务不会被包装修饰,因此速度更快的情况更为常见)。

所有的堆操作必须记录索引的变化 ————主要集中在siftUp和siftDown两个方法中。一个任务删除后他的headIndex会被置为-1。注意每个ScheduledFutureTask在队列中最多出现一次(对于其他类型的任务或者队列不一定只出现一次),所以可以通过heapIndex进行唯一标识。

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 因为在heapIndex中存储了索引
        // indexOf的时间复杂度从线性搜索的O(n)
        // 降低到了常量O(1)
        int i = indexOf(x);
        if (i < 0)
            return false;

        // heapIndex标记为-1,表示已删除
        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        // siftUp和siftDown操作完全二叉树时间复杂度为O(log n)
        // 综合前面的O(1)+O(log n) ==> O(log n)
        if (s != i) {
            siftDown(i, replacement);
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
private int indexOf(Object x) {
    if (x != null) {
        if (x instanceof ScheduledFutureTask) {
            // 如果是ScheduledFutureTask,可用heapIndex直接索引
            int i = ((ScheduledFutureTask) x).heapIndex;
            if (i >= 0 && i < size && queue[i] == x)
                return i;
        } else {
            // 否则使用线性查找
            for (int i = 0; i < size; i++)
                if (x.equals(queue[i]))
                    return i;
        }
    }
    return -1;
}
上一篇下一篇

猜你喜欢

热点阅读