ThreadPoolExecutor学习笔记
Java有两个线程池类:ThreadPoolExecutor和ScheduledThreadPoolExecutor,且均继承于ExecutorService。Java API提供了Executors工厂类来帮助创建各种线程池。
Java线程池ExecutorService继承树:
ThreadPoolExecutor
ThreadPoolExecutor相关工作流程
1.ThreadPoolExecutor添加任务:
ThreadPoolExecutor 添加任务的流程
-
如果添加任务时,线程池中的线程数量少于corePoolSize,那么会直接创建一个新的线程(不管线程池中是否有空闲的线程),然后将任务分配给新建的线程,同时将线程加入到线程池中去;
-
如果线程池的线程数量大于等于 corePoolSize,就将任务添加到任务队列;
-
如果任务队列已经饱和(对于有边界的任务队列),那么就看下线程池中的线程数量是否少于 maximumPoolSize,如果少于,就创建新的线程,将当前任务分配给新线程,同时将线程加入到线程池中。否则就对该任务执行 reject 策略。
-
在 ThreadPoolExecutor 中通过两个量来控制线程池的大小:corePoolSize 和 maximumPoolSize。corePoolSize 表示正常状态下线程池中应该持有的存活线程数量,maximumPoolSize 表示线程池可以持有的最大线程数量。
-
当线程池中的线程数量不超过 corePoolSize 时,位于线程池中的线程被看作 core 线程,默认情况下,线程池不对 core 线程进行超时控制,也就是 core 线程会一直存活在线程池中,直到线程池被关闭(这里忽略线程异常关闭的情况)。
-
当线程池中的线程数量超过 corePoolSize 时,额外的线程被看作非 core 线程,线程池会对这部分线程进行超时控制,当线程空闲一段时间之后会销毁该线程。非 core 线程主要用来处理某段时间并发任务特别多的情况,即之前的线程配置无法及时处理那么多的任务量,需要额外的线程来帮助。而当这批任务处理完成之后,额外的线程就有些多余了(线程越多占的资源越多),因此需要及时销毁。
2.ThreadPoolExecutor关闭线程池:线程池的关闭分为两种:平缓关闭(shutdown)和立即关闭(shutdownNow)。
- 调用 shutdown 方法之后,线程池不再接受新的任务,但是仍然会将任务队列中已有的任务执行完毕。
- 调用 shutdownNow 方法之后,线程池不仅不再接受新的任务,也不会再执行任务队列中剩余的任务,同时会通过中断的方式尝试停止正在执行任务的线程(我们知道对于中断,线程可能响应也可能不响应,所以不能保证一定停止线程)。
构造方法参数讲解
ThreadPoolExecutor提供4个构造函数,最终均会调用该构造函数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:线程池核心线程数大小
- maximumPoolSize:线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务
- keepAliveTime:线程池中超过corePoolSize数目的空闲线程最大存活时间,allowCoreThreadTimeOut为
true的核心线程有效时间 - unit:时间单位
- workQueue:阻塞任务队列,用于保存任务以及为工作线程提供待执行的任务
- threadFactory:线程工厂,线程生成器
- handler:当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理;ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略:
- CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
- AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
- DiscardPolicy:直接抛弃任务,不做任何处理
- DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交
创建完线程池后,可通过submit或execute方法提交任务
线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程池最大线程数=536870911(2^29-1)
事实上COUNT_BITS =29,而上面的5重线程状态实际上是使用32位中的高3位来表示,低29位存线程数,这样线程池的状态和线程数量就由一个变量存储,即:
-
RUNNING=111: 线程池正常运行,可以接受新的任务并处理队列中的任务。
-
SHUTDOWN=000:关闭状态,不再接受新的任务,但是会执行队列中的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
-
STOP=001:不再接受新任务,不处理队列中的任务,中断进行中的任务。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
-
TIDYING=010:所有任务已经终止,workerCount为0,线程状态转换到TIDYING,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
-
TERMINATED=110:terminate()函数执行完成后进入该状态。
AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
初始化值为线程数0,状态为RUNNING。ctl变量是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,保证多线程同步问题,用这个变量保存了两个内容:
- 所有有效线程的数量
- 各个线程的状态(runState)
以下是关于ctl的一些操作:
/**
* 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
* ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
*
* @param c 该参数为存储runState和workerCount的int值
* @return runState的值
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
* 这个方法用于取出workerCount的值
* 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
*
* @param c ctl, 存储runState和workerCount的int值
* @return workerCount的值
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
/**
* 将runState和workerCount存到同一个int中
* “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
*
* @param rs runState移位过后的值,负责填充返回值的高3位
* @param wc workerCount移位过后的值,负责填充返回值的低29位
* @return 两者或运算过后的值
*/
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
Worker
内部类Worker是对任务的封装,实现Runnable接口,并继承AbstractQueuedSynchronizer(AQS,队列同步器)实现了一个简单的不可重入(也就是说该锁只能被一个线程获取一次)的互斥锁,因此每个线程实际上关联了一个互斥锁。当线程执行任务时,需要首先获得关联的 Worker 锁,执行完任务之后再释放该锁。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;//当前worker对象关联的线程
/** Initial task to run. Possibly null. */
Runnable firstTask;// 线程创建后的初始任务
/** Per-thread task counter */
volatile long completedTasks;// 线程完成的任务数量
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//禁止中断直到执行了 runWorker 方法,在shutdownNow 中断线程之前,会首先判断 state 是否大于 等于 0
// 所以这里将 state 设为 -1,可以防止当前线程被中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
// 成功获得锁
return true;
}
// 线程进入等待队列
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker继承了AQS,使用AQS来实现独占锁的功能。与ReentrantLock区别在于,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
worker中断:
-
中断是针对运行的线程,当线程创建之后只有调用了 start 方法,线程才真正运行,而 start 方法的调用是在 runWorker 方法中的,也就是有只有执行了 runWorker 方法,线程才真正启动
-
在 shutdown 方法中,中断线程之前会首先尝试获取线程的 Worker 锁,只有获得了 Worker 锁才对线程进行中断。而获得 Worker 锁的前提是 Worker 的锁的状态变量 state 为 0,当 state 设为 -1 之后,任何线程都无法获得该锁,那么也就无法对线程执行中断操作。
-
在 shutdownNow 方法中,会调用 Worker 的 interruptIfStarted 方法来中断线程,而 interruptIfStarted 方法只有在 state >= 0 时才会中断线程,所以将 state 设为 -1 可以防止线程被提前中断。
execute和submit
通过 execute 或者 submit 方法都可以向线程池中添加一个任务,submit 会返回一个 Future 对象来获取线程的返回值:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit 中只是将 Runnable 对象包装了一下,最终还是调用了 execute 方法。 execute 方法的实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程数量少于 corePoolSize,将改任务分配给一个新建的线程
if (workerCountOf(c) < corePoolSize) {
// true 表示将当前线程添加为核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程数量大于等于 corePoolSize,首先尝试将任务添加到任务队列
// workQueue.offer 会将任务添加到队列尾部
if (isRunning(c) && workQueue.offer(command)) {
// 重新检查状态
int recheck = ctl.get();
// 如果发现当前线程池不是处于 Running 状态,就移除之前的任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 1. 当前线程池处于 Running 状态,但是工作线程数量为 0,需要创建新的线程
// 2. 移除任务失败,但是工作线程数量为 0,需要创建新的线程来完成移除失败的任务
addWorker(null, false);
}
//如果workQueue满了,那么这时候可能还没到线程池的maxnum,所以尝试增加一个Worker
//线程池不是```RUNNING```状态
else if (!addWorker(command, false))
// 如果Worker数量到达上限,那么就拒绝此线程
reject(command);
}
- 第一步检查当前线程是否小于corePoolSize,如果小于,线程池会创建一个新的线程来执行此任务。会调用
addWorker(command, true)
创建新线程并执行任务,其中第二个参数用于标识是否添加为核心线程。addWorker(command, true)
内部实现细节后序分析 - 第二步如果前面的判断不满足条件,将此任务插入到工作队列中去,即
workQueue.offer(command)
。插入成功后存在两种情况:
- 再次检查此时的线程池是否还处于
RUNNING
状态,如果不是的话,将之前插入队列的那个任务移除,然后调用reject(command)
拒绝此任务的提交。 - 当前线程池处于
RUNNING
状态,但我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了, 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程。
- 第三步如果线程池不是
RUNNING
状态或者线程池饱和了,调用reject(command)
拒绝此任务的提交。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里一大段的 for 语句, 其实就是判断和处理 core 参数的.
// 当经过判断, 如果当前的线程大于 corePoolSize 或 maximumPoolSize 时(根据 core 的值来判断),
// 则表示不能新建新的 Worker 线程, 此时返回 false.
// retry 类似于 goto,continue retry 跳转到 retry 定义, 而 break retry 跳出 retry
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 当 core 为真, 那么就判断当前线程是否大于 corePoolSize
// 当 core 为假, 那么就判断当前线程数是否大于 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : ma))
return false;
// 使用 CAS 方式将线程数量增加,如果成功就跳出 retry,如果失败,证明有竞争,那么重新到retry。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池运行状态发生了改变就从 retry(外层循环)处重新开始,
if (runStateOf(c) != rs)
continue retry;
// 程序执行到这里说 CAS 没有成功,那么就再次执行 CAS
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 程序用一个 HashSet 存储线程,而 HashSet 不是线程的安全的, 所以将线程加入 HashSet 的过程需要加锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 1. rs < SHUTDOWN 说明程序在运行状态
// 2. rs == SHUTDOWN 说明当前线程处于平缓关闭状态,而 firstTask == null
// 说明当前创建的线程是为了处理任务队列中剩余的任务(故意传入 null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
// 等价于
if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
- rs >= SHUTDOWN && rs != SHUTDOWN:说明线程池处于 STOP,TIDYING 或者 TERMINATED 状态下,处于这三种状态说明线程池处理完了所有任务或者不再执行剩余的任务,可以直接返回。
- rs == SHUTDOWN && firstTask != null:如果上面的条件不成立,说明当前线程池的状态一定是处于 SHUTDOWN 状态,在 execute 方法中,我们提到了如果传入 null,说明创建线程是为了执行队列中剩余的任务(此时线程池中没有工作线程),这时就不应该返回。而如果 firstTask != null,说明不是为了处理队列中剩余的任务,可以返回。
- rs == SHUTDOWN && workQueue.isEmpty():说经任务队列中的任务已经全部执行完了,无需创建新的线程,可以返回。
runWorker
当创建了线程并成功启动之后,会执行 Worker 的 run 方法,而该方法最终调用了 ThreadPoolExecutor 的 runWorker 方法,并且将自身作为参数传进去了,下面是 runWorker 方法的实现:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 这里将 Worker 中的 state 设为 0,以便其他线程可以获得锁
// 从而可以中断当前线程
w.unlock(); // allow interrupts
// 用来标记线程是正常退出循环还是异常退出
boolean completedAbruptly = true;
try {
// 如果任务不为空,说明是刚创建线程,如果任务为空,则从队列中取任务
// 如果队列没有任务,线程就会阻塞在这里
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 任务执行之前做一些处理,空函数,需要用户定义处理逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
// 因为 runnable 方法不能抛出 checkedException ,所以这里
// 将异常包装成 Error 抛出
throw new Error(x);
} finally {
// 任务执行完之后做一些处理,默认空函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理worker退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}
w.unlock();
woker在创建时调用setState(-1)
,将state设为-1,抑制工作线程的 interrupt 信号, 直到此工作线程正是开始执行 task. 那么在 addWorker() ``中的`` w.unlock()
就是允许 Worker 的 interrupt 信号。unlock()
方法最终会调用setState(0)
将状态设为0
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
- 如果线程池的状态小于 STOP,也就是处于 RUNNING 或者 SHUTDOWN 状态,要保证线程池中的线程处于非中断状态
- 如果线程池的状态大于等于 STOP,也就是处于 STOP,TIDYING 或者 TERMINATED 状态,要保证线程池中的线程处于中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
-
!wt.isInterrupted()
,该条件说明当前线程没有被中断,只有在线程没有被中断的前提下,才有可能对线程执行中断操作。 -
(runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
: - 如果第一个条件为true,即线程池状态处于大于等于STOP,而线程没有被中断,因此线程应该需要被中断;
- 如果第一个条件为false,线程池处于 RUNNING 或者 SHUTDOWN 状态,说明此线程不应该被中断。调用
Thread.interrupted()
返回此线程的状态,而且此方法会复位线程的中断状态(设为false),即之后的!wt.isInterrupted()
会返回true。如果中断状态为false,跳出if判断; - 如果中断状态为true,说明该线程被中断过了,此时判断线程的中断是不是由 shutdownNow 方法(该方法会中断线程池的线程,并修改线程池状态为 STOP)造成的,所以需再检查一下线程池的状态,如果线程池已经变为 STOP 或者之后的状态,说明确实是由 shutdownNow 方法造成的,需要重新对线程进行中断,如果不是那就不需要再中断线程了。
总结一下runWorker方法的执行过程:
- while循环不断地通过getTask()方法获取任务;
- getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
getTask
getTask方法用来从阻塞队列中取任务,代码如下:
private Runnable getTask() {
//主要是判断后面的poll是否要超时
boolean timedOut = false; // Did the last poll() time out?
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
// 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
// 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 递减workerCount值
decrementWorkerCount();
return null;
}
// timed 用于判断是否需要对线程进行超时控制
boolean timed;
// 1.RUNING状态
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
// 1. allowCoreThreadTimeOut: 为 true 说明可以对 core 线程进行超时控制
// 2. wc > corePoolSize: 说明线程池中有非 core 线程
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数量不大于 maximumPoolSize 并且没有超时,则退出循环,否则workerCount递减,返回null,结束当前thread
if (wc <= maximumPoolSize && !(timedOut && timed))
break;
// workerCount递减,结束当前thread
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果允许超时控制,则执行 poll 方法,该方法响应超时,当 keepAliveTime 时间内
// 仍然没有获取到任务,就返回 null。take 方法不响应超时操作,当获取不到任务时会一直等待。
// 另外不管 poll 还是 take 方法都会响应中断,如果没有新的任务添加到队列中,会直接抛出 InterruptedException
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 超时
} catch (InterruptedException retry) {
timedOut = false;// 线程被中断重试
}
}
}
- 首先timedOut初始值为false,如果allowCoreThreadTimeOut 为true或者线程数大于核心线程数,则timed为true,那么workQueue通过poll去取线程,如果正常返回,返回取到的task.
- 如果返回超时,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可,通过循环
wc <= maximumPoolSize && ! (timedOut && timed)
,减少线程数
*什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
runWorker()
和getTask()
这两个方法完成了任务的获取和阻塞线程的工作,大致流程:
- 通过 while 循环不断的从任务队列中获取任务,如果当前任务队列中没有任务,就阻塞线程。
- 如果 getTask 返回 null,表明当前线程应该被回收,执行回收线程的逻辑。
- 如果成功获取任务,首先判断线程池的状态,根据线程池状态设置当前线程的中断状态
- 在执行任务之前做一些预处理(用户实现)
- 执行任务
- 在执行任务之后做一些后处理(用户实现)
processWorkerExit
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
//当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,直接添加一个线程;
// 线程正常结束, 如果允许对 core 线程进行超时控制,并且任务队列中有任务, 则保证线程数量大于等于 1
// 如果不允许对 core 进行超时控制,则保证线程数量大于等于 corePoolSize
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
tryTerminate
processWorkerExit 中调用了 tryTerminate 方法,该方法根据线程池状态进行判断是否结束线程池。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
// 2.STOP状态,当调用了shutdownNow方法
if (workerCountOf(c) != 0) {
// 如果工作线程数量不为 0,中断线程池中第一个线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程状态设为 TIDYING,如果设置不成功说明线程池的状态发生了变化,需要重试
//如果设置成功,则调用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默认什么都不做,留给子类实现
terminated();
} finally {
// 将线程状态设为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
在tryTerminate ()
方法中,如果满足下面两个条件,就将线程池的状态设为TIDYING,然后执行完terminated()
后,线程池状态变为TERMINATED:、
- 线程池状态为SHUTDOWN ,且线程池和任务队列均为空;
- 线程池状态为 STOP ,且线程池为空
在tryTerminate ()
方法中,当工作线程不为0的时候,回去尝试中断线程池中的一个线程,这样做的主要目的在于防止shutdown ()
方法的中断信号丢失。
当shutdown ()
方法被调用时,会执行interruptIdleWorkers()
,此方法会先检查线程是否是空闲状态,如果发现线程不是空闲状态,才会中断线程,中断线程让在任务队列中阻塞的线程醒过来。但是如果在执行interruptIdleWorkers()
方法时,线程正在运行,此时并没有被中断;如果线程执行完任务后,然后又去调用了getTask()
,这时如果workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。这时该线程便错过了shutdown()
的中断信号,若没有额外的操作,线程会一直处于阻塞的状态。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况,弥补了shutdown()
中丢失的信号。
interruptIdleWorkers
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 首先看当前线程是否已经中断,如果没有中断,就看线程是否处于空闲状态
// 如果能获得线程关联的 Worker 锁,说明线程处于空闲状态,可以中断
// 否则说明线程不能中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果 onlyOne 为 true,只尝试中断第一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查当前线程是否有关闭线程池的权限
checkShutdownAccess();
// 将线程池状态设为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断线程,这里最终调用 interruptIdleWorkers(false);
interruptIdleWorkers();
// hook 方法,默认为空,让用户在线程池关闭时可以做一些操作
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
public List < Runnable > shutdownNow() {
List < Runnable > tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查线程是否具有关闭线程池的权限
checkShutdownAccess();
// 更改线程状态
advanceRunState(STOP);
// 中断线程
interruptWorkers();
// 清除任务队列,并将任务返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
interruptWorkers
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不管线程是否空闲都执行中断
for (Worker w: workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
在 interruptWorkers()
方法中,只要线程开始了,就对线程执行中断,所以 shutdownNow()
的中断信号不会丢失。drainQueue ()
主要作用是清空任务队列,并将队列中剩余的任务返回。
drainQueue
private List <Runnable> drainQueue() {
BlockingQueue <Runnable> q = workQueue;
ArrayList <Runnable> taskList = new ArrayList < Runnable > ();
// 该方法会将阻塞队列中的所有项添加到 taskList 中
// 然后清空任务队列,该方法是线程安全的
q.drainTo(taskList);
if (!q.isEmpty()) {
// 将 List 转换为 数组,传入的 Runnable[0] 用来说明是转为 Runnable 数组
for (Runnable r: q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
通过上面的分析,shutdownNow方法与shutdown方法类似,不同的地方在于:
- 设置状态为STOP;
- 中断所有工作线程,无论是否是空闲的;
- 取出阻塞队列中没有被执行的任务并返回。
shutdownNow()
方法执行完之后调用tryTerminate()
方法,目的就是使线程池的状态设置为TERMINATED。
以上都是学习一些大神相关源码分析和对照源码阅读的相关学习记录,ThreadPoolExecutor源码有些还是比较晦涩难懂的,一些地方还是理解的不是很透彻。这也是自己第一次尝试写学习笔记,也希望对正在学习了解ThreadPoolExecutor的同学提供一点帮助。
参考文章
【Java 并发】详解 ThreadPoolExecutor
深入理解Java线程池:ThreadPoolExecutor