线程池那些事之ThreadPoolExecutor
前言
我们都知道,线程是应用中宝贵的资源,创建一个线程需要很大的损耗,所以最好能够复用线程。因此我们一般都通过来线程池来维护管理线程。
ThreadPoolExecutor是JDK提供的线程池实现,我们在开发过程中最常使用到的就是这个。
使用
创建
首先我们要创建线程池,可以通过ThreadPoolExecutor的构造函数或者java提供的工具类Executors来创建
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,100,20, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<>(100));
ExecutorService executorService= Executors.newCachedThreadPool();
在Spring中也可以使用ThreadPoolTaskExecutor这个类来配置线程池,底层还是用的是java的ThreadPoolExecutor,但是能够作为bean放到ioc容器中去,也能spring其他组件适配。
使用
1.提交异步任务
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("test");
}
});
2.提交异步任务并且进行跟踪执行状态
我们可以使用submit方法提交任务,这个方法返回的Future对象可以用于跟踪任务执行状态
Future future = executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("hello");
}
});
while (future.isDone()){
System.out.println("world");
}
解析
分析ThreadPoolExecutor的源码主要是想知道线程池底层是如何处理我们提交的的Runnable,以及它的生命周期和其他细节。
线程池初始化参数
在创建线程池的时候,我们会通过构造函数配置线程池的配置参数,了解这些参数的实际含义是十分重要的
参数 | 含义 |
---|---|
corePoolSize | 线程池核心工作线程的数量 |
maximumPoolSize | 线程池工作线程最大值 |
keepAliveTime | 工作线程的存活时间 |
unit | keepAliveTime的单位 |
workQueue | 放置任务的队列 |
threadFactory | 线程工厂 |
handler | 处理任务溢出的异常处理器 |
上面的介绍相对简单,实际上的作用不是这么几个字能描述的,想了解它们的作用往下看吧。
线程池生命周期
线程池运行状态 | 描述 |
---|---|
RUNNING | 线程池初始状态,这个状态下可以接受任务并且调度任务执行 |
SHUTDOWN | 这个状态会拒绝接受新任务,但是会执行完已经加入的任务 |
STOP | 不接受新任务,并且也不执行已经加入的任务,会中断执行中的任务 |
TIDYING | 当所有任务执行结束,并且工作线程为0的时候会进入这个状态,会运行terminated方法 |
TERMINATED | terminated方法运行结束的时候进入这个状态 |
一般的话,如果不执行shutdown或者shutdownNow方法是不会进入TIDYING状态的,因为核心工作线程都还存活(cachedThreadPool除外,因为coreSize=0)。
TIDYING和TERMINATED状态切换是连续的,在tryTerminate方法内执行
所以一般的状态转换为
RUNNING->SHUTDOWN/STOP->TIDYING->TERMINATED
RUNNING->SHUTDOWN->STOP->TIDYING->TERMINATED
配置了allowCoreThreadTimeOut=true后,核心线程也能被回收。但是不是说核心线程数为0了,就会进入TIDYING状态。必须由SHUTDOWN或STOP触发。
工作线程组机制
在线程池中会维护一组工作线程来处理我们的任务。
提交任务
我们先从提交任务的execute方法入手。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.如果当前工作线程数小于配置的核心线程池大小
//直接创建一个工作线程来执行对应command任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.如果当前工作线程数大于等于配置的核心线程池大小
//把任务放到workQueue中去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池状态,如果不是RUNNING,
//那么从workQueue移除这个任务,并且reject
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.如果上一步的workQueue加入不了,那么增加工作线程
else if (!addWorker(command, false))
//如果工作线程增加失败,那么reject
reject(command);
}
代码中的注释讲的很明白了,reject的话会通过我们配置的handler做对应处理。
工作线程运行机制
我们先看下工作线程实现类Worker
Worker继承了AbstractQueuedSynchronizer,在执行任务的时候会将自己锁住,因此就可以调用work的tryLock方法来判断当前worker是否正在执行任务。
同时实现了Runnable接口,因为工作线程也是线程。
直接看Worker的Run方法
public void run() {
runWorker(this);
}
委托给了外部类的runWorker方法,为什么放到外部类,因为有很多配置都在外部类,比如keepAliveTime
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask用于从workQueue获取任务
//这边是一个循环,会不断的拿任务,如果每次都拿得到,那么不断循环,一旦拿不到,那么这个线程就会死掉。
//拿不拿得到取决于keepAliveTime的coreSize的配置
while (task != null || (task = getTask()) != null) {
//拿到任务后,执行前,会先把自己锁住
w.lock();
//判断线程池是不是STOP状态,如果是STOP状态的话,会对当前线程进行中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务执行前的hook
beforeExecute(wt, task);
Throwable thrown = null;
try {
//实际执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后的hook
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
//解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程结束时做的一些工作
processWorkerExit(w, completedAbruptly);
}
}
worker会不断的从workQueue获取任务,一旦获取不到,那么就会退出while循环,调用processWorkerExit做这个工作线程退出的相关工作。
run方法运行到结束,也就代表这个线程工作完成,要被回收了。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//汇总工作线程完成的任务数
completedTaskCount += w.completedTasks;
//从workers中移除当前工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//判断是否能进入terminate状态
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//completedAbruptly用来判断是否是异常退出
//如果是异常退出的话,跳过下面这个逻辑,直接补充增加一个worker
//如果是正常退出,判断当前工作线程数是否小于corePoolSize,小于则增加worker,注意allowCoreThreadTimeOut=true的情况,会忽略corePoolSize
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果当前工作线程数大于min,那么不增加工作线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit主要对结束的工作线程的数据进行收集,以及判断是否需要再添加一个工作线程。
接下来我们看下getTask的逻辑,看下它在什么情况下会返回null,让这个线程结束
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池状态为SHUTDOWN并且workqueue为空
//或者线程池状态为STOP
//那么返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//判断当前获取任务时是否需要设置超时,超时获取不到任务会返回null
//allowCoreThreadTimeOut=true强制每个工作线程都会超时
//当wc(当前工作线程数) > corePoolSize也会开启超时获取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//在工作线程数大于maximumPoolSize或者已经获取超时的情况下,返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
//cas失败进行重试
continue;
}
//下面的poll是代超时的获取任务
//take不会超时,一直阻塞直到获取任务
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//如果任务不为空,直接返回
return r;
//如果在规定时间内没获取到任务,那么设置timedOut = true;
timedOut = true;
} catch (InterruptedException retry) {
//线程中断重试??
timedOut = false;
}
}
}
在getTask方法通过超时获取返回null的方式动态调节了线程池工作线程的数量。
在processWorkerExit中可以注意到有一个tryTerminate方法,用于判断当前线程池是否需要进行terminate相关工作
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//线程池状态为RUNNING,TIDYING或者TERMINATED时
//或者状态为SHUTDOWN并且workQueue不为空时
//return,不执行后续操作
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//在上次if的过滤后,执行到这边的线程池状态为
//SHUTDOWN并且workQueue为空
//STOP状态
//在上面两个条件下,直接中断这个工作线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//执行到这边的话,说明就是工作线程数为0,并且workqueue也为空
//可以先将状态切换为TIDYING
//并且执行terminated方法
//最后将状态设置为TERMINATED,并且通知等待在termination的线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate会在线程池为STOP或者SHUTDOWN状态并且workqueue为空的时候才会生效。其他逻辑看源码注释。
增加工作线程
在熟悉了工作线程的运行机制后,我们来看下addWorker的逻辑。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 只有线程池状态为RUNNING
//或状态为SHUTDOWN,workQueue不为空,并且不是由于execute调用该方法的时候(注意firstTask == null)
//才能增加工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//使用自旋+cas 修改当前线程数
for (;;) {
int wc = workerCountOf(c);
//当前线程数wc超过限制,那么返回false,停止创建工作线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//下面是实际创建工作线程的逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//构造worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//只有RUNNING状态或者SHUTDOWN并且firstTask为空的情况下才能增加工作线程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果Worker内的线程已经启动,那么报错
if (t.isAlive())
throw new IllegalThreadStateException();
//将创建的工作线程添加到workers集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动Worker内的工作线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//对添加失败进行处理
addWorkerFailed(w);
}
return workerStarted;
}
有一点很奇妙,线程是在Worker的构造函数就创建了,然后在外部进行启动
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到newThread传入了worker本身。
销毁逻辑
主要是shutdown 和 shutdownNow两个方法。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
//shutdown的钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//判断是否可以进行terminate相关工作
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//线程池状态更新为STOP
advanceRunState(STOP);
//中断所有工作线程
interruptWorkers();
//获取所有workqueue中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//判断是否可以进行terminate相关工作
tryTerminate();
//返回这些未执行的任务
return tasks;
}
interruptIdleWorkers和interruptWorkers读者自行查看。
从源码中可以看到shutdown和shutdownNow的主要区别是,shutdown会等所有任务执行完毕后才terminate,shutdownNow是直接强制中断所有工作线程,并且返回那些workqueue中没有被处理的任务。
最后
本文只讲解了execute这种提交异步任务的方式,在某些场景下,我们需要关注或者等待异步任务的执行,所以就需要调用submit得到Future这种方式处理,请听下回分解。