并发编程(5)ThreadPoolExecutor原理解析
概述
由于线程的创建跟销毁是比较消耗资源的,也是比较耗时的。可能为了程序的需要,我们会创建很多线程,所以很有必要对线程进行一个统一的管理,所以就出现了线程池。通过线程池,我们可以重复利用一些线程资源,同时可以统一管理应用内的线程,防止内存泄露。
运行机制
Executor当我们创建一个任务之后,放进线程池之后,线程池会做如下判断
- 1.判断核心线程池里的线程是否都在执行任务:否的话则将新任务放入线程池中进行执行,否则进行下一步。
- 2.判断缓存队列是否未满:是的话,将新任务放入缓存队列,否则进行下一步
- 3.判断线程池的线程是否都处于工作状态:是的话就就执行线程抛弃策略,否则就执行当前任务
继承关系
ScheduledThreadPoolExecutorExecutor 接口定义了线程池最基本的方法,提交Runnable 任务
public interface Executor {
void execute(Runnable command);
}
ExecutorService 扩充了提交任务的类型,并且定义了线程池关闭任务的方法。
AbstractExecutorService 是抽象类,主要是对ExecutorService 的一些具体实现
ThreadPoolExecutor 是最核心的一个类,下面会具体分析其源码。
ScheduledThreadPoolExecutor则是在 在 ThreadPoolExecutor 的基础上增加了时间调度的功能
成员变量
//32-3=29,线程数量所占位数
private static final int COUNT_BITS = Integer.SIZE – 3;
//低29位表示最大线程数,2的29次幂-1
private static final int CAPACITY = (1 << COUNT_BITS) – 1;
//线程池自身的状态
//符号位101
private static final int RUNNING = -1 << COUNT_BITS;
//高3位000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3位001
private static final int STOP = 1 << COUNT_BITS;
//高3位010
private static final int TIDYING = 2 << COUNT_BITS;
//高3位011
private static final int TERMINATED = 3 << COUNT_BITS;
//缓存队列,等待中的线程任务队列
private final BlockingQueue<Runnable> workQueue;
//线程池中工作的线程集合
private final HashSet<Worker> workers = new HashSet<>();
//最大线程数
private int largestPoolSize;
//完成任务的线程数量
private long completedTaskCount;
//创建线程池的工厂类
private volatile ThreadFactory threadFactory;
//线程池丢弃策略
private volatile RejectedExecutionHandler handler;
//在等待执行任务的线程的最大等待时间
private volatile long keepAliveTime;
//核心线程数
private volatile int corePoolSize;
//线程池最大可容纳的线程数
private volatile int maximumPoolSize;
//默认的线程丢弃策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//int 型变量,低3位表示线程池状态,剩余的位数表示最大线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
构造方法
constructors其实前面的三个构造方法最终都调用了最后一个构造方法,所以就来看看最后一个构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
参数比较多,下面来解释一下
-
corePoolSize:
核心池的大小
,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; -
maximumPoolSize:
线程池最大线程数
,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程; -
keepAliveTime:
表示线程没有任务执行时最多保持多久时间会终止
。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过- corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0; - unit:keepAliveTime的时间单位,有如下几种取值
时间单位 | 解释 |
---|---|
TimeUnit.DAYS | 天 |
TimeUnit.HOURS | 小时 |
TimeUnit.MINUTES | 分钟 |
TimeUnit.SECONDS | 秒 |
TimeUnit.MILLISECONDS | 毫秒 |
TimeUnit.MILLISECONDS | 微妙 |
TimeUnit.NANOSECONDS | 纳秒 |
- workQueue : 一个阻塞队列,用来存储等待执行的任务,参考下图
阻塞队列,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。
-
ArrayBlockingQueue(有界队列): FIFO 队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小
-
LinkedBlockingQueue(无界队列):FIFO 队列,大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。
-
PriorityBlockingQueue:优先级队列, 类似于LinkedBlockingQueue,但队列中元素非 FIFO, 依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序
-
SynchronousQueue(直接提交策略): 交替队列,队列中操作时必须是先放进去,接着取出来,交替着去处理元素的添加和移除
threadFactory::创建线程池的工厂
RejectedExecutionHandler: 线程丢弃策略,常见的有如下几种
丢弃策略 | 解释 |
---|---|
DiscardPolicy | 丢弃任务,但是不抛出异常 |
CallerRunsPolicy | 由调用线程处理该任务 |
AbortPolicy | 丢弃任务并抛出RejectedExecutionException |
DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) |
execute方法
提交Runnable任务
execute方法
public void execute(Runnable command) {
//为空的话,抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断加入当前任务后,线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//如果小于核心线程数,则将立即执行当前任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//核心线程数满了之后,先判断线程池的状态,然后判断缓存队列是否还能进行缓存
if (isRunning(c) && workQueue.offer(command)) {
//如果两者都满足条件,则将线程加入缓存队列
int recheck = ctl.get();
//如果当前任务没有在运行已经被移除,执行线程丢弃策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 正在运行的线程数如果是0,则直接运行当前线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//加入当前任务失败,则执行丢弃策略
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
有两个参数,一个是firstTask,表示加入的Runnable任务,一个是core,表示是否添加到核心线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry://定义了循环的名称,便于后面直接中断循环
for (;;) {
int c = ctl.get();//获取状态与数量的标志位
int rs = runStateOf(c);//判断线程状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//线程池处于关闭状态,firstTask为null,或者缓存队列为空,返回false
return false;
//死循环
for (;;) {
int wc = workerCountOf(c);//获取线程池数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//比对核心线程数与最大线程数
return false;
if (compareAndIncrementWorkerCount(c))
//添加线程成功,中断循环
break retry;
c = ctl.get(); //重新获取线程状态与数量的标志位
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个心的worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//给当前线程上锁
mainLock.lock();
try {
//获取当前线程池状态跟线程数的标记为
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将worker添加到缓存队列中去
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加成功,改变标记位
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加成功之后,开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//添加失败,调用addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
submit
FutureTask
继承关系
先看一下FutureTask的继承关系
FutureTask
Runnable
很常见的接口,定义了run方法
public interface Runnable {
public abstract void run();
}
Future
带有返回值的泛型接口
public interface Future<V> {
boolean isCancelled();//任务是否取消
boolean isDone();//任务是否完成
//同步方法,任务执行的返回值
V get() throws InterruptedException, ExecutionException;
//timeout后获取等待结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture
继承自Runnable,Future的泛型返回接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
Callable
//带有返回值的Runnable额接口
public interface Callable<V> {
V call() throws Exception;
}
构造方法
Callable构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
Runnable+Result构造方法
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
可以看到不管是Callable还是Runnable构造方法,最后都是使用Callable来进行构造的,之所以这么做,是因为FutureTask需要返回值
提交任务
提交Runnable任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
提交callable任务+返回值
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
提交callable任务
public <T> Future<T> submit(Callable<T> task) {
//首先判断是否为空
if (task == null) throw new NullPointerException();
//将Callable转换成Future
RunnableFuture<T> ftask = newTaskFor(task);
//执行execute方法
execute(ftask);//最后依然会调用execute Runnable方法
return ftask;
}
不管是提交什么样task,最后都会被包装成Runnable方法来执行,还是会调用Executor的execute方法。
tryTerminate
这个方法是当线程池关闭的时候会调用
final void tryTerminate() {
//开启死循环
for (;;) {
//获取线程池状态跟数量的标志位
int c = ctl.get();
//判断三个条件
//1.线程是否在运行
//2.线程池状态小于TIDYING,TERMINATED
//3.线程池已经关闭并且队列为空
满足上面的任意一个条件就会直接返回,很好理解
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
// 如果线程数不为0,才有资格去终止
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//CAS设置状态成功,调用terminated,默认空实现
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
advanceRunState();
更改当前线程池的状态
private void advanceRunState(int targetState) {
for (;;) {
//获取当前线程的状态及数量的标志位
int c = ctl.get();
//更改线程状态
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
shutdown
public void shutdown() {
//锁住线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 中断等待中的线程
interruptIdleWorkers();
onShutdown();
} finally {
//释放锁
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//传入Stop状态,其余跟shutdown保持一致
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow() 和 shutdown()的大体流程相似,差别是:
- 1、advanceRunState传入的是Stop
- 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
- 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
Executors
Executors是Java提供的一个线程池的帮助类,可以帮助我们快速的处理线程池。
构造线程池
由于Java的线程池的构造方法比较复杂,所以Java又提供了Executors这个辅助类,帮助我们更快速地创建ThreadPoolExecutor,可以帮助我们创建4种类型的ThreadPool
Executors_create
-
单线程异步队列:Executors.newSingleThreadExecutor(),创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行>所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池>保证所有任务的执行顺序按照任务的提交顺序执行。
-
周期性调度:Executors.newFixedThreadPool(),创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
-
可缓存的线程:Executors.newCachedThreadPool(int size),创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
-
多线程周期性调度:Executors.newScheduledThreadPool(1),创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
Callable转换
executors_change参考资料
https://www.cnblogs.com/trust-freedom/p/6693601.html
https://zhuanlan.zhihu.com/p/27232156