Java线程池相关

2017-05-18  本文已影响0人  冰火人生

JDK线程池相关一

jdk中将计算任务(task)和计算任务执行本身解耦。

基础接口与类

与计算任务相关的两个接口和

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

两个接口的区别从代码和注释中显而易见。

Callable带返回值,且可能抛异常(受检);Runnable不带返回值,且不抛异常(受检)。

而任务的执行则由另一个接口Executor

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

注意,这个接口只描述了可以执行任务,并没有线程池的概念,当然线程池是一定可以执行任务的,因此线程池需要实现该接口。

为了追踪任务的异步计算,比如提交完一个任务之后需要知道任务是否已完成,或者是取消该任务,jdk提供了一个名为Future的接口

public interface Future<V> {

   /**
    * 取消与之关联的任务。如果任务已经完成或者已经取消,或者是因为某些原因不能取消则返回false
    * 如果任务还未开始,且取消成功,则该任务永远不会再被执行
    * 如果任务已经开始,则输入参数mayInterruptIfRunning决定是否采用中断的方式叫停任务
    */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 任务完成前被取消 则返回true
    */
    boolean isCancelled();

    /**
    * 返回任务是否完成
    * 任务完成包括以下情况
    * 正常结束、异常或者取消
    * 以上任何一种情况下都返回true
    */
    boolean isDone();

    /**
    * 阻塞到任务结束,返回任务执行结果
    */
    V get() throws InterruptedException, ExecutionException;
    
    /**
    * 等待一段时间,还没结束则抛超时异常
    */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

至此,我们了解了三个概念:

但是这几部分是怎么联系起来的呢?线程池在哪里呢?

先回答第二个问题。

jdk提供了另一个接口,该接口定义了执行器的一系列行为(方法),那就是ExecutorService。

public interface ExecutorService extends Executor {

    /**
    * 该方法会以一种比较平滑的方式关闭执行器:
    * 1.已经提交的任务会被执行(但不保证执行完毕)
    * 2.不再接收新的任务
    * 如果该执行器已经关闭了,再调用此方法没有任何作用
    * 该方法不会等到已经提交的任务执行完毕
    */
    void shutdown();

    /**
     * 比较粗暴的关闭执行器,直接试图中止所有的任务,挂起所有等待执行的任务
     * 返回所有等待执行的任务
     * 该方法不会等待正在执行中的任务执行完毕
     */
    List<Runnable> shutdownNow();

    /**
     * 执行器已经被关闭时返回true
     *
     */
    boolean isShutdown();

    /**
     * 关闭执行器后如果所有的任务都已执行完毕则返回true
     * 这意味着该方法只可能在调用shutdown或者shutdownNow后返回true
     */
    boolean isTerminated();

    /**
     * 执行器关闭后阻塞至所有的任务执行完毕或者超时、中断
     * 当执行器已经终止时返回true;超过指定时间还没终止则返回false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个带返回值的任务给执行器,并返回一个Future对象用于跟踪任务的执行
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个没有返回值的任务给执行器,并返回一个Futrue对象用于跟踪任务的执行
     * 任务执行完毕后使用get可以得到指定的result
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个没有返回值的任务给执行器,并返回一个Futrue对象用于跟踪任务的执行
     * 任务执行完毕后使用get返回null
     */
    Future<?> submit(Runnable task);

    /**
     * 提交多个任务,该方法是阻塞的,只有所有的任务完成后才返回与这些任务关联的Future列表
     * Future列表中的每一个对象调用isDone都返回true
     * 任务正常结束或者抛异常才成为完成
     * 输入的列表在执行该方法时被修改时,则返回的结果未定义
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 指定执行的时长,到时间后任务要么执行完毕要么超时
     * 所有的Future对象调用isDone都返回true
     * 返回前,所有未能完成的任务都会被取消掉
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 只要任意一个任务完成,则返回该任务的返回值,其他未完成的任务都会被取消
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 只要任意一个任务在指定的时间点前返回,则返回该任务的返回值,其他未完成的任务都会被取消
     * 如果没有任何一个任务及时完成,则抛出超时异常
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

再来回答第一个问题。

jdk提供了一个接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

该接口直接就将任务和任务追踪关联起来了。

接下来我们看看执行器的实现。jdk中提供了一个执行器(其实是线程池)的抽象类,该类实现了其上层接口的很多方法,留给一些必要的方法给子类去实现。

public abstract class AbstractExecutorService implements ExecutorService

这里我们只看看这个类的submit方法

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

可以看到这两个方法是对ExecutorService接口中方法的实现。

可以看到这两个方法中都是将任务封装成一个RunnableFuture对象,然后扔给executor执行,最后返回该RunnableFuture对象。

来看看这个newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

返回了一个我们尚未提及的一个类示例:FutureTask。显然这个FutureTask是RunnableFuture的实现类。

目前为止,还有一个疑问:既然任务是执行器来执行的,任务的状态是通过Future来查询的,那Future中的状态是什么时候设置的呢?那就得看FutrueTask的源码了。

我们挑选它的一个构造函数来看

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

可以看到FutureTask包含了传递进去的Callable。而AbstractExecutorService中的execute执行的实际上是包含Callable对象的FutureTask。因此执行器(Executor)执行的是FutureTask的run方法。对于执行器来说,它并不知道FutureTask内部的状态,它只负责调用FutureTask的run方法,该run方法会完成FutureTask的状态变更。

FutureTask有一个成员变量

 private volatile Thread runner;

这个成员变量是用于执行FutureTask的线程。对于线程池而言,这个线程就是线程池分配给它的。来看看run方法

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

这个run方法执行时其实已经是由某一个线程来执行了,对于线程池而言就是分配给它的线程。而第一个if是为了将当前线程赋值给成员变量runner。而cancel方法中会尝试中断该线程。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

FutureTask我会单独写一篇源码分析,此处大致了解即可。

回到AbstractExecutorService的两个submit方法,注意到execute方法,该方法其实是Executor接口中的方法,抽象类并没有实现该方法,这说明需要子类去实现。

目前为止还没有出现过线程池的概念。其实jdk中的线程池就是用AbstractExecutorService来实现的。该类就是ThreadPoolExecutor。看名字就能知道是线程池执行器。这个类会另写文章单独分析。

总结

jdk线程池的设计将任务(静态),任务的一次执行(动态)以及任务的执行解耦。定义了不同的接口分别去完成这些事情。

任务由Callable或者Runnable来定义,只定义了任务需要做什么,这是个静态的概念。

任务的一次执行则由Future来表示,通过Future可以知道与之关联的任务的执行状态,这是个动态的概念。

任务的执行则由执行器来完成,执行器只负责执行任务,并不直接对外提供查询某个任务是否完成的功能(由Future来提供)。

任务被封装成FutureTask后交由执行器执行,FutureTask对任务(Callable或者Runnable)进行封装,加上了一些状态(未执行、执行中、执行完成等)。执行器执行FutureTask的run方法最终会调用任务的run或者call方法,在调用任务的run或者call的前后,FuturTask负责更改自身的状态。因此,对于执行器来说并不关心任务的状态,它只负责调用FutureTask的run方法,至于FutureTask的run方法中怎么处理,那就是FutureTask自己的事情了。

上一篇 下一篇

猜你喜欢

热点阅读