7-AbstractExecutorService

2020-02-11  本文已影响0人  鹏程1995

概述

引入

经过几篇记录,我们已经详细了解了线程池的接口定义,现在我们对于一个正常的线程池都提供哪些功能已经有了一个大概的认识了。

在之前我们学习集合类源码时已经大概了解了Java的套路:

  1. 定义接口【从抽象到具体】
  2. 针对接口将通用的方法实现,AbstractXXXXXXX
  3. 继承AbstractXXXX再加上各自场景下的逻辑,完成一个完整的工具

我们本次就介绍AbstractExecutorService

摘要

介绍AbstractExecutorService的实现原理。

类介绍

类定位

AbstractExecutorServiceExecutorService中的多种任务提交方法做了具体实现。以减少线程池的后续实现成本。

AbstractExecutorService实现了n种任务提交方法。但是放过了最核心的提交方法executor(),用户可以根据自己的环境自行定制。

此类也没有对线程池的监控、管理方法做默认实现,需要用户根据自己的使用场景自己定制。

注意

没啥说的吧,看看他们怎么搞的,学习一下。

源码解读

入参统一

介绍

AbstractExecutorService抽离了入参统一的方法newTaskFor(),它通过RunnableFuture将入参统一转换成Runnable接口以方便后面的执行。

毕竟经过我们之前的例子也能意识到,线程池重视的是任务的执行而非返回的结果,统一包装成Runnable的实现类更符合最少知原则。

同时,AbstractExecutorService默认使用FutureTask,但是将newTaskFor()其设置成protected以方便用户定制自己的RunnableFuture实现类。

源码

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

单任务提交

介绍

没什么说的,这些实现都依赖于上面的统一包装和未实现的executor()

源码

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}


public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

多任务提交

介绍

多任务操作的实现相对于单任务提交略有不同,他不是我们之前实例中的那样直接循环调用单任务提交然后等结束,而是依赖一个ExecutorCompletionService的框架,此框架将这些协同操作进行了封装。后面再详细介绍吧。

invokeAny()的思路【不考虑限时问题】:

  1. 一个一个顺序将任务提交进去
  2. 提交任务的同时检测看是不是有任务完成了,如果有完成了,就取消已经添加的任务并返回

invokeAll的思路【不考虑限时的问题】:

源码

/**
 * invokeAny() 的主要逻辑
 */
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    
    // 参数合法性检测
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    // 为了在处理器并行能力有限制的情况下保证执行效率,我们一个一个的加任务并检测执行结果,有任务执行
    // 成功的话,剩下的任务就不用添加了,正在执行的任务一律取消即可

    try {
        // 记录下抛出的异常,这个和我们的机制相关,在上面的 ExecutorService 中我们介绍了,我们返回的
        // 是最早结束的任务,不管是不是抛出了执行异常。当然这里编码是可以简化的。
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;//正在执行的任务数

        
        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) { // 还没有任务执行成功
                if (ntasks > 0) {// 还有没加进队列的任务,加任务
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)// 没有可以加的任务了,而且也没有正在执行的任务,
                                     //说明任务都执行失败
                    break;
                else if (timed) { // 任务都加进去了,还有在执行的任务,而且有限时,就等限时吧
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else// 任务都加进去了,还有在执行的任务,没有限时,那就一直等下去
                    f = ecs.take();
            }
            if (f != null) { // 得到一个结束状态的任务
                --active;
                try {
                    return f.get(); // 尝试获取结果并返回
                } catch (ExecutionException eex) { 
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null) 
            ee = new ExecutionException();
        throw ee;

    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)//取消之前提交的任务
            futures.get(i).cancel(true);
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}


// 一样的思路:
// 1. 一个一个调用单次提交提交上去
// 2. 等待所有任务完成,然后返回
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

和自己实现的例子对比

大同小异,但是有几点他做的比较好:

  1. 单次提交任务最重的实现全都集中到execute()中,方便统一编码和修改;而我各自写了一遍,不方便后面的阅读和维护。

我感觉我做的比较好的:

  1. invokeAll()中,我直接依赖的submit()省了很多调用前后的转换。它还是依赖的execute(),代码相对就有些赘余了。

问题

doInvokeAny()中,它的try-catch-finally有点冗余,貌似可以精简:

/**
 * the main mechanics of invokeAny.
 */
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

    try {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                    return f.get();
            }
        }
    } catch (ExecutionException e){
        e.printStackTrace();
        throw e;

    } finally{
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
    return null;
}


试着修改了一下,发现了问题,java在检测编译问题时,如果你明确写了 throw他就不会追究的是不是有地方漏写了return,但是如果你在catch中写的throw,他就会报错说你有地方没写return

扩展

参考文献

上一篇下一篇

猜你喜欢

热点阅读