ThreadPoolExecutor 解析
2019-04-11 本文已影响0人
Alex90
线程池相对于手动创建线程有以下优势:
- 减少创建销毁线程的额外消耗。
- CPU资源有限,创建的线程过多,有的任务不能及时响应。线程池可以提高响应速度。
- 有效管理线程。
ThreadPoolExecutor(JDK 1.8)
构造函数
线程池的构造函数和主要属性如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 任务阻塞队列(向线程池提交的任务在此排队)
private final BlockingQueue<Runnable> workQueue;
// Work缓存,线程池启动执行任务的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 任务拒绝策略,当不能接收线程时调用的处理方法
private volatile RejectedExecutionHandler handler;
// 如果的空闲线程(Worker)空闲时间超过 keepAliveTime
// 将被回收(从 workers 中移除,减少线程技术)。
// 大于 0 生效
private volatile long keepAliveTime;
// 核心线程数量,不会被回收的线程
private volatile int corePoolSize;
// 最大线程数量,
// maximumPoolSize = corePoolSize,为固定大小线程池
// maximumPoolSize = Integer.MAX_VALUE,可以无限制的加入任务
private volatile int maximumPoolSize;
// 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 检查参数合法性,并初始化变量
}
}
RejectedExecutionHandler 提供了几种默认实现:
- AbortPolicy,抛出
RejectedExecutionException
,这是线程池中的默认实现 - CallerRunsPolicy,在调用者的线程中执行
run()
方法 - DiscardPolicy,不做任何事
- DiscardOldestPolicy,丢弃 workQueue 队头任务,提交新任务
提交任务
向线程池提交任务有两种方法 execute()
和 submit()
,接收的参数不同
// 使用 RunnableFuture 包装 task,调用 execute() 提交
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交一个 task
public void execute(Runnable command) {
/**
* 如果正在运行的线程数少于 corePoolSize,尝试启动新线程(Worker)
* 并且 task 作为 Worker 的第一个任务
*
* 如果一个任务可以成功地排队(进入 workQueue),再次检查线程池状态
* 判断是否需要回滚任务或启动新线程执行任务
*
* 如果一个任务不能成功排队(workQueue 的限制),再次尝试添加一个新的线程。
* 如果依然失败(线程池状态停止或执行线程数达到上限),执行拒绝任务的调用
*/
// 获取线程池中的线程数量
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 线程池中线程数量小于 corePoolSize,调用 addWorker 添加新线程执行任务。
// core = true,添加成功返回
if (addWorker(command, true))
return;
// 如果添加失败,重启获取正在运行线程数量(可能多线程原因导致失败)。
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程池状态是 RUNNING,向 workQueue 添加任务,添加成功
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
// 再次检查线程数量后发现线程池已经关闭或者数量超出,回滚已经添加的任务(remove)
// 并且执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 可以调用 addWorker 添加新线程执行任务。
// core = false
addWorker(null, false);
}
else if (!addWorker(command, false))
// 再次添加线程,失败则调用拒绝策略。
reject(command);
}
addWorker 方法
// 尝试创建 Worker 线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 线程池状态
int c = ctl.get();
int rs = runStateOf(c);
for (;;) {
// 线程运行数量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 检查是否超过数量限制,达到数量限制不创建
return false;
// CAS操作更新 c+1
if (compareAndIncrementWorkerCount(c))
// 更新成功,跳出外层循环
break retry;
c = ctl.get();
// 更新失败,重新判断状态
if (runStateOf(c) != rs)
// 如果状态变化,跳出内层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Worker 封装任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取锁后,检查线程池运行状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加 worker
workers.add(w);
int s = workers.size();
// 池中同时存在的最大线程数。
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
// 移除 worker
addWorkerFailed(w);
}
return workerStarted;
}
Worker 相关方法
Worker 是线程池中的执行线程,需要关注以下方法:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 执行完成的任务计数
volatile long completedTasks;
// 构造函数
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 使用 ThreadFactory 创建线程,指定 thread 的 target 为 this(当前 Worker 线程)
this.thread = getThreadFactory().newThread(this);
}
// 指定了 thread 的 target 为 this
// thread 执行完成会调用 Worker 线程的 run 方法
public void run() {
// 调用线程池的 runWorker
runWorker(this);
}
}
runWorker 方法
// Worker 线程方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 如果 firstTask 不为空,或者 workerQueue 中能取出任务
// 如果 task == null,表示 Worker 空闲,跳出循环
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态为 STOP(TIDYING、TERMINATED)
// 确保 Worker 线程执行 interrupt()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 提供钩子函数 beforeExecute、afterExecute 可以重写方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 在 Worker 线程中执行任务的方法体
task.run();
} catch (RuntimeException x) {
// ...
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker 执行退出(workQueue 中没有任务)
processWorkerExit(w, completedAbruptly);
}
}
// 获取 workerQueue 中的任务
private Runnable getTask() {
for (;;) {
// 判断线程池的运行状态 ...
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
try {
// 如果超过 keepAliveTime 没有获取到 workQueue 中的任务,返回 null
// 如果不需要判断超时时间,一直阻塞,等待任务
// 如果返回 null,意味着 Worker 线程空闲可以回收,参考 runWorker 方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
}
}
}
// Worker 退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 判断是否被打断退出
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计线程池完成任务数
completedTaskCount += w.completedTasks;
// 移除 Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// ...
if (runStateLessThan(c, STOP)) {
// 线程池 RUNNING 状态
// 如果 workerQueue 不为空,并且活动线程数量少于 corePoolSize
// 启动新的 Worker 线程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
Executors
以上,基本上介绍了 ThreadPoolExecutor 管理线程的方法。Executors 提供了便捷创建多种不同线程池的方法
// 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 可以无限提交的可缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 可以周期性执行任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 单线程线程池,串行执行任务
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}