ThreadPoolExecutor源码分析
记得最开始接触并发编程是,看的第一块的源码就是ThreadPoolExecutor,但是之前没有做任何的笔记,今天再来复习一下
一、线程池主要属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池使用ctl代表线程状态和真正运行的线程数量,包括两个部分;第一个部分,就是高3位,代表线程池的状态;然后是低29位代表正在执行有效的线程数量;COUNT_BITS 等于 29;CAPACITY 表示二进制 29 个 1;这两个值是为了方便计算线程状态,以及线程数量的;
线程池状态主要有:
RUNNING:高三位为111,线程池正常运行状态
SHUTDOWN:高三位为000,该状态不接受新任务,处理已接收的任务
STOP:高三位为001,该状态不接受新任务,也不处理已接收的任务,并且会中断正在处理的任务,调用shutdownNow()方法时,可以从RUNNING或者是SHUTDOWN状态变为STOP状态
TIDYING:高三位为010,当所有线程都终止,ctl记录的线程数量变为0,则会变为TIDYING状态;
TERMINATED:高三位为011,当线程池彻底终止,就会变成TERMINATED状态
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:线程最大空闲时间
unit:空闲时间单位
workQueue:阻塞队列
threadFactory:生成线程的工厂类,默认使用Executors.defaultThreadFactory() 来创建线程
handler:拒绝策略,拒绝策略有一下几种
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
二、线程池执行
当线程执行时,先看是否达到核心线程数,如果还没达到核心线程数则,直接起线程;当达到核心线程数,则把线程放入阻塞队列;当阻塞队列放满,则开始起最大线程数,当线程数量起到最大线程后;后续线程进入拒绝策略;当起了超过核心线程数的线程,去阻塞队列获取任务时,超过最大空闲时间则直接返回,然后关掉该线程。
三、源码分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当线程数小于核心线程数,则直接新起任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//优先判断线程池运行状态,然后再去把线程放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断运行状态,如果线程池状态关闭后,然后执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前线程池没有线程在跑,则启动一个非核心的线程,去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//入队失败,则起非核心的线程去执行(最大线程数),如果失败,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
从上述代码就能看出,整个线程池执行的核心流程;我们再来分析一下细节,先看下内部类Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//当前执行的线程
final Thread thread;
//创建该woker时,传入的任务
Runnable firstTask;
//当前woker执行完成的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
//设置锁的初始状态
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建线程
this.thread = getThreadFactory().newThread(this);
}
public void run() {
//该线程调用的,是ThreadPoolExecutor runWorker方法
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
//使用CAS去获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//调用该方法,去中断线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池执行时,是先把每个任务封装成Worker,来执行的;我们来看看addWorker的代码分析
//参数为第一个执行的任务(有可能为空),是否创建的是核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池运行状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
/*如果当线程达到CAPACITY最大数量
*或者启用的是核心线程数,超过了核心线程数
*或者超过了最大线程数,则添加woker失败
*/
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用cas对执行的线程数加一,如果成功则跳出循环,去添加woker
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果执行到这里,线程池状态有变化,则重新判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//对当前线程池加锁,为了保证并发时,workers添加的安全性
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//判断线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到线程池维护的列表中引用
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//添加成功,则运行当前woker线程
if (workerAdded) {
//woker线程执行,调用woker的run方法,又调用ThreadPoolExecutor的runWorker方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
接下来,我们再看看runWorker方法
final void runWorker(Worker w) {
//wt目前只是判断了是否打断,beforeExecute,该方法为空,没有实现,如果我们自己实现线程池,可以做日志等其他的输出
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//先释放锁,是为了清除interrupt,任务还没开始,不允许打断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果当前任务为空,则去阻塞队列获取任务
while (task != null || (task = getTask()) != null) {
//对当前woker加锁,这个加锁时为了防止在执行期间,被其他线程中断,主要是调用interruptIdleWorkers()这个方法
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//判断线程池运行状态,和中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//直接执行我们传入的task的run方法,所以,线程池每次执行时,都是跑的自己woker线程,我们传入进去的Runnable对象,并没有调用start方法,只是跑了run方法
task.run();
} catch (RuntimeException x) {
//如果有异常,则直接抛出,但是会在processWorkerExit方法中,新起一个woker
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//执行完成则完成任务数加一,释放锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理线程跑完后的逻辑,主要是对是否是抛出异常做处理
processWorkerExit(w, completedAbruptly);
}
}
从runWorker方法中我们看到了,线程执行时,并不是去start,这样避免了线程上下文的切换,只是在当前线程执行了我们传入task的run方法,提高了系统的整体性能;接下来我们再看看getTask怎么来获取线程的;
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果当前线程池状态已经停止,则去把线程池清空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程数量
int wc = workerCountOf(c);
/*
*判断获取线程是否需要超时
*jdk1.5之后,allowCoreThreadTimeOut可以设置是否允许核心线程数超时
*一般是不会设置,则只有当线程数量超过了核心线程数,则从阻塞队列获取线程会超时
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果该线程超时,或者超过了最大线程数,或者队列为空了,则去对当前运行的线程数减一
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果线程为核心线程数,则使用take方法阻塞获取线程,不会超时
//如果不为核心线程数,则使用poll获取task,如果超时则获取为空,会进入上面一步,进行销毁,线程数量会减一
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果获取到了,则跳出该方法,返回该task
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从上面可以看出,当去从阻塞队列中获取任务时,是靠阻塞队列的特性去阻塞获取线程,如果获取不到,就直接销毁当前woker了;并且,woker中没有核心线程的标记,是根据数量去判断的,如果小于或者等于了我们设置的核心线程数,则会阻塞去获取task;
四、Executors
Executors工具类提供了几种线程池模型,这里分析一下,但是不建议大家用,最好还是根据自己的业务来新建线程池,因为这几种线程池可能会引起内存溢出
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool核心线程数,最大线程数相同,队列是无界的阻塞队列,如果线程消费能力不够时,就有可能出现内存溢出的问题
2.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor核心线程数和最大线程数都是1,和newFixedThreadPool一样,如果线程消费能力不够时,就有可能出现内存溢出的问题
3.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool,核心线程数为0,最大线程数为MAX_VALUE,传入的阻塞队列长度只有1。如果线程消费能力不够时,可能会启用很多线程,所以也可能会出现内存溢出的问题;