Java并发编程 - ThreadPoolExecutor之Wo
初识Worker
线程池顾名思义是存放线程的池子,ThreadPoolExecutor是语言级别上对它的定义,既然要存放线程,那么其内部就需要某种数据结构来存储代表线程的Thread对象。Java API定义的容器类可以用来完成存储的功能,由于线程集中线程的松散性和不可重复性的特点,选用Set来存储比较合适。
那么,就可以在ThreadPoolExecutor类中这样定义:
private final HashSet<Thread> threads = new HashSet<Thread>();
存储的问题我们解决了,那么下一个问题就是线程执行用户任务的问题,虽然现在还没提到任务队列,不过就当成我们已经知道了,需要执行的用户任务会被放入到ThreadPoolExecutor内部定义的任务队列里,线程池的线程会从这个任务队列里取出用户任务来执行。
ThreadPoolExecutor-Worker-1.png线程池中的线程要取用户任务并执行,那么线程池中的线程类要这样定义:
public class ThreadPoolThread extends Thread {
@Override
public void run() {
// 获取队列中的用户任务
Runnable userTask = getUserTaskFromQueue();
userTask.run();
}
}
我们基于Runnable接口编程,"获取队列中的用户任务并执行"的逻辑代码可以剥离出来,定义成一个新的任务类:
public class ThreadTask implements Runnable {
@Override
public void run() {
// 获取并执行队列中的任务
Runnable userTask = getUserTashFromQueue();
userTask.run();
}
}
这样定义后,创建线程并放到线程池中,就可以这样做:
ThreadTask threadTask = new ThreadTask();
Thread thread = new Thread(threadTask);
thread.start();
threads.put(thread);
线程池中的线程需要类似上面代码那样创建后放入到线程池中的,那么上面的这段代码什么时候执行?
有两种方案:一种是在线程池启动的时候我们就创建一定数量的线程,也就是所谓的"预先启动线程",这种方案的策略是不管有没有任务需要执行,预先启动一定数量的线程然后等着任务到来;还有一种方案就是提交任务的时候再创建线程,只要数量不达到一个预定的值即可。提交任务的时候再创建线程,那么想一下,这个任务有必要放到任务队列中吗?用刚创建的线程来执行这个任务不就行了嘛?是的,的确是这样。回头看一下上面创建线程的代码,也就是说:
Thread thread = new Thread(threadTask);
这里操作的时候就希望把用户任务设置进去,但是threadTask本身就是一个任务对象,没有方法设置两个任务对象。那就需要对ThreadTask改造了,改造也很简单,类似这样:
public class ThreadTask implements Runnable {
Runnalbe userTask;
public ThreadTask (Runnable userTask) {
this.userTask = userTask;
}
@Override
public void run() {
if (userTask != null || (userTask = getUserTaskFromQueue() ) != null) {
userTask.run();
}
}
public Runnalbe getUserTaskFromQueue() {
Runnable queueTask = getUserTashFromQueue();
return queueTask;
}
}
上面的代码还有个问题,就是线程需要不断地从队列中取用户任务执行,而我们知道线程执行完run方法后就会停止,所以上面的代码达不到循环取任务的效果,需要对run方法的逻辑进行修改,如下:
while (userTask != null || (userTask = getUserTaskFromQueue() ) != null) {
userTask.run();
}
重新定义ThreadTask之后,创建线程池线程的代码,修改成如下所示:
Runnable userTask = new UserTask();
ThreadTask threadTask = new ThreadTask(userTask);
Thread thread = new Thread(threadTask);
threads.put(thread);
thread.start();
ThreadTask需要Thread的调用,也可以说其实就是Thread的run方法中的一部分,因为我们基于Runnable编程所以把它给提取出来了,既然ThreadTask是依托于Thread的,那么创建ThreadTask的时候就把要运行它的线程确定下来可以吗?是可以的,对ThreadTask改造如下:
public class ThreadTask implements Runnable {
Runnalbe userTask;
Thread thread;
public ThreadTask (Runnable userTask) {
this.userTask = userTask;
thread = new Thread(this);
}
@Override
public void run() {
while (userTask != null || (userTask = getUserTaskFromQueue() ) != null) {
userTask.run();
}
}
public Runnalbe getUserTaskFromQueue() {
Runnable queueTask = getUserTashFromQueue();
return queueTask;
}
}
上面的代码中我们将Thread作为了ThreadTask的一个属性,在ThreadTask的构造方法中通过this将自身对象设置进去。这样改造后,创建线程池线程的代码,修改成如下所示:
Runnable userTask = new UserTask();
ThreadTask threadTask = new ThreadTask(userTask);
threadTask.thread.start();// 启动线程
经过上面的一系列改造,ThreadTask具有了获取队列用户任务以及执行用户任务的功能,那么ThreadPoolExecutore只要持有ThreadTask的集合集合,也就是说我们上面的Thread集合可以修改成:
private final HashSet<ThreadTask> threads = new HashSet<ThreadTask>();
内部任务的执行就是通过ThreadTask这个对象的操作来完成,将用户交给它,它会启动内部持有的线程,然后执行用户任务。
有没有觉得ThreadTask这个名字并不完全符合了。这里不用费心思考合适的名字,它就是这篇文章要介绍的类——Worker的简化版。
JDK中Worker类的定义:
ThreadPoolExecutor$Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
中文翻译把它称之为:工作线程或工作者。
上面贴出来的代码可以看到,run方法的执行逻辑交给了外部定义的runWorker方法。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Worker锁
可以看到上面贴出来的Worker类继承了AQS,重写了锁获取与释放的方法,Worker具有了锁的特性,而且是一把互斥锁。runWorker使用了Worker这把互斥锁,通过它保护用户任务的执行。搜索ThreadPoolExecutor的源码可以发现除了Worker本身的run方法调用了runWorker之外,没有其他地方执行了调用,而run方法的调用本身就是线程封闭式的,不会存在一个线程闯入进来打乱线程的执行。也就是runWorker的执行不存在竞争,为什么要用锁来保护?
目的是中断的控制, 正如Worker类注释锁描述的那样:
Class Worker mainly maintains interrupt control state for threads running tasks...
控制中断?怎么说?
这样说,现在问你,Worker代表工作线程,它正在执行任务,这时候想要中断它怎么做?Woker严格意义来说不是线程对象,但是它的内部包含了线程对象,这个线程对象就是运行Worker称之为工作线程的体现。想中断Worker的操作,那么中断它内部的这个线程就行了,于是我们可以这样操作:
Thread t = worker.thread;
t.interrupt();
通过这样的方式就可以中断Worker的执行。前提是可以持有想中断的线程对象的Worker对象。
但是现在的需求是在Worker工作的时候不允许中断它。也就是在执行runWorker内部逻辑的时候不允许中断正在运行它的线程。这时候runWorker使用的Worker锁就起到作用了。
对某一个线程的中断通常是另外一个线程发起的,也就是
Thread t = worker.thread;
t.interrupt();
这段代码是另外一个线程调用的,于是我们可以这样操作:
if (worker.tryLock()) {
Thread t = worker.thread;
t.interrupt();
}
如果worker锁现在被想中断的那个线程持有,那么上面的执行线程想获取锁就会失败,无法执行中断操作。
这样通过Worker锁的控制保护了正在执行的任务不能被中断。
还有一点在Worker的构造还是是这样定义的:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
这里可以看到state的初始值为-1,为什么定义这个数字,其实后面的解释已经很明确了:在进入runWorker方法之前不允许中断。在runWorker进入后用户任务执行之前,做了释放锁的操作,将state的值改为了0,也就是说runWorker锁保护块代码之前可以被中断:
w.unlock(); // allow interrupts
用户任务没执行前被中断ThreadPoolExecutor是允许的。
非重入
看先Worker的tryAcquire方法定义:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
这里的逻辑规定了不能重复获取锁,这个是为什么呢?
首先要明确一点Worker是来执行用户任务的,用户任务的开始执行的执行点在在runWorker方法中:
task.run();
假设现在执行这个调用的是线程A,这时候线程A是持有Worker锁的,我们在自己调用的这个task的run方法中执行这样的操作:
class MyTask implements Runnable {
@Override
public void run() {
threadPoolExecutor.setCorePoolSize(10);
}
}
执行run方法的当前线程还是线程A,来看看setCorePoolSize方法:
setCorePoolSize
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
这里有个interruptIdleWorkers方法的调用:
interruptIdleWorkers
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
现在线程A执行到这里,刚好w就是它在runWorker执行时候持有的锁,
if (!t.isInterrupted() && w.tryLock())
如果允许重入,那么w.tryLock就是true,那么线程A就把它自己它打断了。
所以Worker类在设计的时候就设计成是不可重入。
Worker执行异常处理
Worker执行出现异常指的是什么?
还是来看一下runWorker代码:
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
注意第2个try...catch块,task是用户自定的任务,在运行的时候可能会出现异常,出现异常后,从上面代码中可以看到捕获异常后被抛出了,那么下面这一句就无法执行:
completedAbruptly = false;
也就是completedAbruptly为初始值true,while循环结束,执行processWorkerExit方法,现在来看一下processWorkerExit的处理:
- processWorkerExit*
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
#第1步:减workerCount(减1)
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
#第2步:将当前Worker完成的任务数加到总任务数中,并从Worker集合中移除当前Worker
completedTaskCount += w.completedTasks;
workers.remove(w);
#第3步:尝试终止线程池
tryTerminate();
在对线程池有负效益的操作时,都需要“尝试终止”线程池。
终止线程池不在这里做说明。
#第4步:如果线程池状态是running或shutdown就尝试增加一个新的Worker
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
......
// completedAbruptly为true,上面的代码忽略
addWorker(null, false);
}
这里调用addWorker方法不一定能增加成功,addWorker内部有具体的判断逻辑。
addWorker逻辑不在这里做说明。