ThreadPoolExecutor看懂、学会
我们知道,线程是任务执行的机制,为了让更多的任务更快的执行,通常会选择多线程技术,而线程池就是简化多线程开发的最佳方案。
使用线程池的好处,总结下主要有一下几点:
- 集中对线程资源进行管理,复用线程资源,使能够合理的规划服务器资源;
- 解耦任务提交过程和执行过程,开发人员能更加专注于任务本身,而不必过多的关注线程的执行;
- 平衡系统负载、吞吐量和响应时间,保护系统在任务过多时、性能平缓降低。
线程池实现
每个线程都有线程空间也就是线程栈,保存栈帧(包含:操作数栈、局部变量表等),也就是方法调用链。随着方法的调用完成,线程所占用的资源也将被系统回收。
线程要怎样才不被回收?或者说,控制它何时被回收?
思路就是一直循环,只要不跳出循环,线程就不会被回收。可以通过控制何时线程跳出循环,来达到销毁线程的目的。
ThreadPoolExecutor的实现
以下我修剪后的代码,可以看出,用的是阻塞队列的超时和挂起来控制线程的生命周期。
//这个是运行任务的具体方法
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) //当getTask返回null时,循环跳出,线程回收
task.run();
}
//获取任务
private Runnable getTask() {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //判断线程是否可以超时回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //通过keepAliveTime来控制线程空闲死亡时间
workQueue.take();//如果线程不允许回收,那么就会一直挂起
if (r != null)
return r;
}
线程数控制
线程池的目的就是避免线程数的失控以及复用线程。主要有三个参数来控制:corePoolSize、maximumPoolSize、workQueue。
- 当前线程总数小于corePoolSize时,不管有没有线程闲置,都会为新的任务创建新的线程,不会进行线程的复用;
- 当线程数等于corePoolSize,新到来的任务会进入队列中等待,等待的任务会被将来空闲的线程执行;
- 如果线程数等于corePoolSize且队列已经满了,会new新线程,直到线程数量等于maximumPoolSize;
- 当上面条件都验证后,那么在到来的任务就会进入拒绝服务,是直接放弃,还是抛出异常等各种策略可以定制。
下面代码我进行了注释:
public void execute(Runnable command) {
int c = ctl.get(); //ctl是atomicInteger类型,通过不同的计算方式得到线程池状态和线程数量
if (workerCountOf(c) < corePoolSize) { //线程总数小于corePoolSize
if (addWorker(command, true)) //为新的任务创建新的线程
return;
}
if (isRunning(c) && workQueue.offer(command)) { //不能新建线程,那么就入队列(offer会立刻返回,失败为false)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //创建超过corePoolSize数量的线程
reject(command);
}
Executors线程池工厂
通过Executors提供的静态工厂方法,我们可以简单的使用线程池。
方法 | 介绍 |
---|---|
newFixedThreadPool(int nThreads) | 固定线程的线程池,任务会堆积在队列等待执行 |
newSingleThreadExecutor() | 一个线程的线程池,任务在队列中等待串行执行 |
newCachedThreadPool() | 最大线程数为Integer.MAX_VALUE的线程池,可以认为没有队列缓冲,没有空闲线程就创建新线程 |
以上线程池直接使用有没有问题?从上面介绍来看,好像问题很大,不管是任务过多的堆积,或是过多的创建线程。
1. newCachedThreadPool线程池
使用的是SynchronousQueue队列,offer方法直接把任务交个等待的线程,如果没有空闲线程挂起在队列上,那么直接返回false,从这里可以看出,当任务量急剧增大时,这可能导致线程无限增长,而线程创建是需要内存保存线程上下文信息的,可能会直接导致服务器内存不足,造成内存泄漏,同时线程对cup激烈的竞争,会导致服务器性能急剧下降。
2. newFixedThreadPool和newSingleThreadExecutor线程池
线程最大数量固定,通过LinkedBlockingQueue(容量很大的阻塞队列)来缓冲不能处理的任务,能够很好的保证合适数量的线程处于忙碌状态,但是同样会使得任务过多的堆积,导致响应能力下降。
从上分析来看,貌似并不能直接的使用工厂提供的线程池。如果自己去实例化ThreadPoolExecutor会不会更好呢?那么就可以根据使用场景,更精细的控制线程数量和队列长度,从而达到性能更优。
个性化定制
通过有界队列、线程数量和空闲死亡时间来控制线程池,能够构造具有很好伸缩性的线程池,但是在系统资源使用和响应能力之间进行平衡,进行调优,却是件非常困难的事情。
- 大容量队列,小数量线程,虽然可以减少cup竞争、上下文切换,但过少的线程,可能会造成吞吐量的下降;
- 小容量队列,大数量线程,会使得cup竞争激烈,上线文频繁切换,也会使吞吐量下降。
参数的确定非常的困难,需要根据具体的场景,参考生产速度、任务处理速度、任务能够容忍的延迟执行时间,进行合理的调整。
怎么关闭线程池
JVM只要有非后台线程运行,那么就不会关闭。所以停机时需要关闭每个线程,如何实现呢?。
ThreadPoolExecutor提供了2个方法:
方法 | 介绍 |
---|---|
shutdown() | 平缓的关闭线程池 |
shutdownNow() | 暴力关闭线程池 |
何为平缓,何为暴力?分析前我们列举下线程池状态,以及代表的意思(括号中的数字就可以认为就是状态的常量值):
状态 | 介绍 |
---|---|
RUNNING(-1) | 运行状态:接受新的任务或者运行队列中等待的任务 |
SHUTDOWN(0) | 关闭状态:不接受新的任务但运行队列中等待的任务 |
STOP(1) | 暴力关闭状态:不接受新任务且不执行队列中任,设置所有线程中断状态 |
TIDYING(2) | 当所有的任务都完成,所有的线程都消亡状态 |
TERMINATED(3) | 线程池完全关闭状态 |
先看看shutdown()的源码:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //设置线程池状态
interruptIdleWorkers(); //中断所有线程
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate(); //尝试终止线程池
}
可以看到,shutdown方法主要是通过中断线程来关闭线程池的。
什么时候线程会响应中断?就是挂起在队列上的take或者poll操作,当队列中有任务时,会继续执行,只有队列中没有任务时,线程挂起响应中断,跳出循环,也就代表线程死亡。
可以总结,shutdown方法关闭的流程是:拒绝新任务加入–>执行完成所有剩余的任务–>关闭线程池
现在来看看 shutdownNow()的源码:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); //线程池状态为stop
interruptWorkers();
tasks = drainQueue(); //排干所有队列中等待的任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
对比shutdown,可以看到除了状态变化为stop外,还多一个步骤,排干所有队列中的任务,让所有线程取不到任务,响应中断,从而快速地关闭线程池。
最后,如果需要异步执行任务,仅仅使用线程池是不够保险的,任务的状态,任务的执行情况,无法达到可视化,系统就很容易失控。
我最近在写一个带有可视化页面的分布式的异步任务执行组件,有兴趣的可以留言。]