Java并发编程:线程池ThreadPoolExecutor
2020-12-01 本文已影响0人
singleZhang2010
为什么使用线程池?
- 降低资源消耗
- 提高响应速度
- 提高线程的可管理性
工作原理
ThreadPoolExecutor 执行 execute()方法的流程图:
execute
ThreadPoolExecutor执行示意图:
ThreadPoolExecutor
这里主要分以下四种情况:
- 如果当前运行的线程少于core Pool Size, 则创建新线程来执行任务(注意, 执行这一步骤需要获取全局锁)。
- 如果运行的线程等于或多于core Pool Size, 则将任务加入Blocking Queue。
- 如果无法将任务加入Blocking Queue(队列已满) , 则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
- 如果创建新线程将使当前运行的线程超出maximum Pool Size, 任务将被拒绝, 并调用Rejected Execution Handler.rejected Execution() 方法。
ThreadPoolExecutor中线程执行任务示意图:
image.png
线程池中的线程执行任务分两种情况,如下。
- 在execute() 方法中创建一个线程时, 会让这个线程执行当前任务。
- 这个线程执行完上图中1的任务后, 会反复从Blocking Queue获取任务来执
行。
Executors框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,四个构造函数源码如下:
//一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
//二
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
//三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
//四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (corePoolSize >= 0 && maximumPoolSize > 0 && maximumPoolSize >= corePoolSize && keepAliveTime >= 0L) {
if (workQueue != null && threadFactory != null && handler != null) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
参数说明:
- corePoolSize:核心线程数,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的
- maximumPoolSize:最大线程数,可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:
corePoolSize <运行的线程数< maximumPoolSize:仅当队列满时才创建新线程
corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池 - keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止
- unit:keepAliveTime参数的时间单位
- workQueue:保存任务的阻塞队列,与线程池的大小有关:
当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
当队列满时,在有新任务时就创建新线程 - threadFactory:使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程
- handle:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,丢弃任务,任务被拒绝时将抛出RejectExecutorException
线程池的拒绝策略
拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
- CallerRunsPolicy
当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大 - AbortPolicy
丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。 - DiscardPolicy
直接丢弃,其他啥都没有 - DiscardOldestPolicy
当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
线程池默认的AbortPlolicy不够优雅,直接在程序中抛出RejectedExecutionException异常(因为是运行时异常,不强制catch),推荐以下几种策略:
- 在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略
- 使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
- 自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
- 如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的
Executors中常见四种线程池创建方法
- newCachedThreadPool
- newFixedThreadPool
- newScheduledThreadPool
- newSingleThreadExecutor
※不过一般不提倡使用Executors创建线程池,而是直接使用ThreadPoolExecutor 的构造函数创建线程池,避免OOM异常。
最后提供一个自定义线程池单例类:
public final class GlobalThreadPool {
/**
* 核心线程数
*/
private static final int CORE_POOL_SIZE = 30;
/**
* 最大线程数
*/
private static final int MAXIMUM_POOL_SIZE = 60;
/**
* 空闲线程的存活时间
*/
private static final int KEEP_ALIVE_TIME = 1000;
/**
* 任务队列的大小
*/
private static final int BLOCKING_QUEUE_SIZE = 1000;
private GlobalThreadPool() {
}
/**
* 饿汉模式初始化
*/
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("global-thread-pool-%d").build(),
//任务拒绝策略为由调用该线程处执行该任务
new ThreadPoolExecutor.CallerRunsPolicy());
public static ThreadPoolExecutor getExecutor() {
return EXECUTOR;
}
}