线程池
一、线程池的好处,详解,单例
1.1 线程池的好处
Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时, 不会有 T1,T3 的开销了。
- 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
假设一个服务器一天要处理 50000 个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为 50000。 一般线程池大小是远小于 50000。所以利用线程池的服务器程序不会为了创建 50000 而在处理请求时浪费时间,从而提高效率。
1.1.1 线程池的重用
线程的创建和销毁的开销是巨大的,而通过线程池的重用大大减少了这些不必要的开销,当然既然少了这么多消费内存的开销,其线程执行速度也是突飞猛进的提升。
1.1.2 控制线程池的并发数
并发:在某个时间段内,多个程序都处在执行和执行完毕之间;但在一个
时间点上只有一个程序在运行。 例如:老鹰妈妈喂小雏鹰食物,小雏鹰很多,而老鹰只有一张嘴,她需要一个个喂过去,到最后每个小雏鹰都可以吃到,但是在一个时间点里只能有一个小雏鹰可以吃到美味的食物。
并行:在某个时间段里,每个程序按照自己独立异步的速度执行,程序之
间互不干扰。 例如:这就好似老鹰妈妈决定这样喂食太费劲于是为每个小雏鹰请了个保姆,这样子在一个时间点里,每个小雏鹰都可以同时吃到食物,而且互相不干扰。
控制线程池的并发数可以有效的避免大量的线程池争夺 CPU 资源而造成堵塞。例如:还是拿老鹰的例子来讲,妈妈只有一个,要这么一个个喂下去,一些饿坏的小雏鹰等不下去了就要破坏规则,抢在靠前喂食的雏鹰面前,而前面的雏鹰也不是吃软饭的,于是打起来了,场面混乱。老鹰生气了,这么不懂事,谁也别吃了,于是造成了最后谁也没食吃的局面。
1.1.3 线程池可以对线程进行管理
线程池可以提供定时、定期、单线程、并发数控制等功能。比如通过
ScheduledThreadPool线程池来执行S秒后,每隔N秒执行一次的任务。
1.2 线程池的详解
1.2.1 ThreadPoolExecutor 的类关系
- Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。
- ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit() 的扩展,可以说是真正的线程池接口;
- AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法;
- ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
- ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期 执行"功能 ExecutorService;
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令, 或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
......
}
-
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;
如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。 -
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize -
keepAliveTime
线程空闲时的存活时间,这个时间指的是,当线程池中的线程数量大于核心线程数 corePoolSize,这些线程闲着之后,多久销毁它们。
TimeUnit unit keepAliveTime 的时间单位 -
workQueue
workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的 corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能。
workQueue 用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
(1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待, 因此线程池中的线程数不会超过 corePoolSize。
(2)由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。
(3)由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。
(4)更重要的,使用无界 queue 可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、 SynchronousQueue、PriorityBlockingQueue。 -
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
参见下文代码 ThreadPoolAdv。
Executors 静态工厂里默认的 threadFactory,线程的命名规则是“pool-数字 -thread-数字”。 -
RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
(1)AbortPolicy:表示拒绝任务并抛出一个异常RejectedExecutionException
,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
1.2.2 其它线程池的用法
(1)FixedThreadPool 详解
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
这个线程池的特点是:
- 线程池中的线程数量是固定的,也是我们创建线程池时需要穿入的参数;
- 超出这个数量的线程就需要在队列中等待。
创建使用固定线程数的 FixedThreadPool 的 API。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads。
当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把 keepAliveTime 设 置为 0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列 (队列的容量为 Integer.MAX_VALUE)。
(2)SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建使用单个线程的 SingleThread-Executor 的 API,于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
corePoolSize 和 maximumPoolSize 被设置为 1。其他参数与 FixedThreadPool 相同。SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工 作队列(队列的容量为 Integer.MAX_VALUE)。
(3) CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程池数量为 0,也就是它不会永久保留任何线程;
- 最大容量是 Integer.MAX_VALUE;
- 每个线程的存活时间是 60 秒,也就是如果 1 分钟没有用这个线程就被回收了;
- 最后用到了同步队列。
创建一个会根据需要创建新线程的 CachedThreadPool 的 API。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
FixedThreadPool 和 SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列。CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时, CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。
(4)WorkStealingPool
利用所有运行的处理器数目来创建一个工作窃取的线程池,使用 ForkJoin 实现。
(5)ScheduledThreadPoolExecutor
使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的 ScheduledThreadPoolExecutor,如下。
- ScheduledThreadPoolExecutor。包含若干个线程的 ScheduledThreadPoolExecutor。
- SingleThreadScheduledExecutor。只包含一个线程的 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor 适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
1.3 线程池的单例
那么问题来了,我线程池用的好好的,用的时候创建一个,不用就不管他,那为什么要将线程池设计成单例模式呢。那么就要看看你将线程池应用的场所了。一般情况下,整个系统中只需要单种线程池,多个线程公用一个线程池,不会是每创一个线程就要创建一个线程池,那样子你还不如不用线程池呢。
在 ThreadPool 类里面实现线程池的创建,我们这里创建的是 FixedThreadPool 线程池(记住构造方法要私有,保证不被其他类实例化)
public class ThreadPool {
/**
* 线程池中核心线程的数量
*/
private int corePoolSize;
/**
* 线程池中最大线程数量
*/
private int maximumPoolSize;
/**
* 非核心线程的超时时长
*/
private long keepAliveTime;
private ThreadPoolExecutor mExecutor;
private volatile static ThreadPool mThreadPool = null;
public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
}
public void executor(Runnable runnable) {
if (runnable == null) {
return;
}
if (mExecutor == null) {
mExecutor = new ThreadPoolExecutor(
corePoolSize,// 核心线程数
maximumPoolSize,// 最大线程数
keepAliveTime, // 闲置线程存活时间
TimeUnit.MILLISECONDS, // 时间单位
new LinkedBlockingDeque<Runnable>(),// 线程队列
Executors.defaultThreadFactory(),// 线程工厂
new ThreadPoolExecutor.AbortPolicy()//队列已满,而且当前线程数已经超过最大线程数时的异常处理策略
);
}
mExecutor.execute(runnable);
}
// 获取单例的线程池对象
public static ThreadPool getThreadPool() {
if (mThreadPool == null) {
synchronized (ThreadPool.class) {
if (mThreadPool == null) {
// 获取处理器数量
int cpuNum = Runtime.getRuntime().availableProcessors();
// 根据 cpu数量,计算出合理的线程并发数
int threadNum = cpuNum * 2 + 1;
mThreadPool = new ThreadPool(threadNum, threadNum, 0L);
}
}
}
return mThreadPool;
}
}
二、线程池的优点及其原理
2.1 使用线程池的好处
池化技术应用:线程池、数据库连接池、http连接池等等。
池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用
率。
线程池提供了一种限制、管理资源的策略。 每个线程池还维护一些基本统
计信息,例如已完成任务的数量。
使用线程池的好处:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成
的消耗。 - 提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执
行。 - 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会
消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的
分配,监控和调优。
线程池的工作机制
线程池在内部实际上采用了生产者消费者模型将线程和任务解藕,从而使线程池同时管理任务和线程。
当任务提交到线程池里之后,需要经过以下流程:
1)如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意, 执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于 corePoolSize,则将任务加入阻塞队列 BlockingQueue
。
3)如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务。
4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
提交任务
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
submit()
方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用 get (long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这 时候有可能任务没有执行完。
关闭线程池
可以通过调用线程池的 shutdown
或 shutdownNow
方法来关闭线程池。它们 的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别, shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown
方法就会返回 true。 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed
方法 会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任 务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完, 则可以调用 shutdownNow 方法。
合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来 分析。
- 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长、中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。 由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*Ncpu。
混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()
方法获得当前设备的 CPU 个数。
对于 IO 型的任务的最佳线程数,有个公式可以计算
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
- NCPU 是处理器的核的数目
- UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
- W/C 是等待时间与计算时间的比率
等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查 看。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可 以让优先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先 级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果, 等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样 才能更好地利用 CPU。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些 情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异 常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台 任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的 工作线程全部阻塞,任务积压在线程池里。
如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
提交定时任务
//向定时任务线程池提交一个延时 Runnable 任务(仅执行一次)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
//向定时任务线程池提交一个延时的 Callable 任务(仅执行一次)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//向定时任务线程池提交一个固定时间间隔执行的任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
//向定时任务线程池提交一个固定延时间隔执行的任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从理论上讲是确定的,当然执行任务的时间不能超过执行周期。
固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动。
定时任务超时问题
scheduleAtFixedRate 中,若任务处理时长超出设置的定时频率时长,本次任 务执行完才开始下次任务,下次任务已经处于超时状态,会马上开始执行。
若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行。
如下例子:
设置定时任务每 60s 执行一次,那么从理论上应该第一次任务在第 0s 开始, 第二次任务在第 60s 开始,第三次任务在 120s 开始,但实际运行时第一次任务 时长 80s,第二次任务时长 30s,第三次任务时长 50s,则实际运行结果为:
- 第一次任务第 0s 开始,第 80s 结束;
- 第二次任务第 80s 开始,第 110s 结束(上次任务已超时,本次不会再等待 60s, 会马上开始);
- 第三次任务第 120s 开始,第 170s 结束.
- 第四次任务第 180s 开始.....
自定义线程工厂 ThreadPoolAdv.java
/**
* 类说明:自定义线程池中线程的创建方式,把线程设置为守护线程
*/
public class ThreadPoolAdv {
static class Worker implements Runnable {
private String taskName;
private Random r = new Random();
public Worker(String taskName) {
this.taskName = taskName;
}
public String getName() {
return taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+ " process the task : " + taskName);
SleepTools.ms(r.nextInt(100) * 5);
}
}
private static class MyThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "Mark_" + count.getAndIncrement());
t.setDaemon(true);
System.out.println("create " + t);
return t;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = new ThreadPoolExecutor(2,
4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new MyThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i <= 6; i++) {
Worker worker = new Worker("worker " + i);
System.out.println("A new task has been added : " + worker.getName());
threadPool.execute(worker);
}
}
}
打印结果:
A new task has been added : worker 0
create Thread[Mark_0,5,main]
A new task has been added : worker 1
create Thread[Mark_1,5,main]
Mark_0 process the task : worker 0
A new task has been added : worker 2
Mark_1 process the task : worker 1
A new task has been added : worker 3
A new task has been added : worker 4
A new task has been added : worker 5
A new task has been added : worker 6
扩展线程池 ThreadPoolExt.java
/**
* 类说明:扩展线程池的使用范例
*/
public class ThreadPoolExt {
static class Worker implements Runnable {
private String taskName;
private Random r = new Random();
public Worker(String taskName) {
this.taskName = taskName;
}
public String getName() {
return taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+ " process the task : " + taskName);
SleepTools.ms(r.nextInt(100) * 5);
}
}
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.DiscardOldestPolicy()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("Ready Execute " + ((Worker) r).getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("Complete Execute " + ((Worker) r).getName());
}
@Override
protected void terminated() {
System.out.println("线程池退出 ");
}
};
for (int i = 0; i <= 6; i++) {
Worker worker = new Worker("worker " + i);
System.out.println("A new task has been added : " + worker.getName());
threadPool.execute(worker);
}
threadPool.shutdown();
}
}
打印结果:
A new task has been added : worker 0
A new task has been added : worker 1
A new task has been added : worker 2
Ready Execute worker 0
A new task has been added : worker 3
A new task has been added : worker 4
Ready Execute worker 1
pool-1-thread-2 process the task : worker 1
A new task has been added : worker 5
pool-1-thread-1 process the task : worker 0
A new task has been added : worker 6
Complete Execute worker 1
Ready Execute worker 2
pool-1-thread-2 process the task : worker 2
Complete Execute worker 0
Ready Execute worker 3
pool-1-thread-1 process the task : worker 3
Complete Execute worker 3
Ready Execute worker 4
pool-1-thread-1 process the task : worker 4
Complete Execute worker 2
Ready Execute worker 5
pool-1-thread-2 process the task : worker 5
Complete Execute worker 4
Ready Execute worker 6
pool-1-thread-1 process the task : worker 6
Complete Execute worker 6
Complete Execute worker 5
线程池退出