线程池- AbstractExecutorService
2017-05-11 本文已影响0人
zhanglbjames
前言
AbstractExecutorService实现了ExecutorService和Executor接口的基本方法,ThreadPoolExecute和ForkJoinPool继承AbstractExecutorService就可以减少实现的复杂度,接口适配器模式
类继承结构
- Executor接口
Executor的存在用来实现异步框架(将任务和任务的执行分开,不同于Thread将任务和执行绑定在一起),即将任务提交和任务如何执行分开,Executor正是用来提交任务的。 void execute(Runnable command)用于提交没有返回值的任务
public interface Executor {
void execute(Runnable command);
}
- ExecutorService接口
ExecutorService接口继承自Executor 接口,并且提供了对任务执行过程的管理操作,为了Executor提供各种管理服务而存在的,拓展了提交有返回值的任务的submit()方法
public interface ExecutorService extends Executor {
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit);
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown();
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated();
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
void shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow();
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result);
}
- AbstractExecutorService是对ExecutorService接口的默认实现
AbstractExecutorService 源码详解
invokeAny方法实现
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 含有结果的Future队列
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// 将本对象作为Executor创建ExecutorCompletionService对象
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录可能抛出的执行异常
ExecutionException ee = null;
// 初始化超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 确定在主循环之前开始一个任务
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1; // 记录正在执行的任务数量
for (;;) {
// 获取并移除下一个将要完成的任务的结果表示,如果没有任何表示则返回null
Future<T> f = ecs.poll();// 底层调用队列的poll方法(非阻塞)
if (f == null) { // 没有结果表示
if (ntasks > 0) { //如果还有剩余的任务,则提交下一个任务
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 出现这种情况说明,已经有任务完成,并返回结果表示,但是
// 捕获到了异常,则跳出主循环,进行异常的抛出
else if (active == 0)
break;
else if (timed) { // 超时获取结果表示
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else // 阻塞获取结果表示
f = ecs.take();
}
if (f != null) { //含有结果表示
--active;
try {
return f.get(); // 返回结果
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally { // 最后取消所有提交的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
// 对doInvokeAny的封装,实现无超时等待的版本
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
// 对doInvokeAny的封装,实现超时等待的版本
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
通过对私有内部实现doInvokeAny的封装,实现对外的无超时等待的版本和超时等待的两个版本,通过ExecutorCompletionService类来实现对所有提交的任务执行完成时返回结果的存储和获取。
- invokeAny方法没有对task进行显示地包装,但是通过ExecutorCompletionService的submit()方法提交任务时,实际上是调用newTaskFor()方法对任务进行了包装为RunnableFuture对象,然后调用了本对象的execute()方法提交任务,并返回异步计算结果对象
// ExecutorCompletionService 的submit方法
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
- 使用了ExecutorCompletionService对象对任务执行完成时结果的存取,隐含了对任务是否完成的判断;所以对返回结果就不用通过isDone()方法判断是否任务已经完成了
ExecutorCompletionService部分源码如下:
// 阻塞队列:用来存储已经完成的任务
private final BlockingQueue<Future<V>> completionQueue;
/**
* 拓展了 FutureTask 在完成时将任务入队功能
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 此方法在FutureTask任务run方法完成时调用,这里是将完成的任务入队
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
invokeAll方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false; // 所有任务是否完成的标志
try {
// 对所有任务进行包装,并提交任务,并将返回的结果添加到futures集合中
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 对所有结果进行判断或者阻塞等待结果返回
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) { // 如果任务没有完成
try {
f.get(); // 则阻塞等待结果返回,并压制异常
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// 当所有任务已经完成了(不管是正常完成还是异常完成,
// 如发生CancellationException、ExecutionException ),
// 则将完成标志设为true,并返回结果集合
done = true;
return futures;
} finally {
if (!done) // 如果发生中断异常InterruptedException 则取消已经提交的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
// 在添加执行任务时超时判断,如果超时则立刻返回futures集合
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
// 每次对结果进行判断时都进行超时判断
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) { // 判断超时
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime(); // 更新剩余时间
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
invokeAll方法也实现了无超时和超时两个版本
- 无超时版本首先将所有任务包装后提交给本对象的执行器(调用execute)执行,并将返回的结果添加到futures集合中,然后对futures集合进行遍历判断,是否已经完成,如果没有完成则使用get方法阻塞,等待结果的返回,并压制了一些异常;并在finally模块对标志进行检查,取消已经提交的任务。
- 超时版本和无超时版本基本一致,但是加了超时逻辑。在2个地方增加了超时判断:1) 在添加执行任务时超时判断,如果超时则立刻返回futures集合;** 2) **每次对结果进行判断时都进行超时判断。