ThreadScheduledPool
ScheduledThreadPoolExecutor是一个使用线程池执行定时任务的类。 ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。

这里有必要来介绍一下ScheduledExecutorService接口。
ScheduledExecutorService本身继承了ExecutorService接口,并为调度任务额外提供了两种模式
延时执行
schedule(Runnable, long, TimeUnit)
根据参数中设定的延时,执行一次任务
schedule(Callable, long, TimeUnit)
根据参数中设定的延时,执行一次任务
周期执行
scheduleAtFixedRate
假设第n次任务开始时间是t,运行时间是p,设置的间隔周期为T则第n+1次任务的开始时间是max(t + p,t + T)。换句话说,如果任务执行足够快,则任务之间的间隔就是配置的周期T,否则如果任务执行比较慢,耗时超过T,则在任务结束后会立即开始下一次的任务。所以不会有同时并发执行提交的周期任务的情况。
scheduleWithFixedDelay
假设第n次任务结束时间是t,设置的延时为T,则第n+1次任务的开始时间是t+T。换句话说连续两个任务的首尾(本次结束与下次开始)为T。
ScheduledThreadPoolExecutor 和 Timer 的比较:
在JDK1.5之前,我们关于定时/周期操作都是通过Timer来实现的。如果你是使用JDK1.5以上版本,建议用ScheduledThreadPoolExecutor代替Timer。它基本上解决了上述问题。
- Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
- TTimer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
- T在TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机:-( …即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法
优于 Timer
其主要有如下两个优点:
-
使用多线程执行任务,不用担心任务执行时间过长而导致任务相互阻塞的情况,Timer是单线程执行的,因而会出现这个问题;
-
不用担心任务执行过程中,如果线程失活,其会新建线程执行任务,Timer类的单线程挂掉之后是不会重新创建线程执行后续任务的。
package com.ebay.concurrent.threadpool;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.conrrentcy.thread.BatchRebias;
public class TimerTest {
private static final Logger log = LoggerFactory
.getLogger(TimerTest.class);
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask t1 = new TimerTask(){
@Override
public void run() {
log.info("t1 is running");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
TimerTask t2 = new TimerTask(){
@Override
public void run() {
log.info("t2 is running");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
timer.schedule(t1, 1000);
timer.schedule(t2, 1000);
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
service.schedule(()->{
log.info("Schedule Service t1 is running");
try {
Thread.sleep(2000);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
service.schedule(()->{
log.info("Schedule Service t2 is running");
}, 1, TimeUnit.SECONDS);
// service.scheduleWithFixedDelay(command, initialDelay, delay, unit)
// service.scheduleAtFixedRate(command, initialDelay, period, unit)
}
}
通过源代码查看ScheduledThreadPoolExecutor构造函数实现:
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} or
* {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor实现中:
- corePoolSize => 需要自己传递参数,指定核心线程数大小。
- maximumPoolSize => 允许最大的线程数默认Integer.MAX_VALUE无限大。
- keepAliveTime => keepAliveTime为0,意味着多余的空闲线程会被立即终止。
- workQueue => 采用DelayedWorkQueue作为线程池的工作队列。该队列是一个使用数组实现的优先队列,在调用ScheduledFutureTask::cancel()方法时,其会根据removeOnCancel变量的设置来确认是否需要将当前任务真正的从队列中移除,而不只是标识其为已删除状态。
ScheduledThreadPoolExecutor运行机制
ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。

ScheduledThreadPoolExecutor 的执行主要分为两大部分:
当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者scheduleWirhFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下修改:
- 使用 DelayQueue 作为任务队列;
- 获取任务的方不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor 执行周期任务的步骤

线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask 的 time 大于等于当前系统的时间;
线程 1 执行这个 ScheduledFutureTask;
线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。
源码分析
分析ScheduledThreadPoolExecutor的源码,主要会分成三个部分:ScheduledFutureTask, DelayedWorkQueue以及ScheduledThreadPoolExecutor本身。
ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor中的一个内部类。

我们可以看到,它的接口继承线大体是两条:RunnableFuture和ScheduledFuture,而RunnableScheduledFuture是两者的合体。
基本数据
sequenceNumber
任务的序列号,在排序中会用到。
- time
任务可以被执行的时间,以纳秒表示。 - period
0表示非周期任务。正数表示fixed-rate模式,负数表示fixed-delay模式。 - outerTask
ScheduledThreadPoolExecutor#decorateTask允许我们包装一下Executor构造的RunnableScheduledFuture(实现为ScheduledFutureTask)并重新返回一个RunnableScheduledFuture给Executor。
所以ScheduledFutureTask.outerTask实际上就是decorateTask方法包装出来的结果。decorateTask默认返回的就是参数中的RunnableScheduledFuture,也就是不进行包装,这种情况下outerTask就是ScheduledFutureTask自身了。
outerTask的主要目的就是让周期任务在第二次及之后的运行时跑的都是decorateTask返回的包装结果。 - heapIndex
用于维护该任务在DelayedWorkQueue内部堆中的索引(在堆数组中的index)。
ScheduledFutureTask#run方法
ScheduledFutureTask通常情况下就是线程池中Worker线程拿到的Runnable对象。注意这里说的是通常情况,因为ScheduledThreadPoolExecutor允许我们通过decorateTask方法包装原先的ScheduledFutureTask,相比之下这并不常见。
public void run() {
// 是否周期性,就是判断period是否为0。
boolean periodic = isPeriodic();
// 检查任务是否可以被执行。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任务直接调用run运行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
// 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
/*
* fixed-rate模式,时间设置为上一次时间+p。
* 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
* 如果这次任务还没执行完是肯定不会执行下一次的。
*/
if (p > 0)
time += p;
/**
* fixed-delay模式,计算下一次任务可以被执行的时间。
* 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
*/
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
/*
* 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
*
* 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
*/
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有这么一种情况:
* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
*
* 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
*
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
public void run() {
// 是否周期性,就是判断period是否为0。
boolean periodic = isPeriodic();
// 检查任务是否可以被执行。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任务直接调用run运行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
// 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
/*
* fixed-rate模式,时间设置为上一次时间+p。
* 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
* 如果这次任务还没执行完是肯定不会执行下一次的。
*/
if (p > 0)
time += p;
/**
* fixed-delay模式,计算下一次任务可以被执行的时间。
* 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
*/
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
/*
* 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
*
* 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
*/
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有这么一种情况:
* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
*
* 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
*
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
ScheduledFutureTask#cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 先调用父类FutureTask#cancel来取消任务。
boolean cancelled = super.cancel(mayInterruptIfRunning);
/*
* removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
*
* 如果已经成功取消,并且removeOnCancel开关打开,并且heapIndex >= 0(说明仍然在队列中),
* 则从队列中删除该任务。
*/
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
DelayedWorkQueue
DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工作队列。它内部维护了一个小根堆,根据任务的执行开始时间来维护任务顺序。但不同的地方在于,它对于ScheduledFutureTask类型的元素额外维护了元素在队列中堆数组的索引,用来实现快速取消。DelayedWorkQueue用了ReentrantLock+Condition来实现管程保证数据的线程安全性。
DelayedWorkQueue#offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 容量扩增50%。
grow();
size = i + 1;
// 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 插入堆尾。
siftUp(i, e);
}
// 如果新加入的元素成为了堆顶,则原先的leader就无效了。
if (queue[0] == e) {
leader = null;
// 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
3.1.3 ScheduledFutureTask#cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 先调用父类FutureTask#cancel来取消任务。
boolean cancelled = super.cancel(mayInterruptIfRunning);
/*
* removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
*
* 如果已经成功取消,并且removeOnCancel开关打开,并且heapIndex >= 0(说明仍然在队列中),
* 则从队列中删除该任务。
*/
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
DelayedWorkQueue
DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工作队列。它内部维护了一个小根堆,根据任务的执行开始时间来维护任务顺序。但不同的地方在于,它对于ScheduledFutureTask类型的元素额外维护了元素在队列中堆数组的索引,用来实现快速取消。DelayedWorkQueue用了ReentrantLock+Condition来实现管程保证数据的线程安全性。
DelayedWorkQueue#offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 容量扩增50%。
grow();
size = i + 1;
// 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 插入堆尾。
siftUp(i, e);
}
// 如果新加入的元素成为了堆顶,则原先的leader就无效了。
if (queue[0] == e) {
leader = null;
// 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
DelayedWorkQueue#take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
/*
* 循环读取当前堆中最小也就执行开始时间最近的任务。
* 如果当前队列为空无任务,则在available条件上等待。
* 否则如果最近任务的delay<=0则返回这个任务以执行,否则的话根据是否可以作为leader分类:
* 如果可以作为leader,则根据delay进行有时限等待。
* 否则无限等待直至被leader唤醒。
*/
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果当前队列无元素,则在available条件上无限等待直至有任务通过offer入队并唤醒。
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果delay小于0说明任务该立刻执行了。
if (delay <= 0)
// 从堆中移除元素并返回结果。
return finishPoll(first);
/*
* 在接下来等待的过程中,first应该清为null。
* 因为下一轮重新拿到的最近需要执行的任务很可能已经不是这里的first了。
* 所以对于接下来的逻辑来说first已经没有任何用处了,不该持有引用。
*/
first = null;
// 如果目前有leader的话,当前线程作为follower在available条件上无限等待直至唤醒。
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
/*
* 如果从available条件中被唤醒当前线程仍然是leader,则清空leader。
*
* 分析一下这里不等的情况:
* 1. 原先thisThread == leader, 然后堆顶更新了,leader为null。
* 2. 堆顶更新,offer方法释放锁后,有其它线程通过take/poll拿到锁,读到leader == null,然后将自身更新为leader。
*
* 对于这两种情况统一的处理逻辑就是只要leader为thisThread,则清leader为null用以接下来判断是否需要唤醒后继线程。
*/
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
/*
* 如果当前堆中无元素(根据堆顶判断)则直接释放锁。
*
*
* 否则如果leader有值,说明当前线程一定不是leader,当前线程不用去唤醒后续等待线程。
* 否则由当前线程来唤醒后继等待线程。不过这并不代表当前线程原来是leader。
*/
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
DelayedWorkQueue#poll(long, TimeUnit)方法
由于和take方法套路差不多,这里不展开细讲了。
DelayedWorkQueue#remove方法
ScheduledThreadPoolExecutor支持任务取消的时候快速从队列中移除,因为大部分情况下队列中的元素是ScheduledFutureTask类型,内部维护了heapIndex也即在堆数组中的索引。
堆移除一个元素的时间复杂度是O(log n),前提是我们需要知道待删除元素在堆数组中的位置。如果我们不维护heapIndex则需要遍历整个堆数组来定位元素在堆数组的位置,这样光是扫描一次堆数组复杂度就O(n)了。而维护了heapIndex,就可以以O(1)的时间来确认位置,从而可以更快的移除元素。
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
/*
* 堆的删除某个元素操作就是将最后一个元素移到那个元素。
* 这时候有可能需要向上调整堆,也可能需要向下维护。
*
* 对于小根堆而言,如果移过去后比父元素小,则需要向上维护堆结构,
* 否则将左右两个子节点中较小值与当前元素比较,如果当前元素较大,则需要向下维护堆结构。
*/
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 如果参数x就是堆数组中最后一个元素则删除操作已经完毕了。
if (s != i) {
// 尝试向下维护堆。
siftDown(i, replacement);
// 相等说明replacement比子节点都要小,尝试向上维护堆。
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// 再次判断i确实是本线程池的,因为remove方法的参数x完全可以是个其它池子里拿到的ScheduledFutureTask。
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
ScheduledThreadPoolExecutor
在了解了ScheduledFutureTask与DelayedWorkQueue之后最后再来看ScheduledThreadPoolExecutor本身的方法,就显得容易很多。
这里我们来介绍一些ScheduledThreadPoolExecutor以及父类ThreadPoolExecutor中的方法。
ScheduledThreadPoolExecutor#canRunInCurrentRunState方法
这个方法在任务提交时,任务运行时都会被调用以校验当前状态是否可以运行任务。
boolean canRunInCurrentRunState(boolean periodic) {
/*
* isRunningOrShutdown的参数为布尔值,true则表示shutdown状态也返回true,否则只有running状态返回ture。
* 如果为周期性任务则根据continueExistingPeriodicTasksAfterShutdown来判断是否shutdown了仍然可以执行。
* 否则根据executeExistingDelayedTasksAfterShutdown来判断是否shutdown了仍然可以执行。
*/
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
执行入口方法
ScheduledThreadPoolExecutor任务提交的入口方法主要是execute, schedule, scheduleAtFixedRate以及scheduleWithFixedDelay这几类。
/**
* 覆盖了父类execute的实现,以零延时任务的形式实现。
*/
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 包装ScheduledFutureTask。
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// fixed-rate模式period为正数。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 包装ScheduledFutureTask,默认返回本身。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// fixed-delay模式delay为正数。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 包装ScheduledFutureTask,默认返回本身。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
sft.outerTask = t;
delayedExecute(t);
return t;
}
ScheduledThreadPoolExecutor#delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 非RUNNING态,根据饱和策略处理任务。
if (isShutdown())
reject(task);
else {
// 往work queue中插入任务。
super.getQueue().add(task);
/*
* 检查任务是否可以被执行。
* 如果任务不应该被执行,并且从队列中成功移除的话(说明没被worker拿取执行),则调用cancel取消任务。
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
// 参数中false表示不试图中断执行任务的线程。
task.cancel(false);
else
ensurePrestart();
}
}
/**
* 这是父类ThreadPoolExecutor的方法用于确保有worker线程来执行任务。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker数目小于corePoolSize,则添加一个worker。
if (wc < corePoolSize)
addWorker(null, true);
// wc==orePoolSize==0的情况也添加一个worker。
else if (wc == 0)
addWorker(null, false);
}
ScheduledThreadPoolExecutor#reExecutePeriodic方法
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 塞到工作队列中。
super.getQueue().add(task);
// 再次检查是否可以执行,如果不能执行且任务还在队列中未被取走则取消任务。
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
ScheduledThreadPoolExecutor#onShutdown方法
onShutdown方法是ThreadPoolExecutor的一个钩子方法,会在shutdown方法中被调用,默认实现为空。而ScheduledThreadPoolExecutor覆盖了此方法用于删除并取消工作队列中的不需要再执行的任务。
@Override
void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// shutdown是否仍然执行延时任务。
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
// shutdown是否仍然执行周期任务。
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果两者皆不可则对队列中所有RunnableScheduledFuture调用cancel取消并清空队列。
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
/*
* 不需要执行的任务删除并取消。
* 已经取消的任务也需要从队列中删除。
* 所以这里就判断下是否需要执行或者任务是否已经取消。
*/
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
// 因为任务被从队列中清理掉,所以这里需要调用tryTerminate尝试跃迁executor的状态。
tryTerminate();
}
总结
本文介绍了ScheduledThreadPoolExecutor的原理与源码实现。
ScheduledThreadPoolExecutor内部使用了ScheduledFutureTask来表示任务,即使对于execute方法也将其委托至schedule方法,以零延时的形式实现。同时ScheduledThreadPoolExecutor也允许我们通过decorateTask方法来包装任务以实现定制化的封装。
而ScheduledThreadPoolExecutor内部使用的阻塞队列DelayedWorkQueue通过小根堆来实现优先队列的功能。由于DelayedWorkQueue是无界的,所以本质上对于ScheduledThreadPoolExecutor而言,maximumPoolSize并没有意义。整体而言,ScheduledThreadPoolExecutor处理两类任务--延时任务与周期任务。通过ScheduledFutureTask.period的是否为零属于哪一类,通过ScheduledFutureTask.period的正负性来判断属于周期任务中的fixed-rate模式还是fixed-delay模式。并且提供了通过参数来控制延时任务与周期任务在线程池被关闭时是否需要被取消并移除出队列(如果还在队列)以及是否允许执行(如果已经被worker线程取出)。
DelayedWorkQueue使用了Leader/Follower来避免不必要的等待,只让leader来等待需要等待的时间,其余线程无限等待直至被唤醒即可。
DelayedWorkQueue所有的堆调整方法都维护了类型为ScheduledFutureTask的元素的heapIndex,以降低cancel的时间复杂度。
下面整理一下ScheduledThreadPoolExecutor中几个重要参数。
参数总结
- continueExistingPeriodicTasksAfterShutdown
此参数用于控制在线程池关闭后是否还执行已经存在的周期任务。
可以通过setExecuteExistingDelayedTasksAfterShutdownPolicy来设置以及getContinueExistingPeriodicTasksAfterShutdownPolicy来获取。 - executeExistingDelayedTasksAfterShutdown
此参数用于控制在线程池关闭后是否还执行已经存在的延时任务。
可以通过setExecuteExistingDelayedTasksAfterShutdownPolicy来设置以及getExecuteExistingDelayedTasksAfterShutdownPolicy来获取。 - removeOnCancel
此参数用于控制ScheduledFutureTask在取消时是否应该要从工作队列中移除(如果还在队列中的话)。
可以通过setRemoveOnCancelPolicy来设置以及getRemoveOnCancelPolicy来获取。