线程池原理解析
一、为什么需要线程池
线程池是一种线程管理工具
常规的解释有这么几种:
- 线程有自己的栈内存
- 线程创建会发生操作系统调用,比较耗时
- 频繁的线程切换,也会消耗一定的CPU时间片
我自己的理解:
- 对于CPU密集型的任务,比如加解密,视频编解码,CPU的执行能力是有限的,如果执行任务的线程少于CPU核心数,CPU就会空闲;如果恰好等于CPU核心数,那CPU就会满载;如果线程数大于CPU核心数,操作系统就会把单个cpu核心按时间分片分配给多个线程来执行。原本都可以用来计算的cpu资源,就得被分配一部分用来切换线程,而且线程切换,是需要刷新CPU缓存的,也需要一定的时间,并且线程本身也会占用一定的内存,所以对于计算类型的任务,同时执行的线程数超过CPU最大线程数,是没有意义的,反而会拖慢处理过程,消耗过大的内存,甚至降低系统的稳定性。
- 对于IO密集型的任务,比如网络请求,文件读写,其实IO阻塞的时候,是不消耗CPU资源的。所以线程越多,执行速度越快。但是网络的速度和磁盘的速度是有限制的,在未达到IO瓶颈的时候,增加线程是可以增加处理速度的,达到瓶颈以后,增加线程,是会降低处理速度的,还会因为资源占用过多降低系统的稳定性。
所以就需要使用线程池来管理线程,尽可能的降低资源占用,提高CPU使用率。
二、怎么使用线程池
1. 通过ThreadPoolExecutor直接创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
稍微解释一下各个参数的作用
corePoolSize
:核心线程数量
核心线程不会回收,即使已经没有任务;但是核心线程是添加任务才启动的,并不是一开始就启动的
maximumPoolSize
:允许的最大线程数量
用来控制线程池中的最大线程数量。当BlockingQueue,是一个无界或者是容量很大时,这个参数是不起作用的;
keepAliveTime
:当线程数量超过核心线程时,闲置的线程存活时长
TimeUnit unit
:keepAliveTime
的时间单位
BlockingQueue<Runnable> workQueue
:任务队列,用来保存任务
设置一个无界或者是容量很大的队列,会导致task 很久才被执行
设置SynchronousQueue,任务会立即得到执行,如果有限制的线程,会让闲置的线程执行任务,否则会新开启线程执行任务
LinkedBlockingQueue 比ArrayBlockingQueue 更加适用一般的场景
ThreadFactory threadFactory
:线程工厂类,用来创建线程
可以通过这个来统一的配置线程,比如设置线程名称
RejectedExecutionHandler handler
:线程池不能添加任务时的拒绝策略(超过了线程池的承载容量或者是线程池已经关闭)
默认的几种
- DiscardOldestPolicy 删除最老的任务,然后尝试重新提交任务,如果线程池已经关闭,则无任何处理
- AbortPolicy 抛出RejectedExecutionException
- CallerRunsPolicy 在提交任务的线程执行任务,如果线程池已经关闭,则无任何处理
- DiscardPolicy 空实现,忽略问题
2. 使用Executors的这个工程类来创建线程池
Executors.newCachedThreadPool 创建一个缓存的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特性:
- 没有核心线程
- 线程没有任务会在60s后退出
- 提交任务会立即执行,且最大的线程数量是Integer.MAX_VALUE
适合IO密集型的任务,且要求实时性的情况,比如网络请求
Executors.newFixedThreadPool 创建一个固定线程数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
特性:
- 线程数固定
- 允许提交的任务数量为Integer.MAX_VALUE
适合需要限制并发数的情况,比如多线程限制,例如最多开启3个线程(网速一定时,增大线程数量并不会提高下载速度)
Executors.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特性:
- 通过FinalizableDelegatedExecutorService代理了线程池,当对象被回收时,线程池会被关闭
- 相当于newFixedThreadPool的特殊情况,线程数量为1
使用场景,某些场景需要单线程模型时
Executors.newScheduledThreadPool 先忽略
Executors.newWorkStealingPool 先忽略
三、线程池的原理解析
状态流转
线程池流程图.png- shutdown()和shutdownNow()的区别
-
shutdown()
关闭线程池- 设置线程池状态为SHUTDOWN
- 中断所有闲置线程
-
shutdownNow()
立即关闭线程池- 设置线程池状态为STOP
- 中断全部线程
- 清空并返回等待中的任务队列
- Tidying 只是一个临时状态
final void tryTerminate() {
for (;;) {
// 省略一段代码
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
protected void terminated() { }
TIDYING
可以看到执行完钩子函数terminated()
,就变成了TERMINATED
提交任务的过程
线程池提交任务.png-
execute(Runnable)
提交一个任务- 如果线程池已经关闭,会执行拒绝策略
- 如果任务队列满了,且工作线程数量已经达到了最大线程数量,会执行拒绝策略
- 当
BlockingQueue
是无界的或者容量很大时,将不会创建非核心线程
线程池工作线程的执行流程
线程池工作线程的执行流程.png- 工作线程如何获取任务
超时等待
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
一直等待
workQueue.take();
四、查漏补缺
BlockingQueue
BlockingQueue基本使用
- ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
- LinkedBlockingQueue
以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
- SynchronousQueue
它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素
ThreadPoolExecutor 其他的一些有用的函数
-
awaitTermination()
等待线程池关闭
轮训的方式等待线程池关闭,会阻塞线程
-
prestartCoreThread()
预启动一个核心线程 -
prestartAllCoreThreads()
预启动全部核心线程
可以预启动线程,来提高系统响应时间
-
allowCoreThreadTimeOut()
设置允许核心线程超时 -
remove()
从任务队列中移除任务 -
purge()
移除任务队列中,已经取消的Future
-
getPoolSize()
获取线程数量 -
getActiveCount()
获取正在执行任务的线程数量 -
getLargestPoolSize()
获取线程池中出现过的最大的线程数量 -
getTaskCount()
获取总任务的个数
总任务的个数=已经完成的任务个数+正在执行的任务的个数+等待队列中的任务的个数
-
getCompletedTaskCount()
获取已经完成的任务的个数
五、总结
- 使用线程池可以更好的管理线程资源
- 需要根据情况配置合理的参数
欠缺内容
- Rxjava 中的线程池
- Kotlin Coroutine中的线程池
附录:
- 相关源码取自JDK1.8