死磕源码系列 - ThreadPoolExecutor
本文是本人对线程池最常用的一种实现ThreadPoolExecutor
的源码的一些理解, 边看边写, 不定时更新.
一、线程池的状态控制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 线程池是用一个
AtomicInteger
类型的变量ctl
来控制其数量与状态 - 高三位为状态, 第一位是符号位哦
- 低二十九位为线程池中活跃线程数量, 最大值为
2^29 - 1
下面列出了左移二十九位后的高三位的值及其状态说明, 注意第一位是符号位.
// 111
// 接受新任务并且处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000
// 拒绝新任务但是处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001
// 拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
// 010
// 所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
private static final int TIDYING = 2 << COUNT_BITS;
// 011
// 终止状态。terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
那么如何通过一个变量(ctl
)控制两个变量(workerCount
/runState
)呢?
-
ctlOf
方法
/**
* rs 线程状态,即上面的五个常量
* wc 活跃线程数
* @return 控制位
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
-
rs
只影响高三位,后二十九位全0 -
wc
只影响低二十九位,高三位全0 - 二者相或,结果就是高位取
rs
,低位取wc
runStateOf
private static int runStateOf(int c) { return c & ~CAPACITY; }
- 参数为控制位
ctl
-
~
运算符是连符号位一起取反后以补码取原码
没有溢出的情况下。
实际也不可能溢出, 由于 -0 是负数的最小值, 负数的最小值的绝对值比最大正数大一
可以参考 [-128, 127]
0001 1111 CAPACITY 原始值 高三位全0 (低位全1, 只写了高八位)
1010 0000 `~CAPACITY`运算后值 (这里就变成负数了)
1110 0000 CAPACITY 运算值 (补码, 计算机都是补码参与运算)
-
~CAPACITY
参与运算是高三位全1、低位全0, 这样相与就把控制位的高三位取出来了, 也就是状态位.
workerCountOf
private static int workerCountOf(int c) { return c & CAPACITY; }
- 与
runStateOf
类似,这里只提一下不一致的地方
0001 1111 CAPACITY 原始值(低位全1, 只写了高八位)
0001 1111 CAPACITY 运算值(正数的 原码/反码/补码 一致)
- 所以取的是低二十九位的值, 也就是
workerCount
二、ThreadPoolExecutor
的构造函数
这里就只说一个最终的构造函数, 其他的参数个数少于7
个的都是调用此完整版
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
一个一个来看下:
名称 | 类型 | 描述 |
---|---|---|
corePoolSize |
int |
核心线程数, 活跃线程数先达到的数量 |
maximumPoolSize |
int |
最大线程数, 活跃线程数的最大值, 为核心线程+非核心线程数(队列中的任务不属于此类) |
keepAliveTime |
long |
超时时间, max - core 的这部分活跃线程最大等待时间. 还有一个参数 allowCoreThreadTimeOut , 如果为true, 则适用于全部活跃线程(也就是哪怕是核心线程core 如果没有任务处理也会被close ) |
unit |
TimeUnit |
keepAliveTime 的单位, 是一个枚举值 |
workQueue |
BlockingQueue<Runnable> |
当core 满了以后, 再加进来的任务会进入此队列等待. 要求是一个实现了BlockingQueue 接口的类, 自然的, 塞进去的任务要求实现了Runnable 接口 |
threadFactory |
ThreadFactory |
线程工厂类, 要求实现ThreadFactory 接口, 重写newThread 方法 |
handler |
RejectedExecutionHandler |
任务拒绝策略, 当阻塞队列已满且活跃线程达到max 之后加入进来的任务会执行拒绝策略, 需要实现RejectedExecutionHandler , 重写rejectedExecution 方法 |
举个例子:
ThreadFactory
public class HyThreadFactory implements ThreadFactory{
// 线程名称前缀
private String namePrefix;
// 线程优先级
private int priority;
// 守护线程
private boolean isDaemon;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
public HyThreadFactory(String namePrefix){
this(namePrefix, Thread.NORM_PRIORITY, false);
}
private HyThreadFactory(String namePrefix, int priority, boolean isDaemon){
this.namePrefix = poolNumber.getAndIncrement() + "-" +namePrefix;
this.priority = priority;
this.isDaemon = isDaemon;
}
@Override
public Thread newThread(Runnable r){
Thread th = new Thread(r, namePrefix + threadNumber.getAndIncrement());
th.setDaemon(isDaemon);
th.setPriority(priority);
return th;
}
}
RejectedExecutionHandler
public class ThreadRejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
System.out.println(r.toString() + " I am rejected...");
}
}
- 构造
ExecutorService es = new ThreadPoolExecutor(
10,
10,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10 << 7),
new HyThreadFactory("hy-thread-"),
new ThreadRejection());
-
core
= 10 -
max
= 10 - 重写了线程工厂, 目的是自定义线程名, 查堆栈时方便知道哪些线程是我们需要的
- 重写了拒绝策略, 其实
JDK
有提供四个内部类AbortPolicy
/DiscardPolicy
/DiscardOldestPolicy
/CallerRunsPolicy
这里贴一下默认策略AbortPolicy
的实现, 其他几个可以去ThreadPoolExecutor
源码中看看
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
说到这里, 讲讲我对线程池中任务
和线程
的理解
-
任务
, 线程池只是一个维护一定数量线程
的池子,线程
是用来干什么的? 用来执行任务的. (阻塞队列中的也是任务
) -
任务
要求是一个实现了Runnable
接口的类, 所以任务
其实也是一种线程, 这个任务
跟线程
有什么关系? 继续看~
问题: 请问一个线程池构造成功后, 其所能同时容纳的最大任务数量是多少?
三、线程池任务执行流程 -- 啃源码~
先看下继承结构
![](https://img.haomeiwen.com/i15085536/cdcb13790671bbae.png)
这一串
类
/接口
中, 我们只拎出来接下来要讲的执行过程中涉及到的方法,
-
Execuotor
接口
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
这个顶级接口中只有一个抽象的任务执行方法
-
ExecutorService
接口
在Executor
接口的execute
方法上扩展了很多方法, 这些方法都是对线程池的操作, 以便更好地管理线程的执行
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
我们通常都是基于此接口规范进行线程池的处理, 比如下面的主角ThreadPoolExecutor
.
-
AbstractExecutorService
抽象类
ExecutorService
的一个实现类, 实现了几个重要的方法比如submit
/doInvokeAny
/invokeAll
, 这里就看一下最常用的一种submit
方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit
方法就做了三件事
- 将传入线程
Runnable
转为RunnableFuture
对象 - 将此对象作为入参调用
execute
方法 - 返回此对象
RunnableFuture
是一个同时实现了Runnable
和Future
(跟Execute
类似的顶级接口)的接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
submit
与execute
两个方法的区别你知道几条?
-
ThreadPoolExecutor
类
这个类就是本文的主角了, 也是execute
的具体实现类, 先贴下这个方法的源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 可以看到这个方法中如果把
addWorker
方法抽出来待会说, 那就很清晰了. - 一个新的任务加入线程池, 线程池分三种情况处理
- 当前线程池中活跃线程数是否小于
core
- 未达到
core
, 尝试调用addWorker(command, true)
方法, 失败则重新获取控制位继续
- 判断当前线程池是否在运行状态, 是则入阻塞队列并做二次check
- check通过, 判断当前活跃线程数, 若为0则调用
addWorker(null, false)
方法 - check没过, 从队列移除此任务并执行拒绝策略
- 线程池不是运行状态或入队失败, 尝试调用
addWorker(command, false)
方法, 失败则执行拒绝策略.
- 当前线程池中活跃线程数是否小于
这三个
addWorker
方法的参数都不一样, 后面马上说, 这里你心里有个数就行
总结
- 先创建
core
数量的核心线程, -
core
满了加入阻塞队列 - 队列满了再由
core
延伸到max
继续创建非核心线程 - 活跃线程数达到了
max
, 拒绝此任务
所以一个创建好的线程池最大同时容纳任务数是
max
+workQueue.size()
(一个线程同时只能处理一个任务哦~)
好了, 现在的问题就剩addWorker
方法了, 这个方法我们分两步来说:
/**
* firstTask 用于执行的第一个任务, 可以为null
* core 此次添加的是否核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core){...}
- 第一步, 判断线程池状态,是否满足加入条件
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
这一步由1、2两层循环组成,分别看下这两层循环做了什么
- 获取控制位,拿到运行状态
- 判断是否允许入库, 上面这个判断我换了一种写法
rs >= SHUTDOWN &&
( rs != SHUTDOWN || firstTask != null || workQueue.isEmpty() )
很明显了, 不执行此方法的情况有三种
(1) rs
= STOP
/TIDYING
/TERMINATED
(2) rs
= SHUTDOWN
,firstTask
不为null
(3) rs
= SHUTDOWN
,firstTask
为null
,阻塞队列workQueue
中没有任务待执行
还记得SHUTDOWN
的意思吗, 回顾一下[允悲]
拒绝新任务但是处理阻塞队列里的任务
下面对应着来尝试解释下上面三种情况:
(1) 除了RUNNING
之外的四种状态, 是不会允许添加新任务的
(2) firstTask
为null
时的addWorker
方法会做什么事? 等会详细说, 这里先给出答案, 会创建一个Worker线程去处理阻塞队列中的任务(核心与否取决于 另一个参数core
). 所以不为null
的firstTask
是不会被SHUTDOWN
状态的线程池接受的
(3) SHUTDOWN
状态的线程池还是要处理队列中的任务, 所以如果队列为空也就不用再创建Worker线程去处理了
firstTask
为null
的addWorker
方法创建的是Worker
线程而不是添加新的任务进入线程池, 目的是处理队列中的任务
第二层循环, 修改活跃线程数量
(1) 拿到活跃线程数量, 判断是否超过最大值
(2) 没有超过则CAS
更改活跃线程数量(+1
),
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
成功则跳出所有循环, 进入第二步
(3) CAS
失败, 重新获取ctl
, 判断状态位与开始是否一致
- 一致继续内层循环, 也就是重试CAS
- 不一致需要重新判断当前任务是否满足加入条件, 外层循环
- 第二步, 添加
Worker
线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
- 使用
firstTask
初始化Worker
得到w
, 拿到其内部线程t
, 这个t
是以Worker
本身构造的 - 获取
ReentrantLock
锁, 然后做两件事
- 线程池状态 +
firstTask
判断是否继续执行 -
t
没有启动则将w
加入workers
, 同时更新largestPoolSize
, 启动t
-
t
已经启动抛出异常, 传入了一个已经启动的线程
四、Worker
类解析
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
- 实现了
Runnable
接口, 是一个常规意义上的线程类 - 继承了
AbstractQueuedSynchronizer
, 俗称AQS
, 这个东西就厉害了, 讲Worker
主要就是为了讲下AQS
怎么玩的
- 先看看字段
** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
(1) thread
-
t
是以w
为参数构建出来的线程 -
addWorker
时启动的是t
- 启动顺序
`getTask`
`t` > `w` > ---- > `task`
(2) firstTask
- 传入时, 如果还没有达到核心线程数, 那就没必要先入队再
马上
出队, 可以直接得到运行 - 循环判断条件是
null != `firstTask` || null != `getTask`
- 构造函数
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
- 第一步 禁止中断,
Worker
里面的AQS
操作只有0
跟1
之间的转换, 那么设置成其他的值,CAS
都会失效, 而此种情况下的中断都会阻塞(或者说失败? 这个点后面再看看) - 第二步 设置
firstTask
的值, 允许为空 - 第三步 使用当前
Worker
构造一个线程出来, 给线程池使用
-
Runnable
部分
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
-
w.unlock
中做的事就是将state
置0
(还记得前面构造中将state置为-1
了吗?), 这一步是为了允许中断. - 循环条件就是取当前任务或队列中任务处理, 直到为空
- 判断是否需要中断当前线程
If pool is stopping, ensure thread is interrupted;
如果线程不在RUNNABLE状态, 确保线程中断
if not, ensure thread is not interrupted.
如果不在, 确保线程不会被中断
This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
第二种情况需要做二次check 以防止在清除中断标识时调用shutdownNow方法
-
接下来启动获取到的任务
task
, 这里还有俩方法beforeExecute
和afterExecute
等待后人重写~~ -
任务置null方便GC, 当前
Worker
处理任务数自增, 解锁后继续获取下一个任务处理 -
有一个点挺有意思的,如何启动一个线程? 所有人都告诉我们要调用线程的
start()
方法, 这里调用的却是run()
方法. 其实很简单, 如果这里调用start()
方法, 那么你设置的一堆core
/max
参数就失去了意义. (每一个start()
都会另起一个线程.) -
这里有一个变量要说下
completedAbruptly
, 这个变量翻译是突然完成. 初始值是true
,什么情况下会发生变化?
task
是用户自定义的线程任务, 其run
方法中可能会抛出非受检异常, 代码捕获到此异常后又立马往外抛出了, 这时就会跳过这个赋值 (可以多看两遍代码层次)
completedAbruptly = false;
这时completedAbruptly
就是true
了. 接着往下看其调用.
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
-
我们先看公共部分, 即中间处理worker线程的部分, 这部分很清晰
(1) 获取重入锁
(2) 将当前worker
处理的任务数量加到总的任务数上
(3) 将当前worker
从worker
集合中移除
(4) 释放重入锁
(5) 尝试终止线程池(每个Worker
结束时都会尝试关闭线程池) -
分情况来看下
completedAbruptly
为true
和false
的情况下增加的操作
(1)completedAbruptly
为false
, 线程正常结束(队列空了)
1> 判断状态为RUNNABLE
<1>allowCoreThreadTimeOut
为true
且队列为空, 直接结束
<2>allowCoreThreadTimeOut
为true
且队列不为空, 至少保留一个线程
<3>allowCoreThreadTimeOut
为false
, 若当前活跃线程数小于core
则增加一个max
类型的Worker
, 否则退出
2> 否则结束
(2)completedAbruptly
为false
, 线程非正常退出(task.run()
发生异常)
1>wc
自减1
(If abrupt, then workerCount wasn't adjusted
)
2> 判断状态为RUNNABLE
, 增加一个max
的Worker
3> 否则退出
ThreadPoolExecutor 如何保证内部始终存活 core 数量的核心线程?
- 意外退出的情况,
wc
自减, 直接新增一个max
类型的线程, 因为不知道此时线程池的活跃线程数是否超过core
, 而要保证挂了一个起一个, 所以最好传一个max
类型的.(firstTask
为null
是因为没有任务给它传~~) - 正常结束, 此时队空状态, 不动
wc
, 因为这种情况在getTask
方法中就已经将wc
减一了, 然后判断wc
是否大于core
, 大于则直接退出此Worker, 否则还要尝试新加一个max
的线程以确保core
数量的核心线程存活.
-
getTask
方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 循环获取任务
-
SHUTDOWN
且队空或STOP
以上状态,wc
减一, 返回null. 这个很好理解,SHUTDOWN
状态下还需要处理队列任务, 而STOP
以上则直接不用处理了. - 判断是否需要超时控制,
wc
使用CAS
减一, 失败继续循环, 成功返回null.
getTask
返回null
时都会将wc
减一, 对应processWorkerExit
中的处理逻辑
- 尝试从队列获取任务, 需要超时使用
keepAliveTime
时间的超时poll
方法, 否则直接使用阻塞的take
方法.
两个方法的区别取决于具体的阻塞队列, 这里我想说的是, 线程池如何保证
core
数量的核心线程在活跃中呢? 如果允许核心线程超时那自然不用保证, 一般都是不允许也就是默认的allowCoreThreadTimeOut = false
, 这种情况下timed
的值取决于wc > corePoolSize
, 那很明显了, 如果大于, 说明当前wc
已经是max
级别了, 不需要阻塞执行, 带超时时间去取任务就好; 如果是小于, 那么wc
就是core
级别, 所有的worker
都是核心线程, 需要阻塞执行以确保core
数量的核心线程维持活跃(也就是不结束).
五、线程池的其他方法
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
这俩方法放一块看看
- 首先就是区别, 也就是
interruptIdleWorkers
方法与interruptWorkers
方法的区别,idle
是空闲的意思, 分别来看看源码, 都比较简单
interruptWorkers
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
-
shutDownNow
方法会直接中断当前所有工作线程(Worker
), 并且调用drainQueue()
返回等待任务列表 -
shutdown
方法会中断当前空闲线程, 而且会处理完队列中任务
tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 在所有对线程池有
debuff
操作的时候, 都会尝试终止线程池, 即调用tryTerminate
方法 - 三个条件下直接返回
- 运行中
- 已经终止
-
SHUTDOWN
且队不空
-
wc
不为0, 调用一次关闭空闲线程, 退出 - 否则开始终止线程池, 这里看到状态变化, 先为
TIDYING
, 再到TERMINATED
,CAS
失败继续循环.
感觉写的乱七八糟的, 自己做做笔记, 后面有新思路再来优化.
拓展1
线程池的关闭
在ThreadPoolExecutor
的开头注释中就写了
<dd>A pool that is no longer referenced in a program <em>AND</em>
has no remaining threads will be {@code shutdown} automatically. If
you would like to ensure that unreferenced pools are reclaimed even
if users forget to call {@link #shutdown}, then you must arrange
that unused threads eventually die, by setting appropriate
keep-alive times, using a lower bound of zero core threads and/or
setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
这段注释告诉我们关闭线程池的两个条件 (然后还有一些建议):
- 没有程序引用此线程池
- 线程池中没有存活的线程
我们需要做的就从第二点入手来关闭线程池, 如果一个fixed
线程池创建后使用完没有调用shutdown
方法会怎么样?
其core
数量的核心线程会一直在getTask()
方法中take()
而不会结束, 那么此线程池也不会自动终止, 这是原因.
官方给的建议是什么呢?
-
core
数据置为0
并设置keeAliveTime
allowCoreThreadTimeOut = true
建议每次用完还是显示调用
shutdown()
方法
2019-10-09更新
拓展2
聊聊Worker模式控制中断的一些想法
对线程池通常的理解是:线程池中接收到任务后启动线程,由线程去执行任务。
这句话说起来非常简单,但其实内部做了不少事,其中一点就是使用AQS来控制中断。
假设你已经理解了AQS的基本使用,那么有如下建议(或者说标准):
- 对AQS实现类的首次操作都需要尝试获取锁 (
tryLock
)
举个例子,比如要中断Worker
的线程
try (worker.tryLock()) {
Thread th = worker.thread;
th.interrupt();
}
记住这个例子。
而Worker
的构造函数中有一步操作是改变状态为-1
setState(-1); // inhibit interrupts until runWorker
在runWorker
之前禁止中断. 在runWorker
方法中有一步w.unLock()
操作将state
字段置为了0
.
这看起来好像是我把状态设置为-1
后你就中断不了我一样?
这里有个很重要的东西要搞明白:
中断线程跟你的AQS
状态没有一点关系
那么搞这么复杂还说要控制中断如何理解?
下面是我自己的一些想法,可能不对,但我目前只能这么理解。
在Thread
类的interrupt()
方法注释最后有一句是这么说的
Interrupting a thread that is not alive need not have any effect.
中断一个尚未激活的线程不会有任何效果.
- 什么叫尚未激活?
- 没有调用
start()
方法的都属于未激活,是的,哪怕你直接调用了run()
其状态仍为NEW
来捋一捋现在我们已经掌握的信息
- 中断线程跟你的
AQS
状态没有一点关系 - 对AQS实现类的首次操作都需要尝试获取锁
- 中断一个尚未激活的线程不会有任何效果
按照Worker
类逻辑来一遍,假如在Worker
线程创建后启动前被中断了会发生什么情况呢?
-
t1
时刻,Worker
线程创建好 (state = -1
) -
t2
时刻,尝试中断Worker
线程 (此时获取锁失败, 因为是CAS(0, 1)
而state = -1
) -
t3
时刻,获取锁失败,执行获取锁失败的逻辑 -
t4
时刻,runWorker()
执行,检查线程中断标识,无异常,正常执行。
那么如果Worker
构造函数中没有setState(-1)
会怎么样?
-
t1
时刻,Worker
线程创建好 (state
还初始化的0
) -
t2
时刻,尝试中断Worker
线程(CAS(0, 1)
可以成功获取锁) -
t3
时刻,获取锁成功后,就可以调用interrupt()
方法来中断此线程了,然后呢?然后什么都不会发生 -
t4
时刻,runWorker()
执行,检查线程中断标识,无异常,正常执行。
结论:
- 如果没有
setState(-1)
就会有线程中断而线程无感知。 - 无感知跟无响应是两个概念,无响应是你可以主动忽略这个中断但你知道,而无感知是你啥都不知道。
感觉还有些地方是模糊的,还需要再想想。