面试官:请叙述ThreadPoolExecutor的执行过程与原
2023-02-21 本文已影响0人
互联网高级架构师
核心方法一:execute
每次使用线程池都是从execute执行任务,只知道通过该方法就可以开启多线程并执行任务,但一直不清楚其背后执行的逻辑与原理,下面通过 ThreadPoolExecutor 的核心源码解析了解它的执行过程
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心线程池大小
private final int corePoolSize;
// 最大线程池大小
private final int maximumPoolSize;
// 线程空闲超时时间
private final long keepAliveTime;
// 时间单位
private final TimeUnit unit;
// 等待队列,用于保存等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 线程池中当前线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程工厂,用于创建新的线程
private final ThreadFactory threadFactory;
// 拒绝策略,用于处理无法处理的任务
private final RejectedExecutionHandler handler;
// 等待任务的线程是否可以中断
private volatile boolean allowCoreThreadTimeOut;
// 已经完成任务的计数器
private final AtomicLong completedTaskCount = new AtomicLong();
// 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.handler = handler;
}
// 执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果线程数小于核心池大小,则创建新的线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于运行状态,将任务添加到等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已经停止或者任务队列已满,移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池中没有线程,则创建新的线程来执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果无法将任务添加到等待队列,则创建新的线程来执行任务,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
}
从上述源码execute方法中可以看出,其中最重要的方法是addWorker,该方法是创建线程和执行任务的核心,看看该方法的源码:
核心方法二:addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 获取线程池的 ctl 状态值
int rs = runStateOf(c); // 获取线程池的运行状态
// 如果线程池的状态为 SHUTDOWN 并且下列三个条件同时满足,则不添加新的工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 获取线程池中的工作线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 通过execute可得知,firstTask就是传进来的任务
w = new Worker(firstTask);
// Worker中thread成员变量是通过 getThreadFactory().newThread(this); 构建出来的Thread
// Work被当作任务(this)传入Thread,所以w.thread; 就相当于new Thread(w);
// 所以当t.start() 的时候,其实就会执行worker中的run方法,run方法调用了runWorker方法
// runWorker方法也是线程池为什么会一直阻塞等待任务的方法。
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker实现了Runnable接口,所以对于线程池来说,worker就是一个待执行的任务。Worker类自身的源码很简单,Worker构造函数中做了两件事:
- 将传入task赋值给 firstTask
- 通过 getThreadFactory().newThread(this); 构建出来 Thread 并将自身作为任务
在addWorker方法中再通过 Worker 构建出来的Thread执行,看上述源码中t.start(),由于worker作为task,所以开启线程时会执行Worker的run方法,worker的run方法调用runWorker方法,runWorker方法也是线程池开启如果没有手动调用关闭,那么会一直存在,源码如下:
核心方法三:runWorker
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取 Worker 对象的 firstTask 属性
Runnable task = w.firstTask;
w.firstTask = null;
// Worker 的 lock() 方法是一个独占式的获取锁,因此如果已经有线程获取到了锁,当前线程就会一直阻塞在这里,直到获取到锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果第一个任务不为空,则执行第一个任务
while (task != null || (task = getTask()) != null) {
// 加锁,防止任务执行过程中被中断
w.lock();
// 如果线程已经被中断,抛出异常
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 执行任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 执行结束,如果线程未被中断,则执行 workerDone() 方法进行清理工作
processWorkerExit(w, completedAbruptly);
}
}
从addWorker中得知,firstTask 是我们传进来需要被执行的任务,在runWorker方法中,他再次取worker中的firstTask并执行。
结论
通过源码分析得知从 execute -> addWorker -> runWorker,忽略其中的细节,Worker贯穿了线程池执行流程的全部,弄清楚Worker类的作用,想必对线程池的执行流程就会很清晰了。
作者:刺客567
链接:https://juejin.cn/post/7202188243113263162
来源:稀土掘金