线程池- AbstractExecutorService

2017-05-11  本文已影响0人  zhanglbjames

前言

AbstractExecutorService实现了ExecutorService和Executor接口的基本方法,ThreadPoolExecute和ForkJoinPool继承AbstractExecutorService就可以减少实现的复杂度,接口适配器模式
类继承结构
  1. Executor接口

Executor的存在用来实现异步框架(将任务和任务的执行分开,不同于Thread将任务和执行绑定在一起),即将任务提交和任务如何执行分开,Executor正是用来提交任务的。 void execute(Runnable command)用于提交没有返回值的任务

public interface Executor {
    void execute(Runnable command);
}
  1. ExecutorService接口

ExecutorService接口继承自Executor 接口,并且提供了对任务执行过程的管理操作,为了Executor提供各种管理服务而存在的,拓展了提交有返回值的任务的submit()方法

public interface ExecutorService extends Executor {
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit);
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown();
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated();
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
void shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow();
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result);
}
  1. AbstractExecutorService是对ExecutorService接口的默认实现

AbstractExecutorService 源码详解

invokeAny方法实现
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 含有结果的Future队列
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        // 将本对象作为Executor创建ExecutorCompletionService对象
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            // 记录可能抛出的执行异常
            ExecutionException ee = null;
            // 初始化超时时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 确定在主循环之前开始一个任务
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1; // 记录正在执行的任务数量

            for (;;) {
                // 获取并移除下一个将要完成的任务的结果表示,如果没有任何表示则返回null
                Future<T> f = ecs.poll();// 底层调用队列的poll方法(非阻塞)
                if (f == null) { // 没有结果表示
                    if (ntasks > 0) { //如果还有剩余的任务,则提交下一个任务
                        --ntasks;
                        futures.add(ecs.submit(it.next())); 
                        ++active;
                    }
                    // 出现这种情况说明,已经有任务完成,并返回结果表示,但是
                    // 捕获到了异常,则跳出主循环,进行异常的抛出
                    else if (active == 0) 
                        break;
                    else if (timed) { // 超时获取结果表示
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else // 阻塞获取结果表示
                        f = ecs.take();
                }
                if (f != null) { //含有结果表示
                    --active;
                    try {
                        return f.get(); // 返回结果
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            // 
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally { // 最后取消所有提交的任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    // 对doInvokeAny的封装,实现无超时等待的版本
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
     // 对doInvokeAny的封装,实现超时等待的版本
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

通过对私有内部实现doInvokeAny的封装,实现对外的无超时等待的版本和超时等待的两个版本,通过ExecutorCompletionService类来实现对所有提交的任务执行完成时返回结果的存储和获取

  1. invokeAny方法没有对task进行显示地包装,但是通过ExecutorCompletionService的submit()方法提交任务时,实际上是调用newTaskFor()方法对任务进行了包装为RunnableFuture对象,然后调用了本对象的execute()方法提交任务,并返回异步计算结果对象
// ExecutorCompletionService 的submit方法
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
  1. 使用了ExecutorCompletionService对象对任务执行完成时结果的存取,隐含了对任务是否完成的判断;所以对返回结果就不用通过isDone()方法判断是否任务已经完成了
    ExecutorCompletionService部分源码如下:
    // 阻塞队列:用来存储已经完成的任务
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * 拓展了 FutureTask 在完成时将任务入队功能
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        // 此方法在FutureTask任务run方法完成时调用,这里是将完成的任务入队
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
invokeAll方法
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false; // 所有任务是否完成的标志
        try {
            // 对所有任务进行包装,并提交任务,并将返回的结果添加到futures集合中
            for (Callable<T> t : tasks) { 
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            // 对所有结果进行判断或者阻塞等待结果返回
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 如果任务没有完成
                    try {
                        f.get(); // 则阻塞等待结果返回,并压制异常
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // 当所有任务已经完成了(不管是正常完成还是异常完成,
            // 如发生CancellationException、ExecutionException ),
            // 则将完成标志设为true,并返回结果集合
            done = true; 
            return futures;
        } finally {
            if (!done) // 如果发生中断异常InterruptedException 则取消已经提交的任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false; 
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                // 在添加执行任务时超时判断,如果超时则立刻返回futures集合
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }
            // 每次对结果进行判断时都进行超时判断
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 判断超时
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime(); // 更新剩余时间
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

invokeAll方法也实现了无超时和超时两个版本

  1. 无超时版本首先将所有任务包装后提交给本对象的执行器(调用execute)执行,并将返回的结果添加到futures集合中,然后对futures集合进行遍历判断,是否已经完成,如果没有完成则使用get方法阻塞等待结果的返回,并压制了一些异常;并在finally模块对标志进行检查取消已经提交的任务
  2. 超时版本和无超时版本基本一致,但是加了超时逻辑。在2个地方增加了超时判断1) 在添加执行任务时超时判断,如果超时则立刻返回futures集合;** 2) **每次对结果进行判断时都进行超时判断。
方法总览
上一篇下一篇

猜你喜欢

热点阅读