线程那些事

线程池那些事之Future

2018-11-04  本文已影响8人  土豆肉丝盖浇饭

前言

ThreadPoolExecutor除了可以执行Runnable的任务外,还可以执行那些带有返回结果的Callable任务。在提交Callable任务后,我们会得到一个Future对象。使用这个Future对象,我们可以监控任务的执行状态,也可以取消任务的执行。

源码分析

任务的提交

ThreadPoolExecutor的submit方法除了可以提交Callable的任务,也可以提交Runnable的任务,但是在底层都会转换为Callable,只不过提交Runnable任务在Future会返回空或者我们预设的值。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

在submit方法中,执行的逻辑为:

  1. 通过入参生成一个RunnableFuture对象
  2. 通过RunnableFuture调用execute方法
  3. 把这个RunnableFuture对象返回。

这边我们需要注意的是,对于execute方法来讲,它只关注RunnableFuture的run方法逻辑体,那么监控是如何做的呢,看下面的FutureTask解析。

FutureTask

image.png

从类图中可以看到,FutureTask实现了RunnbaleFuture接口,而RunnbaleFuture继承了Runnable和Future接口,FutureTask针对这两个的接口实现,有各自的作用。

我们先看下Future接口定义

public interface Future<V> {
    //如果任务还未执行,取消任务执行
    //mayInterruptIfRunning为true,会中断正在执行任务的线程
    boolean cancel(boolean mayInterruptIfRunning);
    //任务是否已经被取消
    boolean isCancelled();
    //任务是否完成
    boolean isDone();
    //获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    //超时获取任务结果,超过时间,抛出TimeoutException
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口定义了一些监控的方法,讲解FutureTask如何实现Future的接口之前,先了解下FutureTask的生命周期

FutureTask生命周期

FutureTask有7个状态

    //初始化
    private static final int NEW          = 0;
    //任务执行中
    private static final int COMPLETING   = 1;
    //正常结束
    private static final int NORMAL       = 2;
    //异常结束
    private static final int EXCEPTIONAL  = 3;
    //取消
    private static final int CANCELLED    = 4;
    //中断取消进行中
    private static final int INTERRUPTING = 5;
    //中断取消完成
    private static final int INTERRUPTED  = 6;

状态转换有以下4种情况

  1. 正常执行
    NEW->COMPLETING->NORMAL
  2. 执行失败
    NEW->COMPLETING->EXCEPTIONAL
  3. 取消
    NEW->CANCELLED
  4. 中断取消
    NEW->INTERRUPTING->INTERRUPTED

生命周期状态转换的逻辑都在FutureTask的run方法中

Runnable接口实现

public void run() {
        //如果已经被执行过或者被取消了,直接return
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行call方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //执行失败,抛出异常,设置异常
                    setException(ex);
                }
                if (ran)
                    //执行成功设置结果
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            //对中断的一些处理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

逻辑很简单,成功调用set(result)设置结果,发生异常调用 setException(ex);

下面来看下set和setException方法

protected void set(V v) {
        //修改状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //设置结果到outcome
            outcome = v;
            //修改状态为NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            //唤醒等待的get方法
            finishCompletion();
        }
    }

protected void setException(Throwable t) {
        //修改状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
             //设置异常结果到outcome
            outcome = t;
            //修改状态为EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
             //唤醒等待的get方法
            finishCompletion();
        }
    }

两个方法的逻辑几乎一样,除了最后的结果和state不同。

在finishCompletion会唤醒等待结果的线程

private void finishCompletion() {
        // 遍历waiters进行唤醒
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //通过LockSupport唤醒线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

run方法中主要封装了对提交任务的执行以及FutureTask生命周期的转变逻辑,接下来看下Future接口的实现。

也就是在线程池中,工作线程会执行以上逻辑。

Futute接口实现

Future接口的方法调用者,不是工作线程,而是我们自己的业务线程了。

get
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //状态小于COMPLETING的时候代表任务没有完成,需要阻塞等待
        if (s <= COMPLETING)
            //阻塞当前线程
            s = awaitDone(false, 0L);
        return report(s);
    }

private V report(int s) throws ExecutionException {
        Object x = outcome;
        //如果状态为NORMAL,表示调用成功,返回结果
        if (s == NORMAL)
            return (V)x;
        //表示被取消,抛出CancellationException异常
        if (s >= CANCELLED)
            throw new CancellationException();
        //下面的话,代表执行过程中抛出其他异常
        throw new ExecutionException((Throwable)x);
    }

我们需要注意如果任务没有执行完毕,在get方法中会阻塞当前线程进行等待执行结果。当然get也支持超时模式。

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        //下面通过awaitDone会传入超时时间
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            //如果超时后,状态还是小于COMPLETING,抛出异常
            throw new TimeoutException();
        return report(s);
    }

如果在指定时间内没有返回,抛出TimeoutException异常。

在awaitDone中,会把当前线程阻塞,以链表节点的形式放入waiters中。

cancel

cancel用于取消任务,有普通取消和中断取消两种,通过mayInterruptIfRunning区分。

public boolean cancel(boolean mayInterruptIfRunning) {
        //只能在任务开始运行前取消
        //并且只能对NEW状态的线程进行中断
        //如果不进行中断设置为CANCELLED状态,否则,INTERRUPTING状态
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            //对线程进行中断的逻辑
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    //中断过后,设置为INTERRUPTED状态
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //唤醒结果的线程
            finishCompletion();
        }
        return true;
    }

对于中断取消,会调用线程的interrupt对线程进行中断。

这里有个知识点,线程中断只对阻塞的线程有效,如果阻塞的线程被中断了,会抛出中断异常。线程对于不阻塞的线程无效。
理论上取消需要在任务未执行前,那么我们也没必要对线程进行中断。但是从以下代码来看,在多线程情况下,即使状态为NEW,你取消了,任务还是有可能被执行。如果这个被执行任务内部有阻塞的逻辑,那么对应工作线程会释放不了,所以需要进行中断。

//run方法第一个判断
if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
//cancel方法的判断
 if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;

isCancelled和isDone

这两个方法比较简单,直接使用状态来判断。

public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

一些测试

超时测试


        Future future = executorService.submit(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                try {
                    Thread.sleep(2000);
                }catch (Exception ex){
                    ex.printStackTrace();
                }


                return "hello world";
            }
        });
        System.out.println(future.isDone());
        System.out.println(future.get(1000,TimeUnit.MILLISECONDS));
        System.out.println(future.isDone());

会抛出超时异常

Exception in thread "main" java.util.concurrent.TimeoutException

中断测试

Future future = executorService.submit(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                try {
                    Thread.sleep(2000);
                }catch (Exception ex){
                    ex.printStackTrace();
                }

                return "hello world";
            }
        });
        Thread.sleep(1000L);
        System.out.println(future.isDone());
        future.cancel(true);
        System.out.println(future.get());
        System.out.println(future.isDone());

返回结果为

false
Exception in thread "main" java.lang.InterruptedException: sleep interrupted
java.util.concurrent.CancellationException

抛出了2个异常,一个是中断异常,因为线程在sleep下面阻塞,如果没有阻塞不会有这个异常,第二个是get的时候抛出取消异常。

下面模拟下对于已经执行的任务,取消无效的例子

Future future = executorService.submit(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                int i =0;
                while(i>=0){
                    i++;
                    if(i%1000000==0){
                        System.out.println(i);
                    }
                }

                return "hello world";
            }
        });
        //Thread.sleep(1000L);
        System.out.println(future.isDone());
        System.out.println(future.cancel(true));
        System.out.println(future.get());
        System.out.println(future.isDone());

结果的开始如下

false
Exception in thread "main" java.util.concurrent.CancellationException
true
1000000
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.scj.threadpool.BasicUse.main(BasicUse.java:57)
2000000
3000000
4000000

从第二个true看出来,我们cancel是成功了,但是这个线程一直在跑,所以对于这种无线循环的任务我们也是需要注意的,即使取消了,也没有用。只能把整个线程池销毁了吧。

总结

对于ThreadPoolExecutor来讲,FutureTask和Runnable毫无差别。FutureTask对Runnable任务进行了包装,在run方法体整合了Callable的调用,FutureTask状态转换以及调用结果设置的逻辑,同时可以通过Future接口的方法提供监控操作。

下面是我公众号,大家可以关注下。


image
上一篇下一篇

猜你喜欢

热点阅读