【Java 并发笔记】线程池相关整理(下)
文前说明
作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。
本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。
2.3 ThreadPoolExecutor
ThreadPoolExecutor 结构
- ThreadPoolExecutor 继承了 AbstractExecutorService 类,并提供了四个构造器方法。
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
构造器参数
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数量。 |
| maximumPoolSize | 最大线程数量。 |
| allowCoreThreadTimeOut | 是否允许线程超时(设置为 true 时与 keepAliveTime,unit 一起起作用) |
| keepAliveTime | 线程存活时间 |
| unit | 单位 |
| workQueue | 存储任务的阻塞队列(缓存队列)。 |
| handler | 拒绝处理任务类(默认:AbortPolicy 会抛异常)。 |
| threadFactory | 线程工厂(默认:DefaultThreadFactory)。 |
corePoolSize
- 核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了
prestartAllCoreThreads()或者prestartCoreThread()方法预创建线程,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。- 默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到阻塞队列(缓存队列)当中。
- 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程。
maximumPoolSize
- 线程池最大线程数,表示在线程池中最多能创建多少个线程。
- 当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize,当阻塞队列是无界队列, 则 maximumPoolSize 则不起作用,任务会一直持续地放入 workQueue。
- corePoolSize 和 maximumPoolSize 设置的边界自动调整池大小。
- corePoolSize < 运行的线程数 < maximumPoolSize。仅当队列满时才创建新线程。
- corePoolSize = 运行的线程数 = maximumPoolSize。创建固定大小的线程池。
线程池结构
keepAliveTime
- 表示线程没有任务执行时最多保持多久时间会终止。
- 默认情况下,当线程池中的线程数大于 corePoolSize,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。如果调用了
allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0。
- 默认情况下,当线程池中的线程数大于 corePoolSize,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。如果调用了
unit
- 参数 keepAliveTime 的时间单位,有 7 种取值。
| 单位 | 说明 |
|---|---|
| TimeUnit.DAYS | 天 |
| TimeUnit.HOURS | 小时 |
| TimeUnit.MINUTES | 分钟 |
| TimeUnit.SECONDS | 秒 |
| TimeUnit.MILLISECONDS | 毫秒 |
| TimeUnit.MICROSECONDS | 微妙 |
| TimeUnit.NANOSECONDS | 纳秒 |
workQueue
- 一个 阻塞队列,用来存储等待执行的任务。
- ArrayBlockingQueue 和 PriorityBlockingQueue 使用较少,一般使用LinkedBlockingQueue 和 SynchronousQueue。
- 线程池的排队策略与 BlockingQueue 有关。
| 阻塞队列 | 说明 |
|---|---|
| ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列。 |
| LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列。 |
| PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列。 |
| DelayQueue | 一个使用优先级队列实现的无界阻塞队列。 |
| SynchronousQueue | 一个不存储元素的阻塞队列。 |
| LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列。 |
| LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。 |
handle
- 定义处理被拒绝任务的策略,默认使用 ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出 RejectExecutorException 异常。
| 策略 | 说明 |
|---|---|
| ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出 RejectedExecutionException 异常。 |
| ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常。 |
| ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) |
| ThreadPoolExecutor.CallerRunsPolicy | 由调用线程处理该任务。 |
- 也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
threadFactory
- 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory。
工作顺序
- corePoolSize --> 任务队列 --> maximumPoolSize --> 拒绝策略。
类的属性
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池容量
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器
private volatile RejectedExecutionHandler handler;
// 线程等待运行时间
private volatile long keepAliveTime;
// 是否运行核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
// 默认拒绝执行处理器
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
ctl 变量
-
AtomicInteger 保证了对这个变量的操作是原子的,ThreadPoolExecutor 用这一个变量保存了两部分内容。
- 所有有效线程的数量。
- 线程池的状态(runState)。
- 低 29 位保存线程数,高 3 位存 runState。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//-00100000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;//00000000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;//00100000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;//01000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;//01100000000000000000000000000000
| 状态 | 10 进制值 | 二进制值 |
|---|---|---|
| RUNNING | -536870912 | -00100000000000000000000000000000 |
| SHUTDOWN | 0 | 0 |
| STOP | 536870912 | 00100000000000000000000000000000 |
| TIDYING | 1073741824 | 01000000000000000000000000000000 |
| TERMINATED | 1610612736 | 01100000000000000000000000000000 |
- CAPACITY 的值为 00011111111111111111111111111111。
- ~CAPACITY 的值为 11100000000000000000000000000000。
/**
* 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
* ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
*
* @param c
* 该参数为存储runState和workerCount的int值
* @return runState的值
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
* 这个方法用于取出workerCount的值
* 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
*
* @param c
* ctl, 存储runState和workerCount的int值
* @return workerCount的值
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
/**
* 将runState和workerCount存到同一个int中
* “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
*
* @param rs
* runState移位过后的值,负责填充返回值的高3位
* @param wc
* workerCount移位过后的值,负责填充返回值的低29位
* @return 两者或运算过后的值
*/
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
- 当创建线程池后,初始时线程池处于 RUNNING 状态。只有 RUNNING 状态会小于 0。
- 如果调用了
shutdown()方法,则线程池处于 SHUTDOWN 状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕。 - 如果调用了
shutdownNow()方法,则线程池处于 STOP 状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务。 - 当线程池处于 SHUTDOWN 或 STOP 状态,并且所有工作线程已经销毁,任务阻塞队列(缓存队列)已经清空或执行结束后,线程池被设置为 TERMINATED 状态。
线程池状态转换
2.3.1 Worker
- ThreadPoolExecutor 的核心内部类为 Worker,其对资源进行了复用,减少创建线程的开销,还有若干个策略类。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
- 对任务进行封装,所有 submit 的 Runnable 都被封装成了 Worker,本身也是一个Runnable, 利用 AQS 实现非重入互斥锁, 实现互斥锁的主要目的是为了中断的时候判断线程是在空闲还是运行。
// state只有0和1,互斥
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;// 成功获得锁
}
// 线程进入等待队列
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
- 不使用 ReentrantLock 是为了避免任务执行的代码中修改线程池的变量,例如
setCorePoolSize(),因为 ReentrantLock 是可重入的。 - Worker 主要是重写了 AQS 的相应函数(
tryAcquire()和tryRelease())和重写了 Runnable 的 run 函数(执行了runWorker()方法)。
execute 方法
-
execute()方法主要三个步骤:- 活动线程小于 corePoolSize 时创建新的线程。
- 活动线程大于 corePoolSize 时先加入到任务队列当中。
- 任务队列满了再启动新的线程,如果线程数达到最大值就拒绝任务。
-
execute()方法在 AbstractExecutorService 中没有实现,从 Executor 接口直到 ThreadPoolExecutor 才实现了该方法,ExecutorService 中的submit()、invokeAll()、invokeAny()都调用了execute()方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 活动线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize
if (addWorker(command, true))
// 添加成功返回
return;
c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// double check
// 非RUNNING状态 则从workQueue中移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);// 采用线程池指定的策略拒绝任务
// 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
else if (workerCountOf(recheck) == 0)
// 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
-
wonrkerCountOf()方法能够取得当前线程池中线程的总数。 -
之所以 double check 线程池的状态,是因为在多线程环境下,线程池的状态时刻在变化,而
ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。紧接着判断是否将 command 加入 workque 获取的是线程池之前的状态。- 倘若没有 double check,万一线程池处于非 RUNNING 状态(在多线程环境下很有可能发生),那么 command 将永远不会执行。
addWorker 方法
- addWorker 的主要任务是创建并启动线程。
- addWorker 共有四种传参方式。
-
addWorker(paramRunnable, true)线程数小于 corePoolSize 时,放一个需要处理的 task 进 workers Set。如果 workers Set 长度超过 corePoolSize,就返回 false。 -
addWorker(null, false)放入一个 null 的 task 进 workers Set,长度限制是 maximumPoolSize。- 这样一个 task 为空的 worker 在线程执行的时候会去任务队列里拿任务,相当于创建了一个新的线程,只是没有马上分配任务。
-
addWorker(paramRunnable, false)当等待队列被放满时,就尝试将这个新来的 task 直接放入 workers Set,而此时 workers Set 的长度限制是 maximumPoolSize。如果线程池满了的话就返回 false。 -
addWorker(null, true)放一个 null 的 task 进 workers Set,在小于 corePoolSize 时,如果此时 Set 中的数量已经达到 corePoolSize 就返回 false。实际使用中会在prestartAllCoreThreads()方法中调用,这个方法用来为线程池预先启动 corePoolSize 个 worker 等待从 workQueue 中获取任务执行。
-
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);// 当前线程池状态
// Check if queue empty only if necessary.
// 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty())
// 满足下列调价则直接返回false,线程创建失败:
// rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
// rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
// rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,
// false),任务为null && 队列为空
// 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
// 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
// 但是此时队列不为空,那么还得创建线程把任务给执行完才行。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// 走到这的情形:
// 1.线程池状态为RUNNING
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// 原子操作递增workCount
break retry;// 操作成功跳出的重试的循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)// 如果线程池的状态发生变化则重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// wokerCount递增成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 并发的访问线程池workers对象必须加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除w并递减wokerCount
if (!workerStarted)
// 递减wokerCount会触发tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}
-
addWorker方法的执行流程。- 判断线程池当前是否为可以添加 worker 线程的状态,可以则继续下一步,不可以则返回 false。
- 线程池状态 > SHUTDOWN,可能为 STOP、TIDYING、TERMINATED,不能添加 worker 线程。
- 线程池状态 == SHUTDOWN,firstTask 不为 null,不能添加 worker 线程,因为 SHUTDOWN 状态的线程池不接收新任务。
- 线程池状态 == SHUTDOWN,firstTask 为 null,workQueue 为空,不能添加 worker 线程,因为 firstTask 为空是为了添加一个没有任务的线程再从 workQueue 获取 task,而 workQueue 为空,说明添加无任务线程已经没有意义。
- 线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过则返回 false,没超过则对
workerCount + 1,继续下一步。 - 在线程池 ReentrantLock 保证下,向 workers Set 中添加新创建的 worker 实例,添加完成后解锁,并启动 worker 线程,如果都成功返回 true,如果添加入 worker Set 失败或启动失败,调用
addWorkerFailed()方法。
- 判断线程池当前是否为可以添加 worker 线程的状态,可以则继续下一步,不可以则返回 false。
runWorker 方法
- 任务添加成功后的
t.start()实际执行的是runWorker()这个方法。 -
runWorker()方法完成的事情。- 第一次启动会执行初始化传进来的任务 firstTask。
- 然后会从 workQueue 中取任务执行,如果队列为空则等待 keepAliveTime 时间。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// Worker的构造函数中抑制了线程中断setState(-1),所以这里需要unlock从而允许中断
w.unlock();
// 用于标识是否异常终止,finally中processWorkerExit的方法会有不同逻辑
// 为true的情况:1.执行任务抛出异常;2.被中断。
boolean completedAbruptly = true;
try {
// 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前可以插入一些处理,子类重载该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 执行用户任务
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 和beforeExecute一样,留给子类去重载
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 结束线程的一些清理工作
processWorkerExit(w, completedAbruptly);
}
}
getTask 方法
-
getTask()方法从阻塞队列中获取等待的任务。- allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁。倘若为 ture,在 keepAliveTime 内仍空闲则会被销毁。
-
timed == false,workQueue.take 任务,如果阻塞队列为空,当前线程会被挂起等待。当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行。 -
timed == true,workQueue.poll 任务,如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null。
-
- allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁。倘若为 ture,在 keepAliveTime 内仍空闲则会被销毁。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
// 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
// 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
// wokerCount
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 递减workerCount值
decrementWorkerCount();
return null;
}
// 标记从队列中取任务时是否设置超时时间
boolean timed; // Are workers subject to culling?
// 1.RUNING状态
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
// 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
// 2.allowCoreThreadTimeOut == false && wc >
// corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
// 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
// poll出异常了重试
// 2.timeOut == true && timed ==
// false:看后面的代码workerQueue.poll超时时timeOut才为true,
// 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
// 所以超时不会继续执行而是return null结束线程。(重点:线程是如何超时的???)
if (wc <= maximumPoolSize && !(timedOut && timed))
break;
// workerCount递减,结束当前thread
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 1.以指定的超时时间从队列中取任务
// 2.core thread没有超时
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 超时
} catch (InterruptedException retry) {
timedOut = false;// 线程被中断重试
}
}
}
- 此函数用于从 workerQueue 阻塞队列中获取 Runnable 对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。
- 在该函数中还会响应
shutDown()和shutDownNow()函数的操作,若检测到线程池处于 SHUTDOWN 或 STOP 状态,则会返回 null,而不再返回阻塞队列中的 Runnalbe 对象。
- 在该函数中还会响应
processWorkerExit 方法
- 线程退出会执行这个方法做一些清理工作,是在 worker 退出时调用到的钩子函数。
- 引起 worker 退出的主要因素有。
- 阻塞队列已经为空,即没有任务可以运行了。
- 调用了
shutDown()或shutDownNow()函数。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果被中断,则需要减少workCount // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 获取可重入锁
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
// 将worker完成的任务添加到总的完成任务中
completedTaskCount += w.completedTasks;
// 从workers集合中移除该worker
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 尝试终止
tryTerminate();
// 获取线程池控制状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
min = 1;
if (workerCountOf(c) >= min) // workerCount大于等于min
// 直接返回
return; // replacement not needed
}
// 添加worker
addWorker(null, false);
}
}
- 此函数会根据是否中断了空闲线程来确定是否减少 workerCount 的值,并且将 worker 从 workers 集合中移除并且会尝试终止线程池。
tryTerminate 方法
-
processWorkerExit()方法中会尝试调用tryTerminate()来终止线程池。- 这个方法在任何可能导致线程池终止的动作后执行,比如减少 wokerCount 或 SHUTDOWN 状态下从队列中移除任务。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下状态直接返回:
// 1.线程池还处于RUNNING状态
// 2.SHUTDOWN状态但是任务队列非空
// 3.runState >= TIDYING 线程池已经停止了或在停止了
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
// 2.STOP状态,当调用了shutdownNow方法
// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进入TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类重载:一些资源清理工作
terminated();
} finally {
// TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 继续awaitTermination
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown 和 shutdownNow 方法
-
shutdown()方法会将 runState 置为 SHUTDOWN,会终止 所有空闲 的线程。 -
shutdownNow()方法将 runState 置为 STOP。和shutdown()方法的区别在于这个方法会终止 所有 的线程。 -
shutdown()调用的是interruptIdleWorkers()方法,而shutdownNow()实际调用的是 Worker 类的interruptIfStarted()方法。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查shutdown权限
checkShutdownAccess();
// 设置线程池控制状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲worker
interruptIdleWorkers();
// 调用shutdown钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止
tryTerminate();
}
-
shutdown()会按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。- 首先会检查是否具有 shutdown 的权限,然后设置线程池的控制状态为 SHUTDOWN,之后中断空闲的 worker,最后尝试终止线程池。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回队列中还没有被执行的任务。
tasks = drainQueue();
}
finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
2.3.2 任务的提交
public class Test{
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
Future<String> future = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future result";
}
});
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
submit()任务,等待线程池execute()。- 执行 FutureTask 类的
get()方法时,会把主线程封装成 waitNode 结点并保存在 waiters 链表中, 并阻塞等待运行结果。 - FutureTask 任务执行完成后,通过 Unsafe 设置 waiters 相应的 waitNode 为 null,并通过 LockSupport 类 unpark 方法唤醒主线程。
- 执行 FutureTask 类的
- 实际业务场景中,Future 和 Callable 基本是成对出现的,Callable 负责产生结果, Future 负责获取结果。
- Callable 接口类似于 Runnable,只是 Runnable 没有返回值。
- Callable 任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即 Future 可以拿到异步执行任务各种结果。
-
Future.get()方法会导致主线程阻塞,直到 Callable 任务执行完成。
submit 方法
-
AbstractExecutorService.submit()实现了ExecutorService.submit()
可以获取执行完的返回值,而 ThreadPoolExecutor 是AbstractExecutorService.submit()的子类,所以submit()方法也是 ThreadPoolExecutor 的方法。
// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
......
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
- 通过
submit()方法提交的 Callable 任务会被封装成了一个 FutureTask 对象。 - 通过
Executor.execute()方法提交 FutureTask 到线程池中等待被执行,最终执行的是 FutureTask 的run()方法。
2.3.3 扩展线程池
-
ThreadPoolExecutor 是可以拓展的,它提供了几个可以在子类中改写的方法。
- beforeExecute
- afterExecute
- terimated
-
在执行任务的线程中将调用 beforeExecute 和 afterExecute,可以添加日志,计时,监视或统计收集的功能。
-
也可以用来输出有用的调试信息,帮助系统诊断故障。
3. 常用线程池
| 线程池名称 | 说明 |
|---|---|
| newCachedThreadPool | 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 |
| newFixedThreadPool | 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 |
| newScheduledThreadPool | 创建一个定长线程池,支持定时及周期性任务执行。 |
| newSingleThreadExecutor | 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行。 |
Executors
- Executors 利用工厂模式向我们提供了 4 种线程池实现方式。
- 但是并不推荐使用,Executors 创建线程池不会传入参数而使用默认值,而默认使用的参数会导致资源浪费,不可取。
- newFixedThreadPool 和 newSingleThreadExecutor 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
- Executors 的线程池如果不指定线程工厂会使用 Executors 中的 DefaultThreadFactory,默认线程池工厂创建的线程都是非守护线程。
- 使用自定义的线程工厂可以做很多事情,比如可以跟踪线程池在何时创建了多少线程,也可以自定义线程名称和优先级。如果将新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。
newSingleThreadExecutor
- 创建一个 单线程 的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
- 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
- 此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
- 阻塞队列使用的是 LinkedBlockingQueue,若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。
- 按照 先入先出 的顺序执行任务。
- newSingleThreadExecutor 的执行流程。
- 线程池中没有线程时,新建一个线程执行任务。
- 有一个线程以后,将任务加入阻塞队列,不停增加。
- 唯一的这一个线程不停地去队列里取任务执行。
- SingleThreadExecutor 用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。
newFixedThreadPool
- 创建 固定大小 的线程池。
- 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
- 线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- 固定大小的线程池,可以指定线程池的大小,该线程池 corePoolSize 和 maximumPoolSize 相等,阻塞队列使用的是 LinkedBlockingQueue,大小为整数最大值。
- 该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。
- 对于固定大小的线程池,不存在线程数量的变化。
- 同时使用 无界 的 LinkedBlockingQueue 来存放执行的任务。
- 当任务提交十分频繁的时候,LinkedBlockingQueue 迅速增大,存在着耗尽系统资源的问题。
- 而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要
shutdown()。 - newFixedThreadPool 线程池执行任务的流程。
- 线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务。
- 线程数等于核心线程数后,将任务加入阻塞队列,由于队列容量非常大,可以一直增加。
- 执行完任务的线程反复去队列中取任务执行。
- FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。
newCachedThreadPool
- 创建一个可缓存的线程池。
- 如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
- 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
- 缓存线程池,缓存的线程默认存活 60 秒。
- 线程的核心池 corePoolSize 大小为 0,线程池最大为 Integer.MAX_VALUE,阻塞队列使用的是 SynchronousQueue 是一个直接提交的阻塞队列,总会迫使线程池增加新的线程去执行新的任务。
- 在没有任务执行时,当线程的空闲时间超过 keepAliveTime(60 秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。
- 如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
- newCachedThreadPool 的执行流程。
- 没有核心线程,直接向 SynchronousQueue 中提交任务。
- 如果有空闲线程,就去取出任务执行,如果没有空闲线程,就新建一个。
- 执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则终止。
- 由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
- CachedThreadPool 用于并发执行大量短期的小任务,或者是负载较轻的服务器。
newScheduledThreadPool
new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
- 定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。
- scheduleAtFixedRate 是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
- schedultWithFixedDelay 是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
- ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor, 最多线程数为 Integer.MAX_VALUE ,使用 DelayedWorkQueue 作为任务队列。
- ScheduledThreadPoolExecutor 的执行流程。
- 调用 scheduleAtFixedRate 或者 schedultWithFixedDelay 方法添加一个任务。
- 线程池中的线程从 DelayQueue 中取任务。
- 然后执行任务。
- ScheduledThreadPoolExecutor 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
4. 使用线程池注意
避免使用无界队列
- 尽量不要使用
Executors.newXXXThreadPool()快捷方法创建线程池,因为这种方式会使用无界的任务队列,为避免 OOM,应该使用 ThreadPoolExecutor 的构造方法手动指定队列的最大长度。
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界队列,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
明确拒绝任务时的行为
- 线程池默认的拒绝行为是 AbortPolicy,抛出 RejectedExecutionHandler 异常,该异常是非受检异常,很容易忘记捕获。
- 如果不关心任务被拒绝的事件,可以将拒绝策略设置成 DiscardPolicy,多余的任务会悄悄的被忽略。
获取处理结果和异常
- 线程池的处理结果、以及处理过程中的异常都被包装到 Future 中,并在调用
Future.get()方法时获取,执行过程中的异常会被包装成 ExecutionException,submit()方法本身不会传递结果和任务执行过程中的异常。 - 获取执行结果的代码可以这样写。
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new RuntimeException("exception in call~");// 该异常会在调用Future.get()时传递给调用者
}
});
try {
Object result = future.get();
} catch (InterruptedException e) {
// interrupt
} catch (ExecutionException e) {
// exception in Callable.call()
e.printStackTrace();
}
性质不同的任务可用使用不同规模的线程池分开处理
- CPU 密集型。尽可能少的线程,Ncpu+1。
- IO 密集型。尽可能多的线程,Ncpu*2,比如数据库连接池。
- 混合型。如果 CPU 密集型的任务与 IO 密集型任务的执行时间差别较小,可以拆分为两个线程池。否则没有必要拆分。
参考资料
https://www.cnblogs.com/dolphin0520/p/3932921.html
https://blog.csdn.net/programmer_at/article/details/79799267#2-threadpoolexecutor
https://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html
https://www.cnblogs.com/superfj/p/7544971.html
https://blog.csdn.net/qq1137623160/article/details/79772505
https://blog.csdn.net/programmer_at/article/details/79799267#4-threadpoolexecutor%E6%BA%90%E7%A0%81
https://www.cnblogs.com/leesf456/p/5585627.html