Java8线程池——底层为LinkedBlockingQueue
线程池使用的一个例子
这里约定,细节在代码中注释说明,代码外正文的描述文字展示程序的大体流程。
int nThreads = 10;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
Runnable task = new Runnable(){
public void run(){
//do something
}
};
exec.execute(task);
这里的工厂方法newFixedThreadPool()初始化的是ThreadPoolExecutor实例,在线程池实现的工厂方法newSingleThreadExecutor()与newFixedThreadPool()一样构建底层为LinkedBlockingQueue的ThreadPoolExecutor。而newCachedThreadPool()初始化的是底层为SynchronousQueue的ThreadPoolExecutor实例。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor的execute()
任务到来执行execute()方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 执行过程分为三步:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 1. 如果少于corePoolSize条线程在运行,那么试图启动一条新的线程,并且把当前的命令作
* 为这条线程将要执行的第一个任务。addWorker()方法原子性检查runState和workerCount
* 的状态,以防止不应该增加线程时添加了线程。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 2. 如果一个任务成功入队,我们需要进行双重的校验,校验我们是否应该增加一个线程(因
* 为从上次校验之后存在的线程可能已经死掉了)或者检查线程池是否关闭了。所以需要再次
* 检验,如果需要进行回滚出队。或者,如果没有线程,就开始一条新的线程。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*
* 3. 如果任务入队不成功,我们试图增加新的进程。仍旧失败的话,直接拒绝执行任务。
*/
int c = ctl.get();
//①
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//②
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//③
else if (!addWorker(command, false))
reject(command);
}
上面的execute()方法的源代码中注释的标号,对应着上面的英文注释,中文翻译在对应英文下面。先大体看流程,execute()方法执行新的任务,当一个任务到来后,做必要的校验,根据检验的不同结果,执行不同的逻辑实现。但是任务的执行都是通过addWorker()方法完成的,下面流程走的addWorker()方法中。
ThreadPoolExecutor的addWorker()
addWorker()方法代码片段
private boolean addWorker(Runnable firstTask, boolean core) {
....省略一些校验逻辑.....
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//①
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//②
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//③
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代码片段中有三点重要之处,分别标注了序号。
- 到来的任务,被封装在一个Worker的对象中。Worker对象字段thread赋值给变量t;
- 对线程池状态的参数设置和进一步的最终校验需要加锁,保证并发时的线程安全,和数据的一致性。
- Thread变量t执行start()方法。
梳理上面的代码片段,到来的任务会被先封装Worker对象,然后Worker对象返回一个Thread对象,这个Thread对象最终执行。下面走到Worker类的源代码。
ThreadPoolExecutor的Worker类
//Worker的构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
构造Worker对象时,Runnable任务作为当前线程的第一个任务赋值到firstTask,如果确实是第一个任务firstTask不为null,否则为null,然后去队列中取已经入队的任务。字段thread指向通过ThreadPoolExecutor中定义的线程工厂,生产的封装了Worker对象自己的线程。字段thread指向的对象执行start()方法(上面addWorker()方法代码中的③)时,底层调用的就是Worker对象的run()方法。
Worker类的run()方法
run()方法调用的是runWorker()方法。
public void run() {
runWorker(this);
}
Worker类的runWorker()方法
在runWorker()方法最重要的语句就是那一条while语句(下面代码中标识①的语句)。涉及如下三点:
- 如果任务task不为null,执行当前的task,此task是当前线程的第一个任务;
- 如果task为null,getTask()方法从任务队列中获取排队的任务进行执行;
- while循环因为条件中getTask()方法会进行自旋,所以当任务队列中没有任务时,当前线程一直处于阻塞等待状态,利用阻塞队列也可以限时等待,这也是线程池中线程可以复用的原因。因为线程会一直等待队列中有任务。当然可以通过ThreadPoolExecutor的keepAliveTime字段对线程生存时间进行设定。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//①
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
自此,完成了任务到来,提交给线程池,线程池创建线程或者复用等待中的线程执行任务。复用线程可以减少创建和销毁线程产生的开销,在高并发系统中线程池使用可以取得很好的性能效果。更多细节可以参考Java线程池ThreadPoolExecutor的实现和参数