ScheduledThreadPoolExecutor原理和源码

2019-05-01  本文已影响0人  无聊之园

延迟以及周期性执行线程池。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

堵塞队列为DelayedWorkQueue,这是ScheduledThreadPoolExecutor的内部类。看其grow扩容方法会发现,这个队列最大大小为Integer.MAX_VALUE。所以队列可以放置很多元素。

看堵塞队列的DelayedWorkQueue的offer方法
可以看出offer的元素是RunnableScheduledFuture类型。详细的流程不看,设计堆的调整等操作,目的是保证一个有优先级级的队列,延迟时间短的优先级高,在队列的最前面。

public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            // offer的元素是RunnableScheduledFuture类型
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

看DelayedWorkQueue队列保存的元素类型RunnableScheduledFuture。
RunnableScheduledFuture的唯一一个自己的方法是isPeriodic,判断这个任务时候是周期性任务。
RunnableScheduledFuture的父类是RunnableFuture继承了Runnable接口,所以RunnableScheduledFuture也可以看成一个线程。
还有一个父类是ScheduledFuture,其继承了Delayed接口,Delayed接口唯一的方法就是获取任务的剩余延迟时间,以供延迟队列的延迟获取元素。
所以RunnableScheduledFuture类型元素,即是一个线程,有run方法,也可以判断是否周期性执行,又可以获取任务剩余延迟时间。

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {

    /**
     * Returns {@code true} if this task is periodic. A periodic task may
     * re-run according to some schedule. A non-periodic task can be
     * run only once.
     *
     * @return {@code true} if this task is periodic
     */
    boolean isPeriodic();
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

看堵塞队列的take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        // 获取队列顶部元素的剩余延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 剩余延迟时间小于0,则finishPoll调整堆,然后直接返回这个元素
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        // 剩余延迟时间大于0。如果leader不为空,说明已经有线程成为leader并等待堆顶任务
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 堵塞等待delay时间
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

take方法总结:take方法和普通的延迟队列一样,比如delayQueue。等待队列头部元素,剩余延迟时间过了之后,才能获取到值。

DelayedWorkQueue延迟队列分析完了之后,分析ScheduledThreadPoolExecutor的关键方法。

构造方法:除了堵塞队列以外,和ThreadPoolExcutor差不多,但是由于DelayedWorkQueue堵塞队列是无限大的,所以,不存在最大线程数以及最大线程数空闲时间。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

execute方法:延迟时间为0,所以正常执行,该启动新线程则启动新线程,该放入队列则放入队列。

 public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        // decorateTask方法只是返回最后一个入参,什么都没做。
        // ScheduledFutureTask类是RunnableScheduledFuture的实现类,所以ScheduledFutureTask类可以放入DelayWorkQueue堵塞队列
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
         // delayedExecute方法把RunnableScheduledFuture放入堵塞队列,如果线程数小于核心线程,则开启线程从队列中取任务
        delayedExecute(t);
        return t;
    }
// ScheduledFutureTask构造方法,period为0,说明不是周期性任务,time表示延迟时间。
ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
             // 放入堵塞队列
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 如果线程数小于核心线程,则开启线程从队列中取任务
                ensurePrestart();
        }
    }

void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
      // 已经开启线程数小于核心线程数,则开启新线程,
// addWorker传入的第一参数为null,也就是worker的firstTask为null,所以直接从getTask()直接从队列中获取任务,
// 堵塞队列是延迟队列,所以可能会延迟获取到任务。
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

之后的流程就是,和和普通线程池一样,ThreadPoolExecutor的真正工作的内部类worker,其就是开启的线程类的runnable入参,线程start启动的时候,会运行workker的run方法,workker的run方法会循环从队列中获取任务执行,因为是延迟队列,所以会又延迟的效果,延迟的原理前面分析堵塞队列的时候分析了,condition.awaitNanos方法实现延迟等待。

获取任务后,运行任务的run方法。

public void run() {
            // 是否周期执行
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                // 不周期执行,则直接执行
                ScheduledFutureTask.super.run();
            // 如果周期执行,则重置任务,再放入队列等待执行
          else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

scheduleAtFixedRate:该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务,看下图示例。

 new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period)

scheduleWithFixDelay:该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务,看下图示例。

new ScheduledFutureTask<Void>(command,
                                         null,
                                         triggerTime(initialDelay, unit),
                                         unit.toNanos(-delay)

和scheduledAtFixedRate类似,唯一不同的地方在于在于创建的ScheduledFutureTask不同

上一篇下一篇

猜你喜欢

热点阅读