Java线程池如何保证线程池的核心线程存活不被销毁?execut
2021-01-18 本文已影响0人
湖与月
线程池介绍
基本元素
线程池是一个创建使用线程并能保存使用过的线程以达到服用的对象,他使用workQueue当作阻塞队列,workers作为线程的封装操作对象。
/**
* 用于保留任务并移交给工作线程的队列。
* 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),因此仅依靠isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN过渡到TIDYING时必须这样做)。
* 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,即使它在延迟到期后稍后可能返回non-null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 集合包含池中的所有工作线程。仅在mainLock上锁后访问。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
保证线程安全的元素
/**
* 主池控制状态变量ctl包含了两个概念字段,workerCount表示有效线程数,runState表示是否正在运行,正在关闭等
* 为了将它们打包为一个int,我们将workerCount限制为(2 ^ 29)-1(约5亿)个线程,而不是(2 ^ 31)-1(20亿)可以表示的线程。如果将来有问题,可以将该变量更改为AtomicLong,并调整以下移位掩码常量。但是在需要之前,使用int可以使此代码更快,更简单。
* workerCount是已被允许启动但不允许停止的工人数。该值可能与活动线程的实际数量暂时不同,例如,当ThreadFactory在被询问时创建线程失败,并且退出线程仍在终止之前执行簿记操作时,该值会有所不同。用户可见的线程池大小是工作集合的当前大小。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 锁定时要锁定工人集合和相关簿记。
* 虽然我们可以使用某种并发集合,但事实证明,通常最好使用锁。 原因之一是,这可以序列化interruptIdleWorkers,从而避免了不必要的中断风暴,尤其是在关机期间。 否则,退出线程将同时中断那些尚未中断的线程。 它还简化了一些相关的统计数据,如largePoolSize等。我们还在shutdown和shutdownNow上保留mainLock,以确保在单独检查中断和实际中断的权限时,工人集合是稳定的。
*/
private final ReentrantLock mainLock = new ReentrantLock();
execute(Runnable)执行逻辑
execute()执行逻辑,来自下方参考链接/*
* 1.如果正在运行的线程少于corePoolSize线程,会创建新线程。对addWorker方法的调用从原子上检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下发出虚假警报。
* 2.如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或自从进入该方法以来该池已关闭。因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
* 3.如果我们无法将任务排队,则尝试添加一个新线程。如果失败,线程池可能已关闭或已饱和,因此拒绝该任务。
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
////获取线程池控制状态
int c = ctl.get();
//通过workerCountOf计算出实际线程数
if (workerCountOf(c) < corePoolSize) {
//未超过核心线程数,则新增 Worker 对象,true表示核心线程,false表示最大线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//核心线程满了,如果线程池处于运行状态则往队列中添加任务,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不在运行状态则删除阻塞队列中的任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果实际线程数为0,添加非核心线程(我也不知道为什么)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//尝试调用最非核心线程,失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
其中主要方法是addWorker()。
private boolean addWorker(Runnable firstTask, boolean core) {
//这部分主要是对运行状态的判断,略过。。。
boolean workerStarted = false;//线程是否启动
boolean workerAdded = false;//线程是否添加
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 线程是否正在执行任务
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//线程启动!
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
经过一系列逻辑运行后,终于 t.start() 了!,然后他会调用Worker类重写的run()方法,而里面只有runWorker()方法调用。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//轮询调用 getTask 用于获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池被shutdown了,则执行interrupt()方法
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//方法里面是空的,需要自定义实现
Throwable thrown = null;
try {
//执行我们传入的代码
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//方法里面是空的,需要自定义实现
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//最后执行线程退出前的操作
processWorkerExit(w, completedAbruptly);
}
}
这个方法的主要逻辑是通过循环不停调用getTask()获取任务并执行,直到getTask()返回为空。
/**
* 根据当前配置设置执行阻塞或定时等待任务,或者线程由于以下任何原因而必须退出,则返回null:
* 1. 超过最大线程数。
* 2.线程池已停止。
* 3.线程池被关闭并且队列已经空了。
* 4.线程超时等待任务,并且在定时等待之前和之后都将终止线程(即{@code allowCoreThreadTimeOut || workerCount> corePoolSize}),并且如果队列为非空,此工作程序不是线程池中的最后一个线程。
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态以及阻塞队列是否为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允许核心线程超时或者实际线程数大于核心线程则为true,否则为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//poll():使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
//take():使当前线程等待,直到发出信号或被中断为止。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结
最后总结一下,线程会在runWorker()方法里不停循环获取任务并执行,直到返回为空,而在getTask()方法里它就是调用阻塞队列的poll()或take()等待获取其中的任务。这两个方法的具体实现逻辑依赖线程池的阻塞队列。