如何实现一个线程调度框架
一、前言
线程是程序执行流的最小单元,很基础,也很重要。
为了提高流畅性,耗时任务放后台线程运行,这是APP开发的常识了。
随着APP复杂度的提升,越来越多任务需要开线程执行,同时,遇到如下挑战:
- 任务场景多样化,常规的API无法满足;
- 随着组件化,模块化等演进,可能使得线程管理不统一(比如多个线程池)。
为此,我们今天来探讨一下的如何设计线程调度。
话不多说,从线程池开始吧。
二、线程池
2.1 ThreadPoolExecutor
为了减少线程创建和销毁带来的时间和空间上的代价,开发中通常会用到线程池。
JDK提供了一个很好用的线程池的封装:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程大小
maximumPoolSize:线程池最大容量(需大于等于corePoolSize,否则会抛异常)
keepAliveTime:线程执行任务结束之后的存活时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略
线程池中有两个任务容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
前者用于存储工作者线程,后者用于缓冲任务。
值得一提的是,maximumPoolSize限定的是workers的容量,和workQueue无关。
一个任务到来,假设此时容器workers中的线程数为n,则
- 当n < corePoolSize时,创建线程来执行这个任务,并将线程放入workers;
- 当n >= corePoolSize时,
- 若workQueue未满,则将任务放入workQueue
- 若workQueue已满,
- 若n < maximumPoolSize, 创建线程来执行这个任务,并将线程放入workers;
- 若n >= maximumPoolSize, 执行拒绝策略。
当任务执行结束,线程会存活keepAliveTime的时间;
时间到,
如果allowCoreThreadTimeOut为true, 或者 n > corePoolSize, 线程销毁;
否则,线程进入等待,直到新的任务到来(或者线程池关闭)。
关于workQueue,有两个极端:
- new SynchronousQueue<Runnable>(): 容量为零,一个任务装也不进;
- new LinkedBlockingQueue<Runnable>(): 无限容量,多少任务都装不满。
2.2 Executors
为了方便使用,JDK还封装了一些常用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
类型 | 最大并发 | 适用场景 |
---|---|---|
newFixedThreadPool | nThreads | 计算密集型任务 |
newSingleThreadExecutor | 1 | 串行执行的任务 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任务 |
newScheduledThreadPool | Integer.MAX_VALUE | 定时任务,周期任务 |
众多ExecutorService中,newCachedThreadPool() 是比较特别的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 为 SynchronousQueue。
效果是:所有任务立即调度,无容量限制,无并发限制。
这样的特点比较适合网络请求任务。
OkHttp的异步请求所用线程池与此类似(除了ThreadFactory ,其他参数一模一样)。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
2.3 线程池大小的估算
一台设备上,给定一批任务,要想最快时间完成所有任务,并发量应该如何控制?
一些文章提到如下估算公式:

M:并发数;
C:任务占用CPU的时间;
I:等待IO完成的时间(为简化讨论,且只考虑IO);
N:CPU核心数。
遗憾的是,对于APP来说,这条公式并不适用:
-
任务占用CPU时间和IO时间无法估算
APP上的异步任务通常是碎片化的,而不同的任务性质不一样,有的计算耗时多,有的IO耗时多;
然后同样是IO任务,比方说网络请求,IO时间也是不可估计的(受服务器和网速影响)。 -
可用CPU核心可能会变化
有的设备可能会考虑省电或者热量控制而关闭一些核心;
大家经常吐槽的“一核有难,九核围观”映射的就是这种现象。
虽然该公式不能直接套用来求解最大并发,但仍有一些指导意义:
IO等待时间较多,则需要高的并发,来达到高的吞吐率;
CPU计算部分较多,则需要降低并发,来提高CPU的利用率。
换言之,就是:
做计算密集型任务时控制并发小一点;
做IO密集型任务时控制并发大一点。
比如RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者默认情况下为最大并发为CPU核心数,后者最大并发为Integer.MAX_VALUE。
三、线程框架
JDK提供线程池是比较基础,通用的API。
APP开发中,大家通常会使用一些为特定场景做对应的封装框架,比如AsyncTask和RxJava。
AsyncTask的定位是“方便异步任务和主线程交互”的“轻量级线程框架”,RxJava 则不仅仅是线程框架,其内涵更加丰富。
AsyncTask自诞生之初就被广泛吐槽,但是对其源码分析倒是乐此不彼;
RxJava开始在Android中普及的阶段,AsyncTask又被锤了一遍;
到现在很少人提AsyncTask了,零零星星地会被提起。
其实AsyncTask刨去注释只有三百多行代码,而RxJava的jar包有两M多,犹如单车和汽车,各有各的定位。
我们就不做太多的比较了,这里主要是提一下,承上启下的作用。
AsyncTask可能因为其定位的原因,设计有些保守,但总的来说实现简单,构思精巧,还是有不少地方值得借鉴的。
接下来,我们以AsyncTask为蓝本,结合APP开发中的使用场景,探讨如何设计一个适用性更强的线程框架。
四、线程调度
4.1 线程复用
第二节中我们分析了线程池和几个ExecutorService,结论是不同的任务特征,用不同的调度器。
但是,比方说如果直接调用 newFixedThreadPool 和 newSingleThreadExecutor 来分别执行任务的话,
会有两个线程池,彼此的任务不能复用线程,造成浪费。
对此,AsyncTask给我们提供了一种思路。
先看代码:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* An {@link Executor} that executes tasks one at a time in serial
* order. This serialization is global to a particular process.
*/
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
先定义一个线程池THREAD_POOL_EXECUTOR,并行任务可以调用此Executor来执行;
封装一个SerialExecutor,加了一个任务队列,控制加入的任务串行执行,但是最终还是运行在THREAD_POOL_EXECUTOR上。
于是,调用者可以选择串行或者并行,且是在同一个线程池中调度的,线程可以复用。
这里抠一下细节:
1、源码中THREAD_POOL_EXECUTOR的注释,“execute tasks in parallel”。
parallel, 并行;Concurrent,并发。
个人认为此处应为“并发”,参考:并发与并行的区别。
2、SERIAL_EXECUTOR的注释:“This serialization is global to a particular process.”。
这里没有什么错误,但是要注意一个词,global。
global, 意味着不同的任务公用一个串行队列,可能会彼此阻塞。
在3.0之后, AsyncTask默认调度器是这个SERIAL_EXECUTOR。
关于这个设定,印象最深的是这位开发者遇到的“坑”:《使用AsyncTask时需要注意的隐含bug》
简单地说,就是他同时用了两个SDK,一个用来做图片剪裁,一个是facebook的广告SDK。
然后发现图片加载不出来,经过核查发现两个SDK都用了AsyncTask, 但是都是用的串行的Executor。
国内访问外网速度偏慢,所以facebook的SDK阻塞了后面的任务(图片剪裁)。
后来作者给这个图片剪裁库的开发者提了issue:Android-Image-Cropper, issues 183
关于这个问题,简单的解决方法是不同的任务用不同的SerialExecutor,共用线程池,但各自串行执行,互不干扰。
后面我们会介绍其他方案,接下来先继续分析Executor。
4.2 封装Executor
4.2.1 任务分组
上面我们看到,AsyncTask通过Executor包装Executor, 创建了SerialExecutor,增加了串行执行的能力。
这种技巧我们在JDK的InputStream和OutputStream也领略过了,大家称之为“装饰者模式”。
虽然拓展串行执行的能力,但是还是不支持分组并发。
为此,我们**仿照SerialExecutor封装一个Executor:
open class PipeExecutor @JvmOverloads constructor(
windowSize: Int,
private val capacity: Int = -1,
private val rejectedHandler: RejectedExecutionHandler = defaultHandler) : TaskExecutor{
private val tasks = PriorityQueue<PriorityRunnable>()
private val windowSize: Int = if (windowSize > 0) windowSize else 1
private var count = 0
companion object {
val defaultHandler = ThreadPoolExecutor.AbortPolicy()
}
@Synchronized
override fun execute(r: Runnable, tag: String, priority: Int, finish: (tag: String) -> Unit) {
if(capacity > 0 && count + tasks.size() >= capacity){
rejectedHandler.rejectedExecution(r, TaskCenter.executor)
}
val active = PriorityRunnable(r, tag, finish)
if (count < windowSize || (windowSize > 1 && priority == Priority.IMMEDIATE)) {
startTask(active)
} else {
tasks.offer(active, priority)
}
}
override fun execute(r: Runnable) {
execute(r, "")
}
// ......
private fun startTask(active: Runnable?) {
if (active != null) {
count++
// 线程池封装在 TaskCenter 中,任务最终在该线程池中执行
TaskCenter.poolExecutor.execute(active)
}
}
}
class PriorityRunnable internal constructor(
private val r: Runnable,
private val tag: String,
private val finish: (tag: String) -> Unit) : Runnable {
override fun run() {
try {
r.run()
} finally {
scheduleNext()
if(!tag.isEmpty()){
finish(tag)
}
}
}
// ......
}
解析一下代码中的参数:
windowSize:控制Executor的并发;
capacity:Executor容量,-1时为不限容量,超过容量触发rejectedHandler;
rejectedHandler:默认为AbortPolicy(抛出异常);
priority:调度优先级,当任务数量超过windowSize时,priority高者先被调度;
tag:任务标识;
finish: 任务结束后触发此回调,搭配tag完成一项功能(接下来会有介绍)。
使用时,可以实例化多个PipeExecutor,他们各自根据参数调度自己的任务队列,但最终都是在同一个线程池中运行。
比方说可以创建windowSize设置为cpu数量的PipeExecutor,用于计算密集型任务;
也可以创建windowSize多一点的PipeExecutor,用于IO密集型任务;
还可以windowSize=1的,用于串行执行。
PipeExecutor支持优先级,当优先级设定为IMMEDIATE且windowSize>1时为立即执行。
因为设置windowSize=1通常是想串行执行,故而不能让其立即执行。
优先级相同的任务,遵循先进先出(FIFO)的调度规则。
4.2.2 任务去重
APP开发中常会遇到任务重复的情况。
比方说一个页面所展示的数据可能来自多个数据源,而每个数据源的变更入口有多个,当同时有几个数据变更时,如果不做去重,会浪费计算资源,甚至使得APP卡顿;
又如,有几个数据项所记录的是同一张图片,需要上传,然后更新路径为服务端回传的URL,如果数据上传是并发的,会导致图片重复上传。
说到去重,首先要定义重复;
要定义重复,就要给任务设定标识,相同标识视为重复。
所以TaskExecutor给到的execute方法可以传tag参数,用tag标识一类任务。
不同的任务类型,去重策略也不一样。
1、数据刷新任务
当刷新任务在执行时,忽略后面的任务。不妥。忽略后面的任务,可能造成页面没有正确更新。
有任务正在执行,取消之,新建任务。也不妥。取消前面的任务,极端情况下(比如间隔性持续有刷新通知到达),可能会造成页面迟迟得不到更新。
这类任务的特征是,当任务未开始,一个和多个是等价的,故此对应的策略为:当有任务在执行时,保留一个任务在队列,忽略后来者。
其示意图如下:

其特征为:
不相同tag的任务并发,想同tag的任务串行;
但是tag相同的任务,最多只能存2个,更多的后来者将会被忽略。
进入调度的任务也不一定会被马上执行,只是被放到PipeExecutor中,进行下一层的调度。
2、图片加载任务
图片加载任务通常用图片的路径作为tag。
但图片加载除了path之外,还有target(要加载到哪个ImageView)。
所以不能采用上面的“忽略后来者”的策略,否则有可能导致有的ImageView加载不出图片(多个ImageView需要加载同一张图片的情况)。
把target混入tag?不行。有可能导致重复下载或者重复解码。
而如果让path相同的加载任务串行,则可以复用缓存。
从这个角度看,也是一种“去重”。
对应示意图如下:

其调度模式和前面的“数据刷新任务”很像,只是没有"ignore"。
从另一个角度看,这种模式可以用于执行“串行的任务”,只需要给同类的任务加tag即可。
这样的话就不用到处创建windowSize=1的PipeExecutor了。
任务去重的实现如下:
class LaneExecutor(private val executor: PipeExecutor, private val limit: Boolean = false) : TaskExecutor {
private val scheduledTasks = HashMap<String, Runnable>()
private val waitingQueues by lazy { HashMap<String, CircularQueue<TaskWrapper>>() }
private val waitingTasks by lazy { HashMap<String, TaskWrapper>() }
private class TaskWrapper(val r: Runnable, val priority: Int)
private val finishCallback: (tag: String) -> Unit = { tag ->
synchronized(LaneExecutor@ this) {
scheduledTasks.remove(tag)
if (limit) {
waitingTasks.remove(tag)?.let { start(it.r, tag, it.priority) }
} else {
waitingQueues[tag]?.let {
val wrapper = it.poll()
if (wrapper == null) {
waitingQueues.remove(tag)
} else {
start(wrapper.r, tag, wrapper.priority)
}
}
}
}
}
@Synchronized
override fun execute(r: Runnable, tag: String, priority: Int, finish: (tag: String) -> Unit) {
if (scheduledTasks.containsKey(tag)) {
if (limit) {
if (waitingTasks.containsKey(tag)) {
if (r is Future<*>) {
r.cancel(false)
}
} else {
waitingTasks[tag] = TaskWrapper(r, priority)
}
} else {
val queue = waitingQueues[tag]
?: CircularQueue<TaskWrapper>().apply { waitingQueues[tag] = this }
queue.offer(TaskWrapper(r, priority))
}
} else {
start(r, tag, priority)
}
}
private fun start(r: Runnable, tag: String, priority: Int) {
scheduledTasks[tag] = r
executor.execute(r, tag, priority, finishCallback)
}
}
PipeExecutor和LaneExecutor的关系如下图:

之前PipeExecutor通过装饰者模式,在ThreadPoolExecutor之上包装了一层,拓展了分组,优先级等特性,
如今LaneExecutor在PipeExecutor上又包了一层,拓展了去重的特性。
关于组合和继承,普遍的观点是组合优先于继承。
所以在设计LaneExecutor时,用PipeExecutor作为成员而非继承于PipeExecutor。
4.3 全局调度
当项目复杂度到了一定程度,如果没有相对严格的规范约束的话,可能会看到各种各样的冗余对象,比如缓存和Executor。
因为不想被其他模块所干扰,或者图方便,开发者可能会在自己的模块定义自己的Executor。
分散的Executor有隔离的效果(不会相互影响),但副作用就是无法集中管控,过多的线程并发执行可能会导致资源争抢以及带来更多线程切换代价。
如果各自创建的原生JDK提供的线程池,则还要加上一条:降低线程复用。
故此,可以集中定义执行器,各模块统一调用。
object TaskCenter {
private val cpuCount = Runtime.getRuntime().availableProcessors()
// ......
// standard Executor
val io = PipeExecutor(16, 512)
val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 6), 256)
// use to execute tasks which need to run in serial,
// such as writing logs, reporting app info to server ...
val lane = LaneExecutor(PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512))
// use to execute general tasks,such as loading data.
val laneIO = LaneExecutor(io, true)
val laneCP = LaneExecutor(computation, true)
}
很多开源项目都设计了API来使用外部的Executor,例如RxJava的话可以这样使用:
object TaskSchedulers {
val io: Scheduler by lazy { Schedulers.from(TaskCenter.io) }
val computation: Scheduler by lazy { Schedulers.from(TaskCenter.computation) }
val single by lazy { Schedulers.from(PipeExecutor(1)) }
}
使用:
Observable.range(1, 8)
.subscribeOn(TaskSchedulers.computation)
.subscribe { Log.d(tag, "number:$it") }
五、拓展AsyncTask
通过上面构造的相对完善的Executor,我们可以用于扩展AsyncTask。
通过继承AsyncTask无法做到我们预想的效果,所以没办法,只能重新写一个了。
限于篇幅,这里就不分析具体实现了。
大体框架还是Executor + Handler, 只是Executor换上了TaskExecutor,以及添加生命周期(被锤得最多的缺点之一)的支持。
简单地说,就是通过观察者模式实现对宿主的生命周期(onDestroy, onPause, onResume)的监听,在onDestroy是取消任务,在onPause时降低优先级,在onResume时恢复优先级。
这里补充一点,关于AsyncTask的cancel, 有不少文章说不一定能立即取消任务。
确实是不一定能立即取消,但这其实是合理的。
当调用AsyncTask的cancel(mayInterruptIfRunning), 并传入true时,会触发interrupt()。
关于interrupt()知乎上有不错的讨论:Java里一个线程调用了Thread.interrupt()到底意味着什么。
interrupt() 虽然不能保证马上终止任务,但是能够中断sleep(), wait()等方法;
如果使用OkHttp, interrupt()能够中断网络请求。
为什么不用Thread.stop()呢? Thread.stop()是个危险的方法。
比方说一个线程正在写入数据,如果突然中止,可能数据就不对了;
更有甚者,可能导致文件不完整,可能导致文件的数据都丢失了。
拓展后用法和原生的AsyncTask用法是类似的,只是多了一些方法,以提供额外的功能,例如优先级,以及监听Activity/Fragment生命周期。
六、下载
implementation 'com.horizon.task:task:1.0.1'
相关代码已上传GitHub,
项目地址:https://github.com/No89757/Task