ThreadPoolExecutor源码解析
2020-04-08 本文已影响0人
老荀
类定义
/**
 * ThreadPoolExecutor, an ExecutorService built on AbstractExecutorService.
 * The class body is elided ("...") here; its members are quoted and discussed
 * one at a time in the rest of this article.
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
}
重要成员
ctl
线程池也采用了一个字段分高低位多用的方法
int一共32位,高3位用来表示线程池的运行状态(runState),低29位用来记录线程池中的worker数量(workerCount,即线程数,而不是正在执行的任务数)
// Packs the run state (high 3 bits) and the worker count (low 29 bits) into one int.
// Initial value ctlOf(RUNNING, 0) = RUNNING state with zero workers:
// 11100000 00000000 00000000 00000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
以及和该字段配套的字段和方法
// Number of low bits that hold the worker count: 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// Mask for the low 29 bits, which is also the largest representable worker count:
// 00011111 11111111 11111111 11111111 = 536870911 (about 536 million)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// Run states, stored in the high 3 bits. Their numeric values increase in the order
// listed (RUNNING is negative), so state checks can use comparisons such as rs >= SHUTDOWN.
// 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
有了上面那些数字的二进制解释,这里的3个方法就不难理解了
// Extracts the run state (high 3 bits) from a ctl value.
private static int runStateOf(int c) {
    /*
     * ~CAPACITY = 11100000 00000000 00000000 00000000
     * ANDing any value with it clears the low 29 bits and keeps only the top 3,
     * which encode the pool's run state.
     */
    return c & ~CAPACITY;
}
// Extracts the worker count (low 29 bits) from a ctl value; same masking idea as
// runStateOf, but with CAPACITY instead of ~CAPACITY.
private static int workerCountOf(int c) {
    return c & CAPACITY;
}
// Combines a run state and a worker count into a single ctl value.
private static int ctlOf(int rs, int wc) {
    // rs occupies the high 3 bits and wc the low 29 bits, so OR-ing them yields the full
    // 32-bit value (assumes wc <= CAPACITY and rs has no low bits set, which holds for
    // the state constants)
    return rs | wc;
}
workQueue
线程池内部有一个阻塞队列(线程安全的BlockingQueue):当线程数已经达到核心线程数时,新提交的任务会先放入这个队列,等待worker来获取执行
// Thread-safe blocking queue of pending tasks: execute() offers tasks to it once the
// worker count has reached corePoolSize, and workers take tasks from it in getTask().
private final BlockingQueue<Runnable> workQueue;
Worker
重要内部类,以及存放它的HashSet。从类定义上能看到,Worker既是一个Runnable(可以交给线程执行的任务),也是一个AQS(继承了AbstractQueuedSynchronizer)
/**
 * A pool worker. It is both a Runnable (run() delegates to runWorker) and an AQS
 * (extends AbstractQueuedSynchronizer, used as a simple lock). Other members are elided.
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    ...
    Worker(Runnable firstTask) {
        // State -1 blocks interruptIfStarted (which requires getState() >= 0) until
        // runWorker calls w.unlock()
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // The thread obtained from the factory runs this Worker itself (passed as its Runnable)
        this.thread = getThreadFactory().newThread(this);
    }

    // Entry point of the worker's thread
    public void run() {
        runWorker(this);
    }
    ...
}
// All workers registered by addWorker. HashSet is not thread-safe; the code shown in this
// article only reads or modifies it while holding mainLock.
private final HashSet<Worker> workers = new HashSet<Worker>();
其他构造器参数
这些参数从一定程度上会影响到线程池的逻辑
// While fewer than corePoolSize workers exist, execute() starts a new worker per task.
// maximumPoolSize bounds the worker count for non-core additions and in getTask().
// handler receives tasks that are rejected (see reject()).
// NOTE(review): volatile presumably because these can be changed at runtime via setters
// not shown here; confirm.
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile RejectedExecutionHandler handler;
构造器
一共有4个构造器,前3个都通过this(...)委托给第4个,缺少的参数用默认的ThreadFactory或默认的拒绝策略补上
/**
 * Creates a pool using Executors.defaultThreadFactory() and the default rejection
 * handler (defaultHandler). Argument validation is done by the seven-argument
 * constructor this delegates to.
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
/**
 * Creates a pool with the given ThreadFactory and the default rejection handler
 * (defaultHandler); delegates to the seven-argument constructor for validation.
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
/**
 * Creates a pool with the given rejection handler and Executors.defaultThreadFactory();
 * delegates to the seven-argument constructor for validation.
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
/**
 * Primary constructor; the other three constructors delegate here.
 *
 * @throws IllegalArgumentException if corePoolSize < 0, maximumPoolSize <= 0,
 *         maximumPoolSize < corePoolSize, or keepAliveTime < 0
 * @throws NullPointerException if workQueue, threadFactory or handler is null
 *         (a null unit also ends in a NullPointerException, from unit.toNanos)
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // Capture the access-control context only when a SecurityManager is installed
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // Stored in nanoseconds; getTask() polls with TimeUnit.NANOSECONDS
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
默认的RejectedExecutionHandler
// Rejection policy used by the constructors that take no handler argument.
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// The pool ships with four built-in rejection policies (bodies elided here):
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    // Does not use a pool thread: runs the task on the caller's (submitting) thread
}
public static class AbortPolicy implements RejectedExecutionHandler {
    // Throws an exception (the default policy)
}
public static class DiscardPolicy implements RejectedExecutionHandler {
    // Silently drops the task
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    // Drops the oldest task in the work queue, then resubmits the new task to the pool
}
重要方法
execute
/**
 * Submits a task in three steps:
 * 1. fewer than corePoolSize workers: start a new worker with this task as its first task;
 * 2. otherwise, if the pool is RUNNING, enqueue the task and re-check the state/worker count;
 * 3. if it cannot be enqueued, try a worker bounded by maximumPoolSize; if that fails, reject.
 *
 * @throws NullPointerException if command is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Fewer workers than corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // Start a new worker running command, checked against corePoolSize
        if (addWorker(command, true))
            return;
        // addWorker failed (state or count changed concurrently): refresh ctl
        c = ctl.get();
    }
    // Core size reached (or addWorker failed): if still RUNNING, try to enqueue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool stopped running after the offer: take the task back out and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // The task is queued but no worker exists to take it: start one with no first task
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Pool not RUNNING, or the queue refused the task: try a worker bounded by
    // maximumPoolSize (addWorker returns false when the pool is shutting down)
    else if (!addWorker(command, false))
        // Still failed: hand the task to the rejection policy
        reject(command);
}
reject
先看简单的reject流程
// Hands a task that cannot be accepted to the configured RejectedExecutionHandler.
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
addWorker
/**
 * Tries to add a new worker whose first task is firstTask, and start its thread.
 *
 * Phase 1 (lock-free): checks the run state and the worker-count limit, then reserves a
 * slot by CAS-incrementing the worker count. Phase 2: creates the Worker, re-checks the
 * run state under mainLock, registers the worker in workers and starts its thread.
 *
 * @param firstTask task the new worker runs first, or null for a worker that only takes
 *                  tasks from workQueue
 * @param core      true to bound the count by corePoolSize, false by maximumPoolSize
 * @return true if the worker's thread was started; when false, any count reservation
 *         has been rolled back through addWorkerFailed
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // The pool is shutting down (state >= SHUTDOWN). The only case still allowed is
        // SHUTDOWN with no first task and a non-empty queue, i.e. a worker that only
        // drains already-queued tasks. STOP and later always fail here.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // Refuse if the count field is full, or if the applicable limit is reached:
            // corePoolSize for a core addition, maximumPoolSize otherwise
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                // Slot reserved: leave both loops and create the worker
                break retry;
            c = ctl.get();  // Re-read ctl
            // The run state changed under us: redo the state check in the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // From here on the worker is actually created. `core` was only used for the limit
    // check above; the Worker itself does not record whether it is core or not.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // The Worker constructor asks the thread factory for a thread that runs this Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // t is null if the thread factory returned null; then nothing is started and the
        // finally block rolls back
        if (t != null) {
            // workers and largestPoolSize are modified below, so hold mainLock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-check the state under the lock: it may have changed since the slot
                // was reserved. Allowed: RUNNING, or SHUTDOWN with no first task.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The factory must return a thread that has not been started yet
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize only grows here: it records the largest size the
                    // workers set has ever reached (peak thread count)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Starting the thread runs Worker.run() -> runWorker(this) on the new
                // thread; this is where the pool actually gains a concurrent worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Thread not started (null thread, failed state re-check, or an exception):
        // undo the registration and the count reservation
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
启动worker失败流程
/**
 * Rolls back a failed addWorker: removes the worker from workers (if it was created),
 * undoes the worker-count increment made in addWorker, then calls tryTerminate to
 * re-check whether the pool can terminate.
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // workers is a HashSet, which is not thread-safe, so modify it only under mainLock
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        // Undo the reservation: worker count - 1
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
/**
 * Moves the pool to TIDYING and then TERMINATED once it is eligible. Eligible means the
 * state is STOP, or SHUTDOWN with an empty queue, and no workers remain. Called from
 * addWorkerFailed, shutdown and shutdownNow in the code shown here.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Not eligible: still RUNNING, already TIDYING/TERMINATED (another thread won the
        // transition), or SHUTDOWN with tasks still queued
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // Workers remain: interrupt at most one idle worker and return
            // (ONLY_ONE is presumably `true`; its declaration is not shown)
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        // No workers left and the state is STOP, or SHUTDOWN with an empty queue:
        // transition to TIDYING, then TERMINATED
        mainLock.lock();
        try {
            // Move to TIDYING (worker count 0); only one thread can win this CAS
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook method for subclasses to implement
                    terminated();
                } finally {
                    // Always end in TERMINATED, even if terminated() threw
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake every thread waiting on the termination condition
                    // (presumably awaitTermination callers; not shown here)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // CAS failed because ctl changed concurrently: loop and re-check
    }
}
下面来看看Worker的核心方法
/**
 * Worker again, focusing on its lock methods; other members are elided.
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    ...
    public void run() {
        runWorker(this);
    }

    // These three methods wrap AQS operations so a Worker can be used as a lock.
    // runWorker holds it while a task runs, so a Worker runs one task at a time, and
    // interruptIdleWorkers uses tryLock to skip workers that are busy with a task.
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    ...
}
runWorker
直接看runWorker的逻辑
/**
 * Main loop of a worker thread. Runs the worker's first task (if any), then keeps taking
 * tasks from getTask() until it returns null or a task/hook throws, and finally hands
 * the worker to processWorkerExit. Called from Worker.run() on the worker's own thread.
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task handed over by addWorker, if there is one; after that (or
        // when there was none) take tasks from the work queue via getTask()
        while (task != null || (task = getTask()) != null) {
            // Hold the worker lock while a task runs. This keeps each Worker to one task
            // at a time, and it makes interruptIdleWorkers (used by shutdown) skip this
            // worker, because its tryLock fails while a task is in progress.
            w.lock();
            // If the pool is STOP or later, make sure this thread is interrupted.
            // Otherwise clear any stale interrupt (Thread.interrupted() clears the flag),
            // re-checking STOP afterwards in case shutdownNow ran concurrently.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook run before each task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // The task's run() is called directly on this worker thread
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // Other throwables cannot be rethrown as-is from this method, so wrap
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook run after each task; receives the throwable, or null on success
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // Per-worker completed-task counter (presumably added to the pool total
                // when the worker exits; processWorkerExit is not shown)
                w.completedTasks++;
                // Release the worker lock
                w.unlock();
            }
        }
        // Reached only when getTask() returned null: a normal, non-exceptional exit
        completedAbruptly = false;
    } finally {
        // Worker exit handling (processWorkerExit, not shown). completedAbruptly is true
        // if the loop ended with an exception (from a task or a hook), false otherwise.
        processWorkerExit(w, completedAbruptly);
    }
}
getTask
这个方法就很重要了,控制了worker是否能获取到任务,就是控制了worker什么时候退出
/**
 * Fetches the next task for a worker, blocking or waiting with a timeout as configured.
 * A null return makes runWorker leave its loop, so this method effectively decides when
 * a worker retires. Every null return has already decremented the worker count.
 *
 * @return the next task, or null if the calling worker should exit
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check the pool state: STOP or later exits even if tasks remain queued;
        // SHUTDOWN exits only once the queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Wait with a timeout if core threads may time out, or if there are currently
        // more workers than corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // This worker is surplus if the count exceeds maximumPoolSize, or if it may time
        // out and its last poll did time out. Even then it only exits if it is not the
        // last worker or the queue is empty, so queued tasks are never left with no worker.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                // Returning null makes runWorker leave its while loop and exit the worker
                return null;
            // ctl changed concurrently: re-evaluate from the top
            continue;
        }
        try {
            Runnable r = timed ?
                // Timed wait of at most keepAliveTime (stored in nanoseconds)
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // take() blocks until a task arrives. While allowCoreThreadTimeOut is
                // false and wc <= corePoolSize, workers wait here indefinitely, which is
                // how the pool keeps its core threads alive.
                workQueue.take();
            if (r != null)
                // Got a task: hand it to runWorker
                return r;
            // poll() returned null: the wait timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting (e.g. by shutdown): not a timeout; loop and
            // re-check the pool state
            timedOut = false;
        }
    }
}
以上就是线程池执行任务的主流程了
下面再来看看线程池关闭的流程,在此之前先看看线程池的状态
- RUNNING 初始化的状态,也是线程池绝大部分时间的状态
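- SHUTDOWN 调用线程池的shutdown方法后,线程池会尝试转为该状态:不再接受新任务,但阻塞队列中已有的任务仍会被执行;同时会中断空闲的线程,而正在执行任务的线程会等它们把任务执行完
- STOP 调用线程池的shutdownNow方法后,线程池会尝试转为该状态:不再接受新任务,也不再执行阻塞队列中的任务(这些任务会被取出并返回给调用方),同时中断所有已启动的线程,无论其是否正在执行任务
- TIDYING 当所有worker都已退出(workerCount为0),并且线程池处于STOP状态,或处于SHUTDOWN状态且阻塞队列为空时,会转换为该状态,随后调用terminated()钩子方法
- TERMINATED terminated()钩子方法执行完毕后(即使它抛出了异常),会转换为该状态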
shutdown
/**
 * Starts an orderly shutdown. The pool moves to SHUTDOWN, so new tasks are rejected
 * (execute only enqueues while RUNNING, and addWorker refuses a non-null first task),
 * but getTask keeps handing out tasks that are already queued. Idle workers are
 * interrupted; workers busy with a task are left to finish it.
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Permission check (declaration not shown)
        checkShutdownAccess();
        // Advance to SHUTDOWN (no-op if the state is already SHUTDOWN or later)
        advanceRunState(SHUTDOWN);
        // Interrupt workers that are not running a task
        interruptIdleWorkers();
        // Hook; its declaration is not shown. The original note says it has default
        // (package-private) access, so only subclasses in the same package can override it.
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // After releasing mainLock, terminate right away if nothing remains
    tryTerminate();
}
/**
 * Raises the run state to targetState, keeping the worker count unchanged. The state is
 * never lowered: if it is already at or beyond targetState, nothing changes.
 *
 * @param targetState SHUTDOWN or STOP in the callers shown here
 */
private void advanceRunState(int targetState) {
    for (;;) {
        // Spin until the state is at least targetState, whether this thread's CAS set it
        // or another thread already had
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
// Interrupts every idle worker, not just one (delegates with onlyOne = false).
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
/**
 * Interrupts workers that look idle: not already interrupted, and whose worker lock can
 * be acquired. runWorker holds that lock while a task runs, so busy workers are skipped.
 *
 * @param onlyOne if true, stop after examining the first worker in iteration order,
 *                whether or not that worker was interrupted
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // tryLock fails for a worker that is running a task, which filters it out
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                    // Best effort: skip threads we are not permitted to interrupt
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
shutdownNow
/**
 * Stops the pool. It moves to STOP and interrupts every started worker, busy or idle.
 * It removes all tasks still in the work queue and returns them without running them.
 *
 * @return the tasks that were still queued
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Permission check (declaration not shown)
        checkShutdownAccess();
        // Advance to STOP (no-op if the state is already STOP or later)
        advanceRunState(STOP);
        // Unlike shutdown(), interrupt workers even if they are running a task
        interruptWorkers();
        // Take all remaining tasks out of the work queue to return to the caller
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
// Interrupts every worker whose thread has started, whether idle or running a task
// (see Worker.interruptIfStarted), while holding mainLock to iterate workers safely.
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
// Worker method: interrupts this worker's thread only when all three conditions hold:
// - getState() >= 0. The constructor sets the state to -1, presumably reset by runWorker's
//   initial w.unlock().
// - The thread exists. The thread factory may have returned null.
// - The thread is not already interrupted.
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
            // Best effort: ignore threads we are not permitted to interrupt
        }
    }
}
/**
 * Removes all tasks from the work queue and returns them in a new list.
 * drainTo does the bulk transfer. If the queue is still non-empty afterwards, the
 * remaining elements are removed one at a time from a snapshot; this presumably covers
 * queue implementations whose drainTo does not transfer every element.
 *
 * @return the removed tasks
 */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            // Only keep elements this call actually removed
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
以上就是线程池主要的方法介绍了