ThreadPoolExecutor(3) —— 干活的人 Wo
一、前言
前一篇文章,大体说明了一下线程池如何添加一个新的Worker去执行任务。本篇来详细分析 Worker 本身。
二、Worker 的结构
2.1 Worker 整体结构
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Worker所绑定的执行任务的线程. */
final Thread thread;
/** 初始化时需要执行的任务,有可能为空 */
Runnable firstTask;
/** 完成任务数 */
volatile long completedTasks;
/**
* 通过给定的任务(有可能为空)来创建初始化,初始化时会创建一条线程进行绑定
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 实现Runnable接口的run方法 */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
可以看到 Worker 继承了AbstractQueuedSynchronizer,并实现了Runnable。
- 继承了 AQS,说明 Worker 本身是个锁,而且在tryAcquire以及其他对AQS方法的实现,都说明了它不支持重入。因为参数都写死为1,如果是重入功能的锁的话,会支持累加(此处可能说的不详细,如果不明白可以参考 AQS 系列文章 ReentrantLock(一) —— AQS简介)。
- 实现 Runnable,说明 Worker 本身是个可执行的任务类,它与它自身的thread 属性相互绑定(this.thread = getThreadFactory().newThread(this))
2.2 runWorker 方法
runWorker 方法是ThreadPoolExecutor的方法。
/**
* 执行 Worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用unlock()是为了让外部可以中断
w.unlock(); // allow interrupts
// 这个变量用于判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 这儿是自旋
// 1. 如果firstTask不为null,则执行firstTask;
// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
// 这两个方法在当前类里面为空实现。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 帮助gc
task = null;
// 已完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
}
}
总结一下runWorker方法的执行过程:
- while循环中,不断地通过getTask()方法从workerQueue中获取任务
- 如果线程池正在停止,则中断线程。否则调用
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);
这个流程图非常经典:
Worker 的执行
2.3 processWorkerExit 方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 这个变量用于表示是否进入过自旋。
// 1. 如果没有进入过,该值为false
// 2. 进入过,该值为true
// 只有进入过自旋,worker的数量才需要减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 通过全局锁的方式移除worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程池状态为`SHUTDOWN`或`RUNNING`,
// 则通过调用`addWorker()`来创建线程,辅助完成对阻塞队列中任务的处理。
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
2.4 Worker 是如何被启动的
在上一篇 ThreadPoolExecutor(二) —— 线程池源码分析 中,ThreadPoolExecutor 成功将 Worker 添加到集合中后,调用的是 Worker 中 thread 的 start 方法(t.start())。我们知道 Thread 的 start 方法是会启动一个内存中的线程单元,并执行 run 方法:
public
class Thread implements Runnable {
//此处的 target 将会是 Worker 实例
public Thread(ThreadGroup group, Runnable target, String name,
long stackSize) {
init(group, target, name, stackSize);
}
@Override
public void run() {
if (target != null) {
target.run();
}
}
}
而在 Thread 执行 run 方法时,实际上调用的是它自身的 target 的run方法,此处的 target 就是与 Thread 绑定的 Worker 实例。我们再看一下两者绑定的过程。
在 Worker 初始化时,会通过 ThreadFactory 创建一个 Thread 实例:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 在创建 Thread 实例并赋值时,Worker 将自己作为参数传入线程工厂的方法内
this.thread = getThreadFactory().newThread(this);
}
此处 getThreadFactory 方法,返回的实际实例时 DefaultThreadFactory:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
此处执行了它的 newThread 方法,其中 Runnable 对象 r,就是 Worker 对象,在此处将 Worker 对象传入 Thread 的构造方法中,与 Thread 完成绑定。
所以,在线程池直接调用 Thread 的 start 方法,可以直接启动 Worker,执行 Worker 的 run 方法。
三、线程是如何在线程池中运作的
1、在线程池成功的 addWorker ,并且成功启动了 Worker 对应的 Thread以后,这个Thread就开始运作,运作的第一个任务,是 Worker 对象中的firstTask。
2、当 firstTask运作结束,会通过 getTask() 方法从队列中获取任务。在这里获取到的 task,无需与当前的 Thread 对象有什么绑定关系,只需要在当前 Thread 中执行这个 task 的 run 方法即可。
3、getTask 是从队列中获取 task 的核心逻辑,其中包含对线程数的判断以及是否允许核心线程数超时的判断。这些判断会影响从队列中获取task等待的时长,当然还有些比较细的内容需要额外的去说。
4、当当前的 Thread 从 getTask 方法中获取的 task 为空时,就说明这个线程已经没用了,就会消亡
三、有关 Worker 的一些疑问
3.1 为什么 Worker 要继承 AQS
- 上面已提到,ThreadPoolExecutor 需要的是不能重入的锁
- runWorker 方法是在 ThreadPoolExecutor 中的,它的参数是需要执行 Worker。而runWorker 方法是支持多线程,只是不支持同一条线程(也就是同一个Worker)出现并发执行的情况,所以让 Worker 自己来上自己的锁。当然 Worker 也可以不用自己去实现 AQS,可以自己有一个 lock 的属性,初始化时创建一个 lock对象。此处集成 AQS 可能也是为了更简洁,更优雅。
3.2 为什么 Worker 要实现 Runnable
- Worker 实现Runnable 接口,是为了可以作为 Runnable 类型的参数,与 Thread 进行绑定,在 Thread 启动时,会启动 Worker 的 run 方法。
3.3 为什么一定要与 Thread 绑定?为何在 ThreadPoolExecutor 启动 Worker 执行任务要调用 Worker 的 Thread,而不是 Worker 本身呢?
- 因为只有调用 Thread 的 start 方法,才会在内存中启动一条新的线程单元,如果直接执行 Worker 的 run 方法,那仅仅是主线程执行了 run 方法,并没有启动一条新线程
3.4 为什么不能启动 Worker 方法的run,然后 Worker 中 run 的内容,是启动 Worker 对象持有的 Thread 对象的 start 方法呢?这样做也可以启动一条新的线程单元啊?
- 如此问题所描述的那样,如果 ThreadPoolExecutor 直接启动 Worker,并且 Worker 将 ThreadPoolExecutor 和 Worker 中持有的 Thread 对象隔离,想想其实也没啥问题,也有可能我没有想到问题的关键所在。如果有哪位大佬知道这么写的原因,还请下方留言~
- 我能想到的,可能是类结构,或者代码写法上的偏好。当然也可能作者在写的时候,把 Worker 类就看做一个可执行的任务类,它的存在仅仅是对 Thread 的一层包装。这样想的话,Worker 确实有必要去实现 Runnable 接口。
3.5 在 runWorker 方法中,只要当前 Worker 完成了所有任务,就跳出了 while 循环,并执行 finally 中的移除过程,那核心的线程也会被移除吗?
- 此处有待更新
3.6 Worker为什么不使用ReentrantLock来实现呢?
- tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。
四、总结
本篇文章介绍了 ThreadPoolExecutor 中真正去执行任务的对象 —— Worker,Worker 与 Thread 之间的关系,以及 ThreadPoolExecutor 是如何去启动 Worker 的。再加上个人愚见,如果有理解错误或者缺失的地方,还请下方留言,大家一起交流,一起学习,一起成长。