jdk提供的线程池ThreadPoolExecutor详解
前言
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。为解决资源分配问题,线程池采用了“池化思想”,即将所有线程统一管理。
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
· corePoolSize:线程池中核心线程的个数。
· maximumPoolSize: 线程池中允许创建的最大的线程个数。
· keepAliveTime:保活时间。
· workQueue:阻塞队列,用来储存任务。
· RejectedExecutionHandler :拒绝策略,jdk提供四种,可以自定义拒绝策略。
任务调度
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//注释①
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//启动新线程
return;
c = ctl.get();
}
//注释② 此时不满足小于corePoolSize条件
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)//注释③
addWorker(null, false);//false代表的是与maximumPoolSize比较,true则与corePoolSize
}
//注释④ 此时,workQueue.offer(command)返回值为false
else if (!addWorker(command, false))
reject(command);//拒绝策略
}
·注释①:如果workerCount(runable个数) < corePoolSize,则创建并启动一个线程来执行新提交的任务。
注释②:跳出if循环,此时corePoolSize>=workerCount ,且阻塞队列元素不满,添加任务到该阻塞队列中。
注释③:corePoolSize>=workerCount ,且阻塞队列元素满了,且corePoolSize<maximumPoolSize,新建线程并启动。
注释④:阻塞队列满了,并且workerCount > maximumPoolSize,此时根据拒绝策略来处理该任务,默认的是AbortPolicy(),直接抛异常。
任务缓冲
在上面execute()方法中,通过BlockQueue来完成任务的缓冲,这里的任务实际是个runable接口的对象。线程池中是以生产者消费者模式,通过阻塞队列来完成的。生产者则是往队列里添加元素的线程,消费者是从队列里拿元素的线程,即线程池中的线程。这与阻塞队列的特性有关,在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
任务申请
在上面的excute()方法中,我们可以得知,任务执行有两种可能:
1.由新创建的线程来执行,即addWork(Runable,true);
2.线程从任务队列中获取任务来执行,即addWorker(null, false),占大多数情况;
接下来去看看源码,任务申请主要通过getTask()来完成对runable的取出。
private Runnable getTask() {
boolean timedOut = false; //最后一个任务是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断是否运行状态或者阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断是否所有的工作线程被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//开始取出任务并返回
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
任务拒绝
再看excute()中, reject(command)则是我们的拒绝策略,,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略实质上是一个接口:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
可以通过实现这个接口来自定义拒绝策略,也可以通过JDK提供的四种策略:
1.ThreadPoolExecutor.AbortPolicy():丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常。
2.ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程取处理。这种情况是需要让所有任务都执行完毕。
3.ThreadPoolExecutor.DiscardPolicy:丢弃任务,不报出异常。
4.ThreadPoolExecutor.DiscardOldPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
Worker线程管理
先看看Worker类:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/*初始化任务,可以为空 */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* 注释①
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// Lock methods
//注释②
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
注释①:Worker这个工作线程,实现了runable接口,并持有一个线程thread,一个初始化任务
firstTask,即第一个任务,可以为空。如果不为空,线程启动时则会执行这个任务,应对线程池初期的情况。
注释②:Worker通过继承AQS,使用AQS来实现独占锁的功能。AQS不可重入。
1.lock()方法一旦获取了锁,表示当前线程正在执行任务。
2.如果正在执行任务,则不应该中断线程。
3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
线程增加
再前面的excute()方法中出现的addWorker()则是对线程的增加。去看看里面做了啥事
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 一些判断
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
//注释①
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//注释②
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注释①:core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
注释②:设置可重入锁,完成worker线程的增加,添加成功后会启动这个工作线程。
Worker线程回收
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//记录一下完成任务的个数
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
}
线程池中线程的销毁依赖JVM自动的回收,线程池自己决定那些线程需要回收,消除一下引用即可。
Worker线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
1.while()循环不断取出runable任务,通过getTask()取出。
2.执行任务
3.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
实际应用
在应用场景中主要分为IO密集型和CPU密集型。最主要还是设计核心线程个数和最大线程个数
的值。