并发专题-一张图理解线程池
前言
多线程Runnable任务的执行器Executor有很多,今天来看一下最常用的Executor:ThreadPoolExecutor
,也就是线程池
ThreadPoolExecutor
ThreadPoolExecutor想必都用过,有的是直接new ThreadPoolExecutor
来创建线程池,有的是通过Executors.newFixedThreadPool
等工厂方法去创建线程池
阿里BB的规范中不允许使用Executors创建线程池,主要是怕忽略线程池创建的参数而造成资源耗尽的风险,那么我们先看看ThreadPoolExecutor可配参数有哪些
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
-
corePoolSize
核心线程数 -
maximumPoolSize
最大线程数 -
keepAliveTime&unit
保持存活时间 -
workQueue
任务队列 -
threadFactory
线程工厂 -
handler
拒绝策略
图文理解

我们把线程池比作公司,线程比作员工,那么corePoolSize
核心线程数相当于一个公司的正式编制员工数,他们接受任务并执行,没任务时候等待新任务(公司聘请正式员工的方式是来一个任务聘用一个新员工)
当公司正式编制员工聘用满后,新来的任务放到workQueue
代办任务队列中,等待被员工处理
当任务变多了,正式员工都在忙,任务队列放不下新任务了,就要考虑聘请临时工帮忙,maximumPoolSize
代表公司可容纳的最多员工数,相当于办公位数,那么最大能聘请的临时工的数量就是maximumPoolSize-corePoolSize
,请多了也没有工位可干活,临时工在公司不忙时候就离开公司(线程销毁),给他一个缓冲时间避免刚走又忙起来,这个保持时间就是keepAliveTime
,时间单位是unit
当任务队列都满了,所有员工(包括临时工)都在忙,而员工总数已经到达maximumPoolSize
,再不能新增员工了,这时如果还有任务那只能拒绝了,拒绝工作的处理者就是handler
,可以自行定义
threadFactory
相当于员工的来源,比如来自某个大学,那么这个大学对于公司来说就是员工的生产工厂,可以通过配置自定义threadFactory
来控制员工的实际生成工作,比如可以统一名称前缀、统一编号规则等
工作机制
回到程序本身,首先线程池初始化是没有任何线程的,执行逻辑如下
step1
当任务到来时,真实线程数少于corePoolSize
,就会创建线程,并执行这个任务,执行结束后线程并不关闭,因为是corePool
,继续获取workQueue
里的任务,如果没有就阻塞等待
step2
如果上一步条件不满足(corePool
已全部创建完成),会尝试把任务加入队列workQueue
中,那么空闲下来的线程就可以从队列获取并执行任务
step3
如果上一步失败-队列已满,线程数小于maximumPoolSize
,则会尝试新增一个临时线程去执行任务,这些临时线程工作完成后会存活一段时间,直到空闲了keepAliveTime
设置的时间后就销毁(实际上可能销毁原线程而保留新增线程,属于一种淘汰机制)
代码
以上的线程创建并不是直接new Thread,而是通过ThreadFactory
创建
public interface ThreadFactory {
// 生成新线程
Thread newThread(Runnable r);
}
我们可以自定义实现ThreadFactory让我们线程池的拥有一样的名称前缀或编号规则,方便jvm调试时识别,比如
private static ThreadFactory threadFactory = new ThreadFactory() {
private AtomicInteger no = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "my-thread-"+(no.incrementAndGet()));
}
};
// 生成的线程名字就是my-thread-1,my-thread-2,my-thread-3...
以上所说三步验证,可以看一下ThreadPoolExecutor.execute
源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// step1:小于corePoolSize,新增线程:addWorker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// step2:尝试添加到队列:workQueue.offer
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// step3:尝试新增线程:addWorker
else if (!addWorker(command, false))
reject(command);
}
其中addWorker
代表是新增线程的方法,其定义如下
private void addWorker(Runnable firstTask, boolean core)
第二参数boolean型的core,代表是否核心线程,可以看到execute的step1
传入的是true,此时添加的是核心线程,step3
传的是false,此时添加的是临时线程
总结
ThreadPoolExecutor的核心代码看起来还是挺费劲,主要考虑的线程安全的事太多,后续计划模仿手写一个线程池来更深一层看一下具体实现~