线程那些事

线程池那些事之ScheduledThreadPoolExecut

2018-11-05  本文已影响0人  土豆肉丝盖浇饭

前言

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上增加了schedule方法,支持了延迟,固定间隔触发执行任务。与ThreadPoolExecutor的任务不同的是,这些任务可以重复执行。

源码解析

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上做了一些修改

  1. 新增任务类型ScheduledFutureTask,继承于FutureTask
  2. workqueue的实现为延迟队列DelayedWorkQueue
  3. 增加了shutdown的一些配置参数,continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown
  4. 提供包装方法decorateTask让子类扩展自定义任务类型

ScheduledExecutorService实现

相对ThreadPoolExecutor,ScheduledThreadPoolExecutor额外实现了ScheduledExecutorService接口。
ScheduledExecutorService定义了这些定时任务的方法。

//延迟任务
//delay时间后执行任务
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//延迟任务
//delay时间后执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
//固定频率任务
//第一次initialDelay后执行任务,之后按照固定period执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//固定延迟任务
//第一次initialDelay后执行任务,之后延迟period时间执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

调度任务分为两种,周期任务延迟任务。周期任务又分为固定频率和固定延迟。

固定频率和固定延迟的区别在于,initialDelay为0的情况下,如果一个任务耗时2秒,固定5秒频率任务的时间为,0,5,10,15...,而固定5秒延迟任务为0,7,14,21...

shedule方法的逻辑基本一致,我们看下scheduleAtFixedRate实现

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        //创建ScheduledFutureTask
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        //进行再次包装,用于子类扩展
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //放入延迟队列
        delayedExecute(t);
        return t;
    }

delayedExecute封装了向workqueue放任务的逻辑

private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果已经shutdown,拒绝任务
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            //再次check线程池状态
            //如果已经shutdown那么取消该任务
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //判断工作线程是否足够,不够增加
                ensurePrestart();
        }
    }

相对ThreadPoolExecutor的submit方法,schdule新增了一种任务类型ScheduledFutureTask,用于执行调度任务,接下来我们看下ScheduledFutureTask的实现

ScheduledFutureTask


ScheduledFutureTask相对FutureTask增加了Delayed接口,用于对任务进行排序。以及RunnableScheduledFuture中的isPeriodic方法,判断当前任务是否是周期性的。

新增属性

在ScheduledFutureTask新增了几个属性。

属性 作用
sequenceNumber 用于任务排序
time 任务执行的绝对时间
period 大于0代表固定频率周期,小于0代表固定延迟周期,0代表不是周期任务
heapIndex 用于快速定位在DelayedWorkQueue的位置,方便删除

run方法重载

ScheduledFutureTask的调度逻辑主要在run方法,增加了reset的逻辑,让周期性任务能够重复执行。

public void run() {
            boolean periodic = isPeriodic();
            //当前线程池状态能否执行任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            //如果是普通任务,执行用父类run方法执行
            else if (!periodic)
                ScheduledFutureTask.super.run();
            //如果是调度任务,执行完,重新设置初始状态
            else if (ScheduledFutureTask.super.runAndReset()) {
                //设置下次生效的时间
                setNextRunTime();
                //把任务重新放到workQueue中去
                reExecutePeriodic(outerTask);
            }
        }

runAndReset直接使用FutureTask的实现,和run方法的区别是,在执行完任务后,不会修改任务的状态(不调用set方法设置结果),还是为NEW。

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    //在出异常的情况下,会被设置为EXCEPTION状态,那么这个任务就执行不了了
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

setNextRunTime用于设置下次生效时间

private void setNextRunTime() {
            long p = period;
            if (p > 0)
                //固定频率,直接加period得到下次运行时间
                time += p;
            else
                //固定延迟,在triggerTime用的是当前时间+p
                time = triggerTime(-p);
        }

long triggerTime(long delay) {
        //注意是当前时间,其他的位移逻辑就当是把负数变成正数把
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

reExecutePeriodic用于重新把任务放入队列

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            //如果线程池当前状态不能执行任务,那么取消该任务
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

cancel方法重载

下面来看下cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            //取消成功后,主动从workqueue删除该任务
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

这边一个优化点是,会主动删除任务,一般任务被取消了,工作线程还是会去获取的,但是不实际执行该任务。

Comparable接口实现

ScheduledFutureTask同时实现了Comparable<Delayed>接口,用于在workQueue排序的时候通过延迟时间来排序

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                //根据绝对时间比较
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                //时间相同,使用任务号码比较,先加入的任务号码小
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            //使用getDelay接口实现比较
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

下面来介绍延迟队列的实现,为什么用延迟队列,延迟队列按照执行时间顺序排,我们只需要关注最前面的任务,获取有效任务更快。

延迟队列DelayedWorkQueue

DelayedWorkQueue内部使用了小根堆算法,保证时间最小也就是最近执行的任务放在队头。并且只有任务的生效时间到了,才能被工作线程获取到,不然一直阻塞在那边。

DelayedWorkQueue内部通过一个数组存储ScheduledFutureTask

private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

这个数组是一个小根堆,根节点总是延迟最小的任务。

如何构造这个小根堆暂时不讲,但是它的算法还是挺有意思的。
我们关注它的take方法,因为DelayedWorkQueue是无界的,因此ScheduledThreadPoolExecutor内只有核心线程,所以在getTask的时候使用take获取任务

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

看下take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        //如果workQueue第一个任务为空,直接await
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //如果第一个任务可以获取,那么直接获取
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        //在第一个元素不满足获取条件的情况下
                        //只能有一个线程能获取该元素,这个线程保存在leader中
                        if (leader != null)
                            //如果leader已经被别的线程占有,那么await
                            available.await();
                        else {
                            //leader为空,那么占有它
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                //等待直到这个任务有效
                                available.awaitNanos(delay);
                            } finally {
                                //放弃leader锁
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //通知其他等待的线程
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

take方法的大致逻辑是
如果workQueue为空,那么直接等待
如果不为空,并且第一个任务有效,那么直接获取
如果第一个任务延迟时间还没到,第一个到达的线程通过leader锁先锁定这个任务,消费这个任务后,使用signal唤醒其他等待的工作线程来获取其他任务。
做这个优化,我觉得是为了防止过多线程竞争任务,浪费CPU资源。因为一个任务最终只能被一个线程消费。

onShutdown钩子

ScheduledThreadPoolExecutor中重载了onShutdown的钩子方法,会在shutdown方法内被调用。

Override void onShutdown() {
        BlockingQueue<Runnable> q = super.getQueue();
        boolean keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
        boolean keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
        //如果keepDelayed和keepPeriodic都为false,cancel所有ScheduledFutureTask
        if (!keepDelayed && !keepPeriodic) {
            for (Object e : q.toArray())
                if (e instanceof RunnableScheduledFuture<?>)
                    ((RunnableScheduledFuture<?>) e).cancel(false);
            q.clear();
        }
        else {
            // Traverse snapshot to avoid iterator exceptions
            for (Object e : q.toArray()) {
                if (e instanceof RunnableScheduledFuture) {
                    RunnableScheduledFuture<?> t =
                        (RunnableScheduledFuture<?>)e;
                    //根据配置以及任务类型对任务进行取消
                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                        t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                }
            }
        }
        tryTerminate();
    }

重载这个方法用于cancel那些可以被cancel的ScheduledFutureTask任务。
continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown用于控制shutdown后是否执行周期任务(固定延迟和固定频率)和延迟任务。哪一个配置为false,那种类型的任务会被cancel。

测试

延迟任务

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        System.out.println(new Date());
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date());
            }
        },2, TimeUnit.SECONDS);

延迟任务只会执行一次

周期定时延迟任务

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        System.out.println(new Date());
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date());
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },2,5, TimeUnit.SECONDS);

周期定时频率

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        System.out.println(new Date());
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date());
                try {
                    //这边把2000修改为6000后,其实频率会变为6s,任务会继续放到延迟队列,时间生成是对的,但是放的时间比原来时间拖了1s
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },2,5, TimeUnit.SECONDS);

总结

对调度线程池更加熟悉了,这种对子类的设计开放扩展点的方式很任性化。
下面是我公众号,大家可以关注下。


image
上一篇下一篇

猜你喜欢

热点阅读