ThreadPoolExecutor(2) —— 线程池源码分析
一、前言
分析源码就像旦总的生活,朴实无华且枯燥。
—— 若琳
分析源码虽然是枯燥的,但是我们带着一颗求知的好奇心,是不会感到特别枯燥。就像我们去慢慢挖掘底层的真相,会别有一番收获,别有一番欣喜。
二、ThreadPoolExecutor 的 execute
在向线程池提交任务时,会通过两个方法:execute和submit。
本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt记录着runState和workerCount
int c = ctl.get();
//workerCountOf方法取出低29位的值,表示当前活动的线程数
//然后拿线程数和 核心线程数做比较
if (workerCountOf(c) < corePoolSize) {
// 如果活动线程数<核心线程数,则添加一个Worker
//addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
if (addWorker(command, true))
// 如果成功则返回
return;
// 如果失败则重新获取 runState和 workerCount
c = ctl.get();
}
// 如果当前线程池是运行状态并且任务添加到队列成功
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取 runState和 workerCount
int recheck = ctl.get();
// 如果不是运行状态并且移除任务失败
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//第一个参数为null,表示在线程池中创建一个线程,但不去启动
// 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
addWorker(null, false);
}
//再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
else if (!addWorker(command, false))
//如果失败则拒绝该任务
reject(command);
}
总结一下它的工作流程:
- 当workerCount < corePoolSize,创建线程执行任务。
- 当workerCount >= corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。
- 当workQueue已满,并且workerCount >= corePoolSize,并且workerCount < maximumPoolSize,创建线程执行任务。
- 当workQueue已满,workerCount >= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。
对照上一篇文章的图来理解:

三、addWorker方法
该方法的核心作用是 —— 主要工作是在线程池中创建一个新的线程并执行
参数定义:
firstTask: the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)
core: if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值))
3.1 源码
runWorker 方法可以分为两部分。
第一部分 —— 是线程池安全的(通过CAS保证线程安全)增加工作线程数
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
// (rs > SHUTDOWN) ||
// (rs == SHUTDOWN && firstTask != null) ||
// (rs == SHUTDOWN && workQueue.isEmpty())
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker数量超过容量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS的方式增加worker数量。
// 若增加成功,则直接跳出外层循环进入到第二部分
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化,对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// 其他情况,直接内层循环进行自旋即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// ...
}
第二部分 —— 是线程池安全的(通过ReetrantLock)创建并添加Worker实例,然后启动Worker去工作
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此需要加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 这儿需要重新检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker已经调用过了start()方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker创建并添加到workers成功
workers.add(w);
// 更新`largestPoolSize`变量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动worker线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这里做一下插曲:Worker 我们可以把它简单理解为执行任务的工人,它本身也持有线程对象,我们之后再细讲它。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
3.2 addWorker 方法中的几个关键节点
3.2.1 线程池内是如何保存线程的
- 在 addWorker 方法第二部分的第 7 行代码,先是创建了一个 Worker,并把 Runnable 对象交给它。
- 如果状态条件正确,则在第二部分的第 26 行将当前 Worker 添加到线程池的线程集合中,也就是 workers(HashSet<Worker>)
- 成功添加 Worker 之后,在第二部分的第 38 行启动 Worker 中的 thread(为什么这里需要启动 Worker 中的 thread,后面的文章再讲)
3.2.2 for 循环
方法中有两个 for 循环,唯一能继续进行下去的出口就是第一部分的第 32 行,成功增加了线程池内工作线程数。
四、总结
这里大体写了一下线程池中上层调度的源码,有些地方写的不是特别详细,接下来会慢慢进行完善。
如果有哪些地方理解的不对,或者缺漏,还望留言,大家一起交流,一起学习,一起进步。
参考:
https://www.jianshu.com/p/389b58856894
https://www.cnblogs.com/javazhiyin/p/10605511.html