Java 线程池 ThreadPoolExecutor源码解析
2019-07-09 本文已影响0人
皮卡丘_5833
java线程池的使用在很多客户端开发过程中都是必不可少的,主要是为了减少在线程创建和销毁时产生的系统资源消耗,提高客户端的性能.之前对线程池并没有深入的了解,最近在项目中遇到了一个问题,使用threadpoolexecutor.submit(runable)之后,runable的run方法并没有回调,所以去看一遍源码,为了加深巩固自己的理解,整理成博客,另外,我将Android系统的ThreadPoolExecuter类的源码copy下来作为我们demo中的一个类,方便我们调试,项目demo附带源码地址如下:demo和源码地址。
使用方法很简短,首先初始化线程池:
//各个参数的含义下面会结合源码介绍
ThreadPoolExecutor sExecutorService = new ThreadPoolExecutor(
5,
MAX_POOL_SIZE,
KEEP_ALIVE,
TimeUnit.SECONDS,limitedQueue,
new DefaultThreadPoolFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
传入需要在线程执行的任务:
sExecutorService.submit(new Runnable() {
@Override
public void run() {
try {
Log.d(TAG, "run: value:"+value);
value++;
} catch (Exception e) {
e.printStackTrace();
}
}
});
以上就是线程池的大致用法,下面我们结合源码来分析一下线程池的工作原理:
1. 线程池状态和数量,一个int四个字节,32位,前三位便是线程池状态,后三位表示线程池里面线程的数量:
//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数量统计位数29 Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
//容量 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//运行中 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//关闭 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//终止 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
//获取运行状态(获取前3位)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程个数(获取后29位)
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- RUNNING:接受新任务并且处理阻塞队列里的任务
- SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
- STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
- TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
- TERMINATED:终止状态。terminated方法调用完成以后的状态
2. 线程池的状态转换:
RUNNING -> SHUTDOWN
显式调用shutdown()方法, 或者隐式调用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
显式调用shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候
注:只有在RUNNING和SHUTDOWN状态下线程池才可以接受任务。
3. 线程池的构造方法和含义(有四个构造方法,最终都是调用下面的方法)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程数,这些线程一直存活在线程池中,即使空闲也不会被kill掉,在没有达到此数量时,来了新任务即使线程池有空闲的线程,依然会创建新的线程来处理。
- maximumPoolSize:线程池最大线程数,注意此设置参数必须要在大于核心线程且工作队列被塞满之后才有效。如果我们设置的是无界队列(int最大值,一般不会达到这个指,如果你写个for循环一直阻塞我也没话说😄,瘦的手机应该最先受不了的),线程池的线程数最多也就是核心线程数corePoolSize
- keepAliveTime:非核心线程在空闲情况下的保活时间
- TimeUnit:非核心线程在空闲情况下的保活时间单位,时、分、秒。
- workQueue:工作队列,在线程池中的线程数量等于核心线程数量corePoolSize且都不空闲时,新来一个任务,则会将新来的任务放进队列等待执行。
- threadFactory:创建线程的工厂,一般使用默认的即可。
- handler:任务拒绝处理的逻辑,工作队列已满,线程池中的线程数已经达到最大值maximumPoolSize时,新来的任务将会由handler对应的拒绝策略来处理,可以设置为抛弃最老的task将当前最新的task放进队列、直接丢弃当前的task不处理,当然你也可以自定义拒绝逻辑,比如将被拒绝的task再次尝试放入队列,只需要自己实现RejectedExecutionHandler接口即可。
4. submit提交任务
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
大致逻辑如下:
- submit任务task
- 如果task为空,跑出空指针异常
- 如果不为空,将task封装成RunnableFuture
- 执行execute,传入封装的将task封装成RunnableFuture
- 返回封装的RunnableFuture对象
5. 我们来看一下execute方法的源码
public void execute(Runnable command) {
//传进来的线程为null,则抛出空指针异常
if (command == null)
throw new NullPointerException();
//获取当前线程池的状态+线程个数变量
int c = ctl.get();
/**
* 3个步骤
*/
//1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,且传进来的Runnable当做第一个任务执行。
//如果调用addWorker方法返回false,则直接返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.如果线程池处于RUNNING状态,则添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//二次检查
int recheck = ctl.get();
//如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//否者如果当前线程池线程空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.新增线程,新增失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
其实从上面代码注释中可以看出就三个判断,
- 核心线程数是否已满
- 队列是否已满
-
线程池是否已满
从网上盗了一个图,便于理解:
first.png
大致流程如下:
- 调用execute方法,传入Runable对象
- 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
- 获取当前线程池的状态和线程个数变量
- 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
- 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
- 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
- 重新获取当前线程池的状态和线程个数变量
- 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
- 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
- 调用!addWorker(command, false),为true走流程11,false则结束
-
调用拒绝策略reject(command),结束
如下是流程图:
second.png
6. addwork方法源码解析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)
// 条件都成立则返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//循环
for (;;) {
int wc = workerCountOf(c);
//如果当前的线程数量超过最大容量或者大于(根据传入的core决定是核心线程数还是最大线程数)核心线程数 || 最大线程数,则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加c,成功则跳出retry
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//CAS成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新检查线程池状态
//避免ThreadFactory退出故障或者在锁获取前线程池被关闭
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 先检查线程是否是可启动的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判断线程有没有启动成功,没有则调用addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。
第一部分主要是两个循环,外层循环主要是判断线程池状态
rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())
展开!运算后等价于:
s >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())
也就是说下面几种情况下会返回false:
- 当前线程池状态为STOP,TIDYING,TERMINATED
- 当前线程池状态为SHUTDOWN并且已经有了第一个任务
- 当前线程池状态为SHUTDOWN并且任务队列为空
- 内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
继续盗图如下😄:
third.png
7. Worker对象
Worker是定义在ThreadPoolExecutor中的finnal类,其中继承了AbstractQueuedSynchronizer类和实现Runnable接口,其中的run方法如下
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
8. 线程启动时调用了runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//循环获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态
// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态
// 重新检查当前线程池的状态是否大于等于STOP状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//提供给继承类使用做一些统计之类的事情,在线程运行前调用
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//提供给继承类使用做一些统计之类的事情,在线程运行之后调用
afterExecute(task, thrown);
}
} finally {
task = null;
//统计当前worker完成了多少个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
processWorkerExit(w, completedAbruptly);
}
}
9. getTask源码解析(阻塞式获取任务)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程线程池状态和队列是否为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程数量
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(当前线程数是否大于最大线程数或者)
//且(线程数大于1或者任务队列为空)
//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
10. shutdown 关闭线程池(调用此方法,线程池不会接收新的任务,但是工作队列里面的任务还会继续执行完)
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
11. shutdownNow 关闭线程池(立刻停止任务的执行,并将线程池中的任务返回)
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdown和shutdownNow区别
shutdown和shutdownNow这两个方法的作用都是关闭线程池,流程大致相同,只有几个步骤不同,如下
- 加锁
- 检查关闭权限
- CAS改变线程池状态
- 设置中断标志(线程池不在接收任务,队列任务会完成)/中断当前执行的线程
- 调用onShutdown方法(给子类提供的方法)/获取队列中的任务
- 解锁
- 尝试将线程池状态变成终止状态TERMINATED
- 结束/返回队列中的任务