Java线程池_ThreadPoolExecutor原理分析
线程池中有一定数量的工作线程,工作线程会循环从任务队列中获取任务,并执行这个任务。那么怎么去停止这些工作线程呢?
这里就涉及到线程池两个重要概念:工作线程数量和线程池状态。
一.线程池状态和工作线程数量
这本来是两个不同的概念,但是在ThreadPoolExecutor中我们使用一个变量ctl来存储这两个值,这样我们只需要维护这一个变量的并发问题,提高运行效率。
/**
* 记录线程池中Worker工作线程数量和线程池的状态
* int类型是32位,它的高3位,表示线程池的状态,低29位表示Worker的数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示线程池中创建Worker工作线程数量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
怎么使用一个变量ctl存储两个值呢?
就是利用int变量的高3位来储存线程池状态,用int变量的低29位来储存工作线程数量。
这样就有两个需要注意的地方:
- 工作线程数量最大值不能超过int类型29位的值CAPACITY 即0b0001.....1(29位1)
- 因为线程池状态都是高3位储存的,所以工作线程数量不会影响状态值大小关系。
1.1 线程池状态
// 高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位值是001
private static final int STOP = 1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态分析:
- RUNNING状态:线程池刚创建时的状态。向任务队列中添加任务,并执行任务队列中的任务。因为高3位值是111,即处于RUNNING状态下的ctl值都是负数。
- SHUTDOWN状态: 调用shutdown方法,会将线程池设置成这个状态。不能向任务队列中添加任务,但是可以执行任务队列中已添加的任务。并且处于SHUTDOWN状态下正在运行任务的工作线程不能中断的,就是保证任务能够执行完成。
- STOP状态: 调用shutdownNow方法,会将线程池设置成这个状态。不能向任务队列中添加任务,也不能再执行任务队列中已添加的任务。
- TIDYING状态: 调用tryTerminate方法,可能会将线程池设置成这个状态。这个只是中断过度状态,表示线程池即将变成TERMINATED状态。
- TERMINATED状态: 调用tryTerminate方法,可能会将线程池设置成这个状态。表示线程池已经完全终止,即任务队列为空,工作线程数量也是0.
线程池为什么要定义这么多状态呢?按道理说线程池只应该有运行和终止这两种状态啊。
主要是因为终止线程池时,要考虑正在执行的任务和已经添加到任务队列中待执行的任务该如何处理,否则的话,这些任务可能就会被丢失。
线程池提供了两个方式处理:
- shutdown方法: 它会将线程池状态变成SHUTDOWN 状态。禁止向添加新的任务,但是会让任务队列中的任务继续执行,最后释放所有的工作线程,让线程池状态变成TERMINATED状态。
- shutdownNow方法: 它会将线程池状态变成STOP 状态。禁止向添加新的任务,也不会执行任务队列中的任务,但是会返回这个任务集合,释放所有的工作线程,让线程池状态变成TERMINATED状态。
1.2 操作ctl的方法
1.2.1 获取线程池的状态
/**
* 获取线程池的状态。因为线程池的状态是使用高3位储存,所以屏蔽低29位就行了。
* 所以就c与~CAPACITY(0b1110..0)进行&操作,屏蔽低29位的值了。
* 注意:这里是屏蔽低29位的值,而不是右移29位。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
1.2.2 获取工作线程数量
/**
* 获取线程池中Worker工作线程的数量,
* 因为只使用低29位保存Worker的数量,只要屏蔽高3位的值就行了
* 所以就c与CAPACITY(0b0001...1)进行&操作,屏蔽高3位的值了。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
1.2.3 合并ctl的值
/**
* 得到ctl的值。
* 接受两个参数rs和wc。rs表示线程池的状态,wc表示Worker工作线程的数量。
* 对于rs来说我们只需要高3位的值,对于wc来说我们需要低29位的值。
* 所以我们将rs | wc就可以得到ctl的值了。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.2.4 其他方法
// 因为RUNNING状态高三位是111,所以状态值rs与工作线程数量ws相与的结果值c一定是个负数,
// 而其他状态值都是大于等于0的数,所以c是负数,那么表示当前线程处于运行状态。
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS函数将ctl值自增
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS函数将ctl值自减
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS函数加循环方法这种乐观锁的方式,解决并发问题。
* 保证使ctl值减一
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
二. 重要成员变量
// 记录线程池中Worker工作线程数量和线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务线程的阻塞队列,因为是阻塞队列,所以它是并发安全的
private final BlockingQueue<Runnable> workQueue;
// 独占锁,用来保证操作成员变量的并发安全问题
private final ReentrantLock mainLock = new ReentrantLock();
// 等待线程池完全终止的条件Condition,
private final Condition termination = mainLock.newCondition();
//----------------- 需要mainLock来保证并发安全-------------------------//
// 线程池中工作线程集合。Worker中持有线程thread变量
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程池中曾拥有过的最大工作线程个数
private int largestPoolSize;
// 线程池完成过任务的总个数
private long completedTaskCount;
//----------------- 需要mainLock来保证并发安全-------------------------//
// 创建线程的工厂类
private volatile ThreadFactory threadFactory;
// 当任务被拒绝时,用来处理这个被拒绝的任务
private volatile RejectedExecutionHandler handler;
// 工作线程空闲的超时时间keepAliveTime
private volatile long keepAliveTime;
// 是否允许核心池线程超时释放
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心池线程个数
private volatile int corePoolSize;
// 线程池最大的线程个数
private volatile int maximumPoolSize;
成员变量的含义已经标注了:
- mainLock:使用mainLock来保证会发生变化成员变量的并发安全问题。会发生的成员变量有5个:ctl、workQueue、workers、largestPoolSize和completedTaskCount。但是其中ctl和workQueue的类型本身就是多线程安全的,所以不用mainLock锁保护。
- termination:等待线程池完全终止的条件,如果线程池没有完全终止,调用它的awaitNanos方法,让线程等待。当线程池完全终止后,调用它的signalAll方法,唤醒所有等待termination条件的线程。
- workers:记录所有的工作线程Worker
- workQueue:记录所有待执行的任务。使用阻塞队列BlockingQueue,可以在队列为空时,线程等待,队列有值时,唤醒等待的线程。
- largestPoolSize:线程池中曾拥有过的最大工作线程个数
- completedTaskCount:线程池完成过任务的总个数
- threadFactory:创建线程的工厂类
- handler:当任务被拒绝时,用来处理这个被拒绝的任务
- keepAliveTime:工作线程允许空闲的超时时间,一般都是针对超过核心池数量的工作线程。
- allowCoreThreadTimeOut: 是否允许核心池的工作线程超时释放。
- corePoolSize:线程池核心池线程个数。
- maximumPoolSize: 线程池最大的线程个数。
这里注意一下两个概念核心池个数和最大线程池个数:
- 核心池个数就是线程池能够维持的常用工作线程个数,当工作线程没有执行任务空闲时,它不会被销毁,而是在等待。但是如果设置allowCoreThreadTimeOut为true,那么核心池工作线程也是会被销毁。
- 最大线程池个数就是线程池允许开启的最大工作线程个数。最大线程池的意义就是当核心池的工作线程不够用,且任务队列也已经满了,不能添加新的任务了,那么就要开启新的工作线程来执行任务。
三. 执行任务execute方法
在线程池中如何执行一个任务command,要分三种情况:
- 线程池中工作线程的数量没有达到核心池个数,那么线程池就应该开启新的工作线程来执行任务。
- 线程池中工作线程的数量达到核心池个数,那么就应该将任务添加到任务队列中,等待着工作线程去任务队列中获取任务并执行。
- 如果任务添加到任务队列失败,那么就要开启新的工作线程来执行任务。
public void execute(Runnable command) {
// 如果command为null,抛出异常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 分为三个步骤:
* 1. 如果运行的工作线程数量少于核心池数量corePoolSize,
* 那么就调用addWorker方法开启一个新的工作线程,运行任务command。
* 2. 如果开启新的工作线程失败,就将任务添加到任务队列中。
* 3. 添加到任务队列失败,
* 那么仍然addWorker方法在最大池中开启一个新的工作线程,运行任务command。
*/
int c = ctl.get();
// 运行的工作线程数量少于核心池数量corePoolSize
if (workerCountOf(c) < corePoolSize) {
/**
* 开启一个新的工作线程,运行任务command。
* 返回true,表示开启工作线程成功,直接return。
* 返回false,表示没有开启新线程。那么任务command就没有运行,所以要执行下面代码。
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池处于运行状态,
// 且任务添加到任务阻塞队列workQueue中成功,即workQueue队列有剩余空间。
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查线程池状态和工作线程数量
int recheck = ctl.get();
/**
* 如果线程池不在运行状态,那么就调用remove方法移除workQueue队列这个任务command,
* 如果移除成功,那么调用reject(command)方法,进行拒绝任务的处理。
* 如果移除失败,那么这个任务还是会被执行,那么就不用调用reject(command)方法
*/
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程数量为0,但是workQueue队列中我们添加过任务,
// 那么必须调用addWorker方法,开启一个新的工作线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 调用addWorker方法,开启一个新的工作线程,运行任务command。
// 如果还是失败,那么这个任务command就不会不可能执行了,
// 那么调用reject(command)方法拒绝这个任务
else if (!addWorker(command, false))
reject(command);
}
方法流程上面已经有标注,注意有以下几点:
- addWorker(Runnable firstTask, boolean core):表示开启一个新的工作线程执行任务firstTask。core是用来判断核心池还是最大池。返回false,表示开启新线程失败,即任务firstTask没有机会执行。
- isRunning(c)线程池处于RUNNING状态,只有处于RUNNING状态下,才能将任务添加到任务队列。
- reject(command) 当任务command不能在线程池中执行时,就会调用这个方法,告诉调用值,线程池拒绝执行这个任务。
四. 添加工作线程addWorker方法
就是利用任务task创建一个新的工作线程Work,然后将它添加到工作线程集合workers中。但是需要注意多线程并发问题。
private boolean addWorker(Runnable firstTask, boolean core) {
// 利用死循环和CAS函数,实现乐观锁,来实现多线程改变ctl值的并发问题
// 因为ctl值代表两个东西,工作线程数量和线程池状态。
// 这里就用了两个for循环,一个是线程池状态的for循环,一个是工作线程数量的for循环
retry:
for (;;) {
int c = ctl.get();
// 获取线程池运行状态rs,
int rs = runStateOf(c);
// 首先判断线程池状态和任务队列状态,
// 来判断能否创建新的工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 线程池中工作线程数量wc
int wc = workerCountOf(c);
// 当线程池工作线程数量wc大于线程上限CAPACITY,
// 或者用户规定核心池数量corePoolSize或用户规定最大线程池数量maximumPoolSize
// 表示不能创建工作线程了,所以返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS函数,使工作线程数量wc加一
if (compareAndIncrementWorkerCount(c))
// 跳出retry循环
break retry;
// 来到这里表示CAS函数失败,那么就要循环重新判断
// 但是c还代表线程状态,如果线程状态改变,那么就必须跳转到retry循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作线程是否开始,即调用了线程的start方法
boolean workerStarted = false;
// 工作线程是否添加到工作线程队列workers中
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个Worker对象
w = new Worker(firstTask);
// 得到Worker所拥有的线程thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并发锁
mainLock.lock();
try {
// 获取线程池运行状态rs
int rs = runStateOf(ctl.get());
// 当线程池是运行状态,或者是SHUTDOWN状态但firstTask为null,
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程t已经被开启,就抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将w添加到工作线程集合workers中
workers.add(w);
// 获取工作线程集合workers的个数
int s = workers.size();
// 记录线程池历史最大的工作线程个数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果已经添加到工作线程队列中,那么开启线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果开启工作线程失败,那么这个任务也就没有执行
// 因此移除这个任务w(如果队列中有),减少工作线程数量,因为这个数量在之前已经增加了
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
添加一个新的工作线程,就涉及到两个成员变量的改变,一个是工作线程数量ctl,一个是工作线程集合workers。而ctl的类型是AtomicInteger,所以它可以使用乐观锁解决并发问题,workers就只能使用mainLock互斥锁来保证并发安全问题。
4.1 更改工作线程数量ctl
因为ctl储存了两个值,工作线程数量和线程池状态。所以使用了两个for循环来监控多线程对这两个值的更改。
用线程池状态来判断是否允许添加新的工作线程:
// 是对addWorker中线程状态if判断的拆分
// 当线程池不是处于运行状态
if (rs >= SHUTDOWN) {
/**
* 线程池状态不是SHUTDOWN,或者firstTask不为null,或者任务队列为空,
* 都直接返回false,表示开启新工作线程失败。
* 只有当线程池状态是SHUTDOWN,firstTask为null,任务队列不为空时,
* 需要创建新的工作线程。
* 从execute(Runnable command)方法中分析,firstTask参数为空只有一种情况,
* 此时线程池中工作线程数量是0,而任务队列不为空,
* 那么就要开启一个新工作线程去执行任务队列中的任务,否则这些任务会被丢失。
*/
if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) {
return false;
}
}
由此可以得出,只有两种情形允许添加新的工作线程:
- 线程池处于RUNNING状态
- 线程池虽然处于SHUTDOWN状态,但是线程池工作线程个数是0(即这里的firstTask != null),且任务队列workQueue不为空,那么就要开启一个新工作线程去执行任务队列中的任务。
然后使用for循环和CAS函数方式,来给工作线程数量加一。注意此时工作线程还没有创建,并添加到线程集合workers中,所以如果线程添加失败,那么还要将工作线程数量减一。
4.2 添加工作线程集合workers
创建一个工作线程Worker,将它添加到线程集合workers中,然后开启这个工作线程,使用mainLock独占锁保证成员变量workers的并发安全问题。
五. 内部类Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 该Worker所拥有的工作线程 */
final Thread thread;
/** Worker拥有的第一个任务,初始化的时候赋值 */
Runnable firstTask;
/** 该工作线程Worker完成任务的数量 */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 将state设置为-1,禁止发起中断请求,
// 直到调用过runWorker方法,即线程已经运行时。
setState(-1);
// 第一个任务
this.firstTask = firstTask;
// 创建一个thread线程对象,它的run方法就是本Worker的run方法
// 这个thread就是Worker真正执行任务的工作线程
this.thread = getThreadFactory().newThread(this);
}
/** 复写的是Runnable中的run方法,所以当工作线程开启运行后,会调用这个方法。 */
public void run() {
runWorker(this);
}
// 当前独占锁是否空闲
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取独占锁
protected boolean tryAcquire(int unused) {
// 如果通过CAS函数,可以将state值从0改变成1,那么表示获取独占锁成功。
// 否则独占锁被别的线程获取了。
if (compareAndSetState(0, 1)) {
// 设置拥有独占锁的线程是当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放独占锁
protected boolean tryRelease(int unused) {
// 设置拥有独占锁的线程为null
setExclusiveOwnerThread(null);
// 设置获取独占锁的次数是0,表示锁是空闲状态
setState(0);
return true;
}
// 获取独占锁,如果锁被别的获取,就一直等待。
public void lock() { acquire(1); }
// 尝试获取独占锁,如果锁被别的获取,就直接返回false,表示获取失败。
public boolean tryLock() { return tryAcquire(1); }
// 释放独占锁
public void unlock() { release(1); }
// 当前独占锁是否空闲
public boolean isLocked() { return isHeldExclusively(); }
// 如果Worker的工作线程thread已经开启,那么发起中断请求。
void interruptIfStarted() {
Thread t;
// getState() >= 0表示thread已经开启
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker实现了Runnable接口,那么就可以通过Worker对象创建一个新线程thread,这个thread就是Worker的工作线程,而任务都在run方法中执行。
Worker还继承自AbstractQueuedSynchronizer类。我们知道可以通过AQS类实现独占锁和共享锁,而Worker中实现了tryAcquire和tryRelease方法,说明Worker对象也是个独占锁对象。我们可以考虑一下Worker这个独占锁的作用是什么?在后面会介绍到。
六. 工作线程运行任务runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 将w的state状态设置成0,这样就允许对w的thread线程进行中断请求了。
w.unlock();
// completedAbruptly表示线程突然终结
boolean completedAbruptly = true;
try {
// 通过getTask从任务队列中获取任务task执行,这个方法是个阻塞方法。
while (task != null || (task = getTask()) != null) {
// 获取w独占锁,保证当本工作线程运行任务时,
// 不能对该线程进行中断请求。
w.lock();
/**
* 如果线程池大于STOP状态,且Worker工作线程中断标志位是false,
* 那么就调用wt的interrupt方法发起中断请求。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// Worker工作线程发起中断请求
wt.interrupt();
try {
// 钩子方法,提供给子类。在执行任务之前调用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用run方法,执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,提供给子类。在执行任务完成后调用
afterExecute(task, thrown);
}
} finally {
// 将task设置为null,进行下一次循环
task = null;
// 将work完成的任务数completedTasks加一
w.completedTasks++;
// 释放w独占锁
w.unlock();
}
}
// completedAbruptly = false表示线程正常完成终结
completedAbruptly = false;
} finally {
// 进行一个工作线程完结后的后续操作
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法是在每个工作线程的run方法中调用,通过getTask()方法从任务队列中获取任务task执行,这个方法可以阻塞当前工作线程,如果getTask()方法返回null,那么工作线程就会运行结束,释放线程。
虽然runWorker方法运行在每个工作线程中,但是对于一个Worker来说,只会有它的工作线程能够运行runWorker方法,而且改变的也是这个Worker的成员变量,且这些成员变量也只能在runWorker方法改变,那么它没有多线程并发问题啊,那么为什么在这里加锁呢?
这是因为Worker中有一个变量是可以被其他线程改变的,就是它的工作线程thread的中断请求,所以Worker独占锁的作用就是控制别的线程对它的工作线程thread中断请求的。
最后调用processWorkerExit方法,进行一个工作线程完结后的后续操作。
七. 获取任务getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 获取线程池状态rs
int rs = runStateOf(c);
// 如果有需要检查任务队列workQueue是否为空
// 即rs >= STOP或者rs == SHUTDOWN且workQueue为空,那么返回null,停止工作线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 将工作线程数量减一
decrementWorkerCount();
return null;
}
// 获取工作线程数量wc
int wc = workerCountOf(c);
/**
* 如果allowCoreThreadTimeOut为true或者wc > corePoolSize时,
* 就要减少工作线程数量了。
* 当工作线程在keepAliveTime时间内,没有获取到可执行的任务,
* 那么该工作线程就要被销毁。
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 工作线程数量减一,返回null,销毁工作线程。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从任务队列workQueue中获取了任务r,会阻塞当前线程。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果r不为null,返回这个任务r
if (r != null)
return r;
// r是null,表示获取任务超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从阻塞任务队列workQueue中获取任务返回,因为是阻塞任务队列,所以可以阻塞当前线程。如果返回null,那么会完结调用getTask方法的那个工作线程。那么getTask方法在什么情况下返回null呢?
- 线程池的状态大于等于STOP,或者线程状态是SHUTDOWN且当前任务队列为空,那么返回null,停止工作线程。
- 获取任务时间超时,那么也会返回null,停止工作线程。因为线程池一般只维护一定数量的工作线程,如果超过这个数量,那么超过数量的工作线程,在空闲一定时间后,应该被释放。
八. 终止线程池的方法
8.1 shutdown和shutdownNow方法
/**
* 终止线程池。不能在添加新任务了,但是已经添加到任务队列的任务还是会执行。
* 且对所有不是正在执行任务的工作线程都发起中断请求
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查是否拥有Shutdown的权限
checkShutdownAccess();
// 将线程池状态变成SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 对所有不是正在执行任务的工作线程都发起中断请求
interruptIdleWorkers();
// 钩子方法,提供给子类实现。表示线程池已经shutdown了
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试去终结线程池
tryTerminate();
}
/**
* 终止线程池。不能在添加新任务了,也不会执行已经添加到任务队列的任务,只是将这些任务返回。
* 且对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查是否拥有Shutdown的权限
checkShutdownAccess();
// 将线程池状态变成STOP状态
advanceRunState(STOP);
// 对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务
interruptWorkers();
// 返回阻塞队列workQueue中未执行任务的集合
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试去终结线程池
tryTerminate();
return tasks;
}
shutdown和shutdownNow区别:
- shutdown方法将线程池设置成SHUTDOWN状态,shutdownNow将线程池设置成STOP状态。
- shutdown方法调用之后不能在添加新任务了,但是已经添加到任务队列的任务还是会执行。shutdownNow方法调用之后不能在添加新任务了,也不会执行已经添加到任务队列的任务,只是将这些任务返回。
- shutdown方法会对所有不是正在执行任务的工作线程都发起中断请求,shutdownNow方法会对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务。
8.2 advanceRunState方法
private void advanceRunState(int targetState) {
// 采用乐观锁的方法,来并发更改线程池状态。
for (;;) {
int c = ctl.get();
// 如果runStateAtLeast方法返回true,表示当前线程池状态已经是目标状态targetState
// 采用CAS函数尝试更改线程池状态,如果失败就循环继续。
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
这个方法来改变线程池状态,使用乐观锁的方式保证并发安全。
8.3 中断空闲状态下的工作线程
/**
* 对所有不是正在执行任务的工作线程都发起中断请求。
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历工作线程Worker集合
for (Worker w : workers) {
Thread t = w.thread;
// 如果工作线程中断标志位是false,
// 且能够获取锁,即当前工作线程没有运行任务
if (!t.isInterrupted() && w.tryLock()) {
try {
// 发起中断请求。
// 因为获取了锁,所以在进入中断请求时,worker工作线程不会执行任务
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放锁
w.unlock();
}
}
// 是否只进行一个工作线程的中断请求。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
遍历工作线程Worker集合,如果工作线程出于空闲状态,且没有被中断,那么就发起中断请求。通过独占锁Worker知道,当前工作线程是否在执行任务。
8.4 对所有已开启的工作线程发起中断请求
/**
* 对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 如果w的工作线程thread已经开启,那么发起中断请求。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
遍历工作线程Worker集合,调用Worker的interruptIfStarted方法,如果工作线程已开启,那么就会发起中断。
8.5 尝试完结线程池的方法
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 如果线程池是RUNNING状态,
* 或者线程池是TIDYING状态(是因为已经有别的线程在终止线程池了)
* 或者线程池是SHUTDOWN状态且任务队列不为空,
* 线程池不能被terminate终止,直接return返回
*
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 线程池中工作线程数量不是0,线程池不能被terminate终止,所以要return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子方法,提供给子类实现。表示线程池已经终止。
terminated();
} finally {
// 设置线程池状态是TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
线程池在什么情况下算是完全停止了呢?有三个条件:
- 线程池不是RUNNING状态。
- 线程池中工作线程数量是0。
- 线程池中任务队列为空。
所以在看看tryTerminate()中,前面两个if判断条件,就可以理解了。
8.6 等待线程池完结的方法
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果是TERMINATED已终止状态,那么就返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 如果已经超时就返回false
if (nanos <= 0)
return false;
// 让当前线程等待。并设置超时时间nanos
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
如果线程池不是TERMINATED状态,就让当前线程在termination条件上等待,直到线程池变成TERMINATED状态,或者等待时间超时才会被唤醒。
8.7 工作线程退出的方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作线程突然被终结,那么工作线程的数量就没有减一。
if (completedAbruptly)
// 将工作线程数量减一。
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将工作线程的任务完成数添加到线程池完成任务总数中
completedTaskCount += w.completedTasks;
// 从工作线程集合中移除本工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 因为有一个工作线程已经完成被释放,那么就去尝试终结线程池。
tryTerminate();
int c = ctl.get();
// 如果线程池状态小于STOP,
// 就要判断终结了这个工作线程之后,线程池中工作线程数量是否满足需求。
if (runStateLessThan(c, STOP)) {
// 如果工作线程正常终结,
// 那么要看线程池中工作线程数量是否满足需求。
if (!completedAbruptly) {
// 不允许核心池线程释放,那么最小值是corePoolSize,否则就可以为0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 但是如果任务队列中还有任务,那么工作线程数量最少为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果工作线程数量小于min值,就要创建新的工作线程了。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 开启一个新的工作线程
addWorker(null, false);
}
}
工作线程被释放,有两种情况,一种是运行完成正常结束,一种是发生异常意外终止。
当工作线程被释放时,需要将它从工作线程集合workers中移除,将该工作线程任务完成数添加到线程池完成任务总数中。调用tryTerminate方法尝试终结线程池。
另外因为有一个工作线程被释放,那么就要考虑线程池中当前工作线程数量是否符合要求,要不要添加新的工作线程。
九. 创建线程池的方法。
上面分析完线程池的功能方法后,再来说说怎样创建一个线程池。
9.1 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 判断设置的核心池数量corePoolSize、最大池数量maximumPoolSize、
// 与线程空闲存活时间keepAliveTime的值,是否符合要求
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 赋值
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor类一共有四个构造函数,前面三个构造函数都是调用后面那个构造函数来实现的。参数意义:
- corePoolSize: 线程池核心池线程个数。
- maximumPoolSize: 线程池允许最大的线程个数。
- keepAliveTime: 线程空闲时,允许存活的时间。
- unit:辅助变量,用来将keepAliveTime参数,转成对应纳秒值。
- workQueue:储存所有待执行任务的阻塞队列
- threadFactory:用来创建线程的工厂类
- handler:通过它来通知调用值,线程池拒绝了任务。
注:有没有注意到,没有传递allowCoreThreadTimeOut这个参数,那么怎么设置这个成语变量呢?通过allowCoreThreadTimeOut(boolean value)方法来设置。
一般我们不用自己来new ThreadPoolExecutor对象,而是通过Executors这个工具类来创建ThreadPoolExecutor实例。
9.2 创建固定数量的线程池
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
根据我们前面讲解,要想线程池维持固定数量的工作线程,那么工作线程就不能被释放,就要做到两点:
- allowCoreThreadTimeOut为false,这个是默认的。keepAliveTime设置为0,这样当调用allowCoreThreadTimeOut(boolean value)方法修改allowCoreThreadTimeOut值时,会抛出异常,不允许修改。
- 核心池数量和最大池数量一样,防止添加新的工作线程池。任务队列容量要足够大,防止任务添加到任务队列中失败,不能执行。
9.3 创建单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(
ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
与固定数量的线程池相比:
- 将固定数量nThreads变成了1
- 使用了FinalizableDelegatedExecutorService这个代理类,主要作用就是当对象被销毁时,会调用shutdown方法,停止线程池。
9.4 创建缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
什么叫做缓存线程池,当有任务执行时,会创建工作线程来执行任务,当任务执行完毕后,工作线程会等待一段时间,如果还是没有任务需要执行,那么就会释放工作线程。
十 总结
线程池有两个概念核心池与最大池。
- 核心池:线程池应该维持的工作线程数量,如果线程池中工作线程数量小于核心池数量,就会创建新的工作线程添加到线程池中。
- 最大池: 线程池中临时存在的工作线程,当任务队列不能添加新任务时,就会创建新的工作线程添加到线程池中。执行完任务后,超过一定时间没有接受到新任务,这个临时工作线程就会被释放。
两者的区别:
- 线程释放:最大池中的线程当超过一定时间没有接受到新任务,就会被释放,而核心池中的线程,一般不释放,只有设置allowCoreThreadTimeOut为true,且超过一定时间没有接受到新任务,也会被释放。
- 创建时机:线程池中工作线程数量小于核心池数量,就会创建核心池线程。但是对于最大池来说,只有任务队列已满,不能添加新任务时,才会创建新线程,放入最大池中。
注:一般称小于等于corePoolSize数量的工作线程池是核心池中的线程,大于corePoolSize数量的工作线程池就是最大池中的线程。
10.1 线程池执行任务流程
通过execute方法执行新任务command,分为三个步骤:
- 线程池中工作线程数量小于核心池数量,那么就开启新的工作线程来执行任务。
- 线程池中工作线程数量达到核心池数量,那么就将新任务添加到任务队列中。
- 如果新任务添加到任务队列失败,那么就开启新的工作线程来执行任务(这个线程就在最大池中了)。
在每个工作线程,会通过循环,调用getTask方法,不断地从任务队列中获取任务来执行。如果任务队列中没有任务,那么getTask方法会阻塞当前工作线程。
但是工作线程被唤醒后,getTask方法返回null,那么就会跳出循环,该工作线程运行结束,准备释放。
10.2 终止线程池
线程池不可能立即就终止,因为涉及到线程池正在执行任务的线程和任务队列中等待执行的任务该如何处理问题,有两个方式:
- shutdown方法:不能再向线程池中添加新任务了,但是已经添加到任务队列的任务还是会执行,也不会对正在执行任务的线程发起中断请求。等待任务队列任务执行完成,释放线程池中所有线程,线程池进入完全终止状态。
- shutdownNow方法:不能再向线程池中添加新任务了,也不会执行已经添加到任务队列的任务,但是会返回未执行的任务集合。而且对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务。等待线程池中所有线程释放,线程池进入完全终止状态。
两者的区别:
两者都不能再向线程池中添加新任务了。shutdown方法还是会将已添加的任务都执行完毕,而shutdownNow方法不会再执行任何新任务了。
注:对于正在执行的任务是可能执行完成的,因为中断请求只能中断处于WAITING与TIMED_WAITING状态的线程,对于处于其他状态的线程不起作用。
十一. 重要示例
11.1 正常运行线程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
}
}
运行结果:
--线程1开始运行 任务1
--线程2开始运行 任务2
--线程3开始运行 任务3
=======线程1结束 任务1
--线程1开始运行 任务4
=======线程2结束 任务2
--线程2开始运行 任务5
=======线程3结束 任务3
--线程3开始运行 任务6
=======线程1结束 任务4
=======线程2结束 任务5
=======线程3结束 任务6
这里使用的是固定数量的线程池,所以只有三个线程来执行任务,未执行到的任务只能等待。
如果换成单个线程的线程池,那么只有一个线程在执行任务。
而缓存线程池呢?你就会发现居然有六个线程在执行任务,就是有多少任务创建多少个线程。
运行完任务后,你会发现程序没有结束,那是因为线程池没有被终止。
11.2 终止线程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 发生中断异常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
// 还是会执行完已经添加的任务
service.shutdown();
}
}
运行结果:
--线程1开始运行 任务1
--线程3开始运行 任务3
--线程2开始运行 任务2
=======线程1结束 任务1
--线程1开始运行 任务4
=======线程2结束 任务2
--线程2开始运行 任务5
=======线程3结束 任务3
--线程3开始运行 任务6
=======线程1结束 任务4
=======线程2结束 任务5
=======线程3结束 任务6
Process finished with exit code 0
使用shutdown方法,还是会执行完已经添加的任务。最后程序退出。
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 发生中断异常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
service.shutdownNow();
}
}
运行结果:
--线程1开始运行 任务1
--线程2开始运行 任务2
--线程3开始运行 任务3
线程2 发生中断异常 exception==sleep interrupted
线程1 发生中断异常 exception==sleep interrupted
=======线程1结束 任务1
=======线程2结束 任务2
线程3 发生中断异常 exception==sleep interrupted
=======线程3结束 任务3
Process finished with exit code 0
使用shutdownNow方法,在任务队列中等待的任务是不会执行的,而且立即发起线程中断请求。