ThreadPoolExecutor 深度分析

2018-06-05  本文已影响14人  wooody

一般情况下我们创建一个线程我们会直接继承一个Thread 类或者实现一个Runnable接口,然后通过new Thread().start()去启动一个线程执行相应的操作。可是这样来说的话,我们需要对于每一个操作都需要创建一个线程,比如我们在处理web请求的时候,如果我们针对每一个请求都创建一个线程来处理它的话,而且一次请求处理的时间比较短。这就意味着我们需要频繁的创建和销毁一个线程,这样也就会大大降低系统的效率,因为创建和销毁线程不但需要时间而且会消耗系统资源。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?这个时候我们就需要一个线程池了。

使用线程池有什么好处?

ThreadPoolExecutor

Java中 的线程池是通过ThreadPoolExecutor实现。它位于java.util.concurrent包下面,通过它我就可以直接使用线程池提供的好处和功能了。来看下它的简单结构类图:


ThreadPoolExecutor类图.png

来看下它的简单结构,从上面的类图中可以看到:它继承了AbstractExecutorService接口,AbstractExecutorService继承了ExecutorService接口,ExecutorService继承了Executor接口。Executor是一个顶层接口。

我们通常创建一个线程池的时候首先使用的就是newThreadPoolExecutor(),来看看ThreadPoolExecutor构造函数做了什么事情?来看看源码

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Executor接口

我这边就按照从上到下的顺序来分析ThreadPoolExecutor的实现和功能,首先从顶层的Executor接口来看:

public interface Executor {

    /**
     * 在将来某一个时候来运行给定的任务,这个任务可以是一个新的线程也可以是一个线程池线程
     */
    void execute(Runnable command);
}

很简单就是一个接口,接受一个Runnable参数,用来接受提交的任务,没有返回值。

ExecutorService接口

public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,不再接受新的任务,但是此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出
     */
    void shutdown();

    /**
     *  这个会尝试停止正在执行的任务,并且暂停等待运行的任务,并且吧没有机会执行的任务列表返回。它是通过通过调用Thread.interrupt()方法来终止线程的,所以ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
     */
    List<Runnable> shutdownNow();

    /**
     返回线程池是否关闭的状态
     */
    boolean isShutdown();

    /**
     * 当所有的任务都完成了才会返回true.
     */
    boolean isTerminated();

    /**
     * 在给定的时间内,都会被阻塞,除非任务完成或者到达超时时间
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个可以有返回值的任务,返回值的结果保存在Future中
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个Runnable任务,并且通过制定的Result 返回结果到Future中
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个Runnable任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的多个任务集合,并且把他们的执行都成功执行完成之后将状态和结果保存的Future列表中返回。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *  相对于上面的一个方法,增加了一个超时时间
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 运行给定的方法,可是只要有一个任务执行成功就会立即返回结果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     *  和上面的方法类似,指定了一个超时时间
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

如上ExecutorService丰富了线程池接受任务的形式,还可以提交Callable类型的任务,可以有返回值的任务,还提供了关闭线程池的任务接口定义。

AbstractExecutorService抽象类

public abstract class AbstractExecutorService implements ExecutorService {

  /**
   *  返回一个给定Runnable任务的FutureTask。
  */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

     /**
   *  返回一个给定Callable任务的FutureTask。
  */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
 
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

     
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

AbstractExecutorService抽象类增加了newTaskFor方法还有doInokeAny方法,其他的就是对ExecutorService接口中任务提交和任务调用方法的具体实现。newTaskFor主要是用于创建一个FutureTask任务,而submit方法的实现都大同小异,都是首先判断提交的任务是否为空,不为空则调用newTaskFor创建RunnableFuture任务,然后通过调用顶层的execute方法来添加任务。重点看下invokeAny和invokeAll的实现,invokeAny主要是通过调用doInvokeAny来实现业务逻辑;

上一篇 下一篇

猜你喜欢

热点阅读