深入理解线程池源码
核心属性
- corePoolSize :核心线程数,一般情况下,该数量的核心线程创建好之后,会常驻在线程池中,不会应空闲而关闭,可以设置allowCoreThreadTimeOut=true使核心线程空闲关闭
- maximumPoolSize :最大线程数,>核心线程数。
- keepAliveTime : 空闲时间,当线程获取任务时,超过keepAliveTime仍然获取不到任务,那么线程执行完所有逻辑后,自动消亡,workerSet也会移除该worker对象
- unit : 空闲事件keepAliveTime 的单位
- BlockingQueue<Runnable> workQueue : 任务的阻塞队列,当前提交任务时,工作线程已经>= 核心线程数, 则会将任务 推入阻塞队列中。如果阻塞队列达到最大长度,则会在工作线程数 不超过最大线程数maximumPoolSize的情况下,继续创建空闲线程来处理任务。
- ThreadFactory threadFactory: 创建线程的工厂
- RejectedExecutionHandler handler :任务的拒绝策略。当线程数任务阻塞队列满了,且工作线程数 大于等于 最大线程数了, 则 线程池无法调度线程则处理任务,调用构造方法传入的RejectedExecutionHandler实例的rejectedExecution()方法来拒绝任务。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态 | value | 说明 |
---|---|---|
RUNNING(当线程池创建出来的初始状态) | 111 | 能接受任务,能执行阻塞任务 |
SHUTDOWN(调用shutdown方法) | 000 | 不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務 |
STOP(调用shutDownNow) | 001 | 不接受新任务,打断正在执行的任务,丢弃阻塞任务 |
TIDYING(中间状态) | 010 | 任务全部执行完,活动线程也没了 |
TERMINATED(终结状态) | 011 | 线程池终结 |
常用api
execute(Runable)
执行任务,无返回值
public void execute(Runnable command);
submit(Runable)
会返回Future对象,调用Future对象的get(),会阻塞,直到拿到返回值返回
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
doInvokeAny
返回最快执行完的任务的结果,集合中其他正在执行的线程会被关闭
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
invokeAll
执行所有任务,返回List<Future<T>>
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
shutdown()
不会接收新的任务,但是已经运行和在队列中的任务会执行完,然后在关闭线程
线程状态变成SHUTDOWN
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
awaitTermination(long timeout, TimeUnit unit)
等待线程池关闭,会提前也会超时
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
shutdownNow
打断所有所有正在执行的任务,返回队列中的任务
线程状态变成STOP
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 打断正在工作的线程
interruptWorkers();
// 从队列中取出等待的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回队列中等待的任务
return tasks;
}
源码解析
流程图
image一 、任务的执行以及线程的创建 : execute(Runnable task):
传入Runnable 对象作为要执行的任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1110000000000000000000000 | 0 = 111000000000000000000
int c = ctl.get();
// 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
// workerCountOf(c) : c & CAPACITY
// CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000..... 然后-1 0001 1111 1111 1111 1111....后面29位全是1
// 这时候&c,那么 c的 低29位 与 000 11111...(29个1) 做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
if (workerCountOf(c) < corePoolSize) {
// 则创建新增核心线程,并执行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
// 如果 阻塞队列还没有满,则是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
// 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
// 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
else if (!addWorker(command, false))
// 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
reject(command);
}
1. 当前工作线程小于核心线程数,则创建Worker对象,加入到workerSet中
imageaddWorker(command, true)
创建Worker对象(持有线程),并调用持有线程的start方法,在run方法中执行runnable
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前工作线程数
int wc = workerCountOf(c);
// 如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
// 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas,工作线程数+1,退出循环,因为并发问题只有对工作线程数量加成功了,才能在下面开启线程。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 重新校验线程池 运行状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下面走创建线程的逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 线程一个worker, Worker 是Thread的子类,传入runnable对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 不是关闭状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加到工作线程集合中
workers.add(w);
// 当前工作线程集合大小
int s = workers.size();
// 更新 线程池 线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 执行task,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker.Run方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 执行任务
public void run() {
runWorker(this);
}
}
在run方法中调用runWorker方法,传入当前对象
- 该worker对象第一次执行任务时,w.firstTask是!= null的,所以可以进入while的循环体, 执行Runnable的run方法
- 第二次进来则从阻塞队列中拿任务。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
Runnable task = w.firstTask;
// 之后清空,第二次进来是null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 无限循环,当前有任务未执行 或者 阻塞队列中有任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行runnable的run方法执行业务逻辑
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
// 线程关闭前,从worderSet中移除worker对象
processWorkerExit(w, completedAbruptly);
}
}
2. 当前工作线程数已达到核心线程数了,但是阻塞队列还没满
则会往workQueue 存入Runnable对象
如果 队列长度还没达到上限,则offer方法会成功存入Runnable对象,返回true
如果 队列长度已达到上限,则返回false,说明当前工作线程从队列中拿task,处理task的速度还不够,会创建非工作线程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1110000000000000000000000 | 0 = 111000000000000000000
int c = ctl.get();
// 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
// workerCountOf(c) : c & CAPACITY
// CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000..... 然后-1 0001 1111 1111 1111 1111....后面29位全是1
// 这时候&c,那么 c的 低29位 与 000 11111...(29个1) 做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
if (workerCountOf(c) < corePoolSize) {
// 则创建新增核心线程,并执行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
// 如果 阻塞队列还没有满,则是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
// 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,workQueue.offer(command)返回false,就需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
// 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
else if (!addWorker(command, false))
// 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
reject(command);
}
3. 阻塞队列已满,添加task失败,就会尝试创建空闲线程
创建空闲线程的方法和创建核心线程的方法都是addWorker(runnable,boolean core),只不过传入的core参数是false,表示是空闲线程
如果是空闲线程创建,则会判断当前工作线程数是否 > 最大线程数maximumPoolSize, 如果是创建核心线程则判断的是 核心线程数
如果大于 最大线程数maximumPoolSize 就会创建线程失败
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前工作线程,
int wc = workerCountOf(c);
// 如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
// 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 后面创建线程Worker对象和创建核心线程是一模一样的
}
4. 任务拒绝:阻塞队列满了,并且 工作线程数已经达到最大线程数了, 则尝试创建空闲线程会失败,走任务的拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// -100000000000000000000000000000 | 0 = -100000000000000000000000000000
int c = ctl.get();
// 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
// workerCountOf(c) : 11111111111111111111111111111 & -100000000000000000000000000000
if (workerCountOf(c) < corePoolSize) {
// 则创建新增核心线程,并执行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
// 如果 阻塞队列还没有满,则是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建空闲线程
// 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
else if (!addWorker(command, false))
// 如果 >= maximumPoolSize,执行拒绝策略
reject(command);
}
jdk提供的拒绝策略类 :
image-
AbortPolicy 抛异常
/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
-
DiscardPolicy 丢弃 = 啥也不干
/** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
-
DiscardOldestPolicy 推出并忽略阻塞队列中的第一个任务,尝试执行当前任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// poll出阻塞队列中的第一个任务,并忽略掉
e.getQueue().poll();
// 以执行当前任务
e.execute(r);
}
}
}
................................................
二 、线程的维护
线程池中的作用维护线程,避免频繁创建,销毁线程而带来系统资源的浪费。
核心线程默认(可以配置allowCoreThreadTimeOut = true 来设置 注销核心线程 )是不会在执行完某一个任务后被注销的
空闲线程 在空闲时间达到keepAliveTime 后, 会自动注销(执行完run方法)。
1. 线程的阻塞 :
Worker.runWorker(Worker w)
线程的执行方法中,用while的方式,判断 当前是否有任务(第一次被创建出来) 或者 从阻塞队列中拿任务
- 判断 当前是否有任务(第一次被创建出来)
- 阻塞队列中有任务 :execute任务时,当工作线程数 > 大于核心线程数时且 阻塞队列没有满时, 会把任务存入阻塞队列
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
Runnable task = w.firstTask;
// 之后清空,第二次进来是null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 无限循环,当前有任务未执行 或者 阻塞队列中有任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行runnable的run方法执行业务逻辑
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
// 线程关闭前,从worderSet中移除worker对象
processWorkerExit(w, completedAbruptly);
}
}
如果可以不停的获取任务,处理任务,这种情况下 所有线程都不会被注销,因为无法退出while循环
2. 线程的注销
但是当没有任务提交时,也就是当前任务没有,阻塞队列里也拿不到任务,线程则处于空闲状态,空闲线程 空闲状态下的时间达到keepAliveTime ,则会退出while循环,结束线程。
而核心线程则会在getTask中(如果没配置allowCoreThreadTimeOut=true) 阻塞住, 不返回结果,直到阻塞队列中可以获取到任务, 再进入while循环体。
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否允许核心线程超时关闭 或者 当 工作线程数 > 核心线程数了
// 当 线程中 只剩下核心线程的时候, wc > corePoolSize 就不会返回true,则会workQueue.take()阻塞住
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果允许核心线程超时注销 或者 当前工作线程数 > 核心线程数, 则调阻塞队列的 poll,超时返回null
// 否则调take()方法,一直拿不到就一直阻塞
// 这就说明,只有允许核心线程超时注销,或者 当 当前工作线程数 > 核心线程数时,才会调 阻塞队列会超时的poll方法,
// runWorker方法才会退出while循环体, 结束线程
// 如果allowCoreThreadTimeOut被设置为true,则所有线程从队列中拿任务调用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有线程在poll超时之后,仍然没获取到任务,则会返回 null ,退出循环体, 结束线程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从workers移除线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers移除线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果允许核心线程超时关闭,则为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前工作线程数 > 最小的线程数量
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 小于最小的线程数量,添加worker
addWorker(null, false);
}
}
常用的线程池配置
jdk的Executors类提供了4个创建线程池的配置方法, 通过之前的原理,我们来分析下这些线程池的不同
1. newFixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建一个 通过操作一个共享的无界队列来复用固定数量的线程的线程池。
首先看构造方法,核心线程数和最大线程数是一样的 ,说明不存在线程池扩容的情况
空闲有效时间为0 毫秒, 由于只存在核心线程,所以不存在 线程被注销的情况
LinkedBlockingQueue 是一个无界队列,默认大小为int的最大值,所以不会出现 队列长度不够而导致 创建空闲线程的情况,也就不会出现 拒绝策略。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
总结 :
- 线程数固定,
- 没有多余线程线程回收,
- 不会出现因线程不够,队列装不下而拒绝任务的情况
2.newSingleThreadExecutor
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
只有一个线程,无界队列
3.newCachedThreadPool
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
典型的缓存策略。
说明适用于 吞吐量优先,会优先开启尽可能多的线程来保证数据处理效率,适合高并发场。如果任务不那么繁忙了,线程空闲下来,超过一分钟线程就被注销,可灵活回收空闲线程。
1. 如何灵活回收空闲线程?
从构造方法可以看到, 核心线程是0,是没有常驻线程的,所有的线程都是非核心线程。并且超时时间设置为60s,超过60没有从队列中获取到任务就会被回收。
2. 如果保证吞吐量优先?
同样从构造方法得知,非核心线程数量 最大可以达到Integer.MAX_VALUE个,也就是整形的最大数2^31-1个, 如果并发量高,会启用大量的线程来更快的处理任务,几乎不会有任务在排队的现象。
那么是有一个任务来就创建一个线程吗?
答案肯定不是的, 关键点在于 SynchronousQueue这个队列,和一般的BlockingQueue不同,这个队列的offer() 只有在 有线程阻塞在poll()方法的时候才会返回true,否则返回false,从ThreadPoolExecutor的execute(Runnable r) 可以得知, 当 workQueue.offer(command)返回false时,会调用下面那个else if里的addWorker去创建线程, 由于最大线程数 是Integer.MAX_VALUE,那么是肯定 可以新创建线程的。
image那么 当没有任何其他线程阻塞在队列的poll()方法时,有两种情况
- 要么 是第一次进来,还没有产生任何线程在作业
- 要么 就是其他线程在正在处理任务, 整个线程池的所有线程都处于繁忙的状态,就没有任何空闲下来的线程。
所以,基于第2种情况,可以看出 为了吞吐量优先,就会新创建一个线程来处理 当前要提交的任务,保证任务的及时处理。
如果有线程空闲下来,在60s之内还未被回收,那么此时提交任务,调用workQueue.offer(command)这处代码就会返回true,将任务提交至队列,让其他线程poll到 处理即可,就不会创建新的线程, 达到线程的复用,节省了线程创建,回收的资源开销。
4.newScheduledThreadPool
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
定时任务线程池
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
延迟执行
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
重复执行
自定义
schedule(){
// dosomething
// 递归
schedule();
}
api
scheduleWithFixedDelay
执行完任务再计算延迟时间
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate
从任务开始执行就计算延迟时间
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
案例 每周三22点执行
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
// 计算当前时间距离目标时间还有多久
// 初始化延迟时间 = 目标时间 - 当前时间
// 周期 = 7天
int period = 7;
scheduled.scheduleAtFixedRate(() -> {
//doSomething
}, 初始化延迟时间, period, TimeUnit.DAYS)
}