Java 进阶之路java进阶干货技术干货

ThreadPool 之 线程池概览

2017-08-01  本文已影响93人  RojerAlone

[注] 本文中的源码基于 JDK1.8,源码中的注释为 JDK 中注释的翻译加上个人的理解。如有错误欢迎指正。

引言

因为进程的切换相当耗费资源,加上 CPU 的发展,操作系统中引入了线程的概念。相比于进程的上下文切换,线程的切换更轻量级,但是不代表没有开销,而且大部分多线程的生命周期都比较短,会发生频繁的线程创建、销毁动作,这也是相当消耗资源的,因此引入了线程池。

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的结构

ThreadPool 的实现结构如下图所示:

ThreadPool 结构

Executor 接口

ThreadPoolExecutor 是最终的线程池实现类,顶层接口是 Executor,查看 Executor 源码,这个接口中只有一个方法:

public interface ExecutorSource {

        /**
         * 在未来某个时间执行参数中的命令,这个命令可能在一个新的线程、线程池中的线程或者一个调用线程(?)中被执行
         */
        void execute(Runnable command);
}

ExecutorService 接口

ExecutorService 接口继承了 Executor 接口,添加了一些对线程池的管理:

public interface ExecutorService extends Executor {

    /**
     * 有序地执行完之前提交的任务,但是不会接受新的任务。如果线程池已经被关闭,调用此方法没有额外的影响。
     */
    void shutdown();
    
    /**
     * 尝试停止所有正在运行的任务,停止等待中的线程,返回正在等待执行的任务列表
     */
    List<Runnable> shutdownNow();
    
    /**
     * 获取线程池是否已经被关闭
     */
    boolean isShutdown();

    /**
     * 如果所有的任务都已经被关闭了,返回 true,除非先调用 shutdown 或者 shutdownNow ,否则永远不会返回 true
     */
    boolean isTerminated();
    
    /**
     * 阻塞直到关闭请求后所有任务被完成,或者时间超时,或者线程被中断,不管哪一种情况先发生,根据先发生的情况返回值
     * 也就是说,获取线程池是否关闭,指定了一个时间,在这个时间之前被关闭的话,返回 true,如果超时还没有关闭,返回 false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个任务,会返回一个 Future , Future 可以返回任务的结果,当任务被成功完成之后可以通过 get 方法获取结果
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个实现了 Runnable 接口的任务以及返回的结果(???不懂),返回 Future
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个实现了 Runnable 接口的任务,返回 Future
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务,当所有任务完成后返回带有任务状态和结果的 Future 列表。
     * Future.isDone对于返回的列表的每个元素都是正确的。 要注意的是,完成的任务可能会正常终止或抛出异常。
     * 如果在执行任务过程中修改了任务的集合,则这个方法的结果是未定义的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 和 invokeAll(Collection<? extends Callable<T>> tasks) 类似
     * 只不过如果在截止时间之前没完成的任务都会被取消,不再执行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,直到一个任务完成或者抛出异常,其他未执行的任务将不再执行,返回被执行完的任务的结果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 和 invokeAny(Collection<? extends Callable<T>> tasks) 类似,加上了超时机制
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService 抽象类

AbstractExecutorService 抽象类实现了 ExecutorService 接口,对一些方法实现了默认实现。

public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * 根据传进来的 Runnable 和 value 构造一个 RunnableFuture
     * RunnableFuture 是继承了 Runnable 和 Future 接口的接口
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * 根据 Runnable 构建 RunnableFuture
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * 提交任务,从这里可以看到,内部还是调用了 execute 方法
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * 同 submit,只不过构建的是一个带有 result 的 FutureTask
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * 同上
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * invokeAny 内部都调用这个方法
     * 三个参数,第一个表示要提交的任务,第二个表示是否是有时间限制的,第三个表示时间
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        
        // 将自己本身作为参数传入 ExecutorCompletionService 的构造函数
        // 在这个类内部只用了 AbstractExecutorService 的 submit 方法
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

        try {
            // 记录异常,如果最终没有获得任何结果,就抛出这个异常
            ExecutionException ee = null;
            // 记录终止操作的时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 提交一个任务,其他的等下看情况再提交
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll(); // 从队列中出队一个任务的结果
                if (f == null) { // 如果没有结果,那么就继续从提交的任务中选取下一个执行
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) { // 如果已经有任务执行完毕,那么就返回结果
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            // 执行到这里还没有返回,那么就抛出异常
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally { // 最后将没有执行的任务取消
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    // 其他的 invokeAny 都是基于 doInvokeAny 的,就不贴源码了
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        // code......
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        // code......
    }

    /**
     * 执行所有任务,直到所有任务完成或者出现异常才返回
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 执行所有的任务
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 如果任务还没有完成,就等待任务完成
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally { // 如果发生了异常,取消没有执行的任务
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    /**
     * 和一般的 invokeAll 类似,但是加上了判断是否在指定时间内执行完毕
     * 只要到指定时间,不管任务执行完没有,都直接返回结果
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally { // 如果最后有任务没有执行完,取消没执行的任务
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

由于贴的代码太多,如果所有内容都写到一篇文章里太过冗长,因此分开写了,下篇 ThreadPool 之线程池实现类 ThreadPoolExecutor 将研究线程池的最终实现类 ThreadPoolExecutor

上一篇下一篇

猜你喜欢

热点阅读