FutureTask源码解析

2019-01-29  本文已影响4人  多喝水JS

简介

FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。

使用FutureTask的优势有:

1、可以获取线程执行后的返回结果;
2、提供了超时控制功能。

缺陷:
1、虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果
2、想完成一些复杂的任务可能就比较难。比如下面一些例子:
①将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
②当Future集合中某个任务最快结束时,返回结果。
③等待Future结合中的所有任务都完成。

实现的接口

实现了Runnable接口和Future接口:

例子

public class FutureTaskTest {

    public static void main(String[] args) {
        Callable<Integer> call = new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                System.out.println("process ...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);

        Thread thread = new Thread(task);
        thread.start();
        Thread thread1 = new Thread(task);
        thread1.start();

        System.out.println("do something...");

        try {
            Integer result = task.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}
输出:
process ...
do something...
1

上面代码在在任务执行时,不需要一直等待其运行结束返回结果,而是可以先去处理其他的事情,然后再获取返回结果。

原理

FutureTask构造器

 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
 }
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

可以看出FutureTask提供两种方式来执行任务:
第一种是直接通过callable。
第二种构造方法传入一个Runnable对象和一个返回值对象,然后把他们封装为callable(RunnableAdapter),所以本质上还是通过callable来调用的。另因为Runnable是没有返回值的,所以要通过result参数在执行完之后返回结果。

核心成员变量

volatile int state;//表示对象状态,volatile关键字保证了内存可见性。futureTask中定义了7种状态,代表了7种不同的执行状态
private static final int NEW          = 0; //任务新建和执行中
private static final int COMPLETING   = 1; //任务将要执行完毕
private static final int NORMAL       = 2; //任务正常执行结束
private static final int EXCEPTIONAL  = 3; //任务异常
private static final int CANCELLED    = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED  = 6; //任务线程已中断

run方法

 public void run() {
        /*
     * 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;
     * runner是FutureTask的一个属性,用于保存执行任务的线程,
     * 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。
     */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //这里调用目标方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置返回结果对象
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
          // 无论是否执行成功,把runner设置为null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            // 如果被中断,则说明调用的cancel(true),
          // 这里要保证在cancel方法中把state设置为INTERRUPTED
          // 否则可能在cancel方法中还没执行中断,造成中断的泄露
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
   //设置返回结果对象
     protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //当前任务执行完成(不管成功还是失败),唤醒等待队列中的下一个任务
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

总结一下run方法的执行过程

1、只有state为NEW的时候才执行任务;
2、执行前要设置runner为当前线程,使用CAS来设置是为了防止竞争;
3、如果任务执行成功,任务状态从NEW转换为COMPLETING,如果执行正常,设置最终状态为NORMAL;如果执行中出现了异常,设置最终状态为EXCEPTIONAL
4、任务不管成功或者失败都唤醒并删除Treiber Stack中的所有节点;
5、如果调用了cancel(true)方法进行了中断,要确保在run方法执行结束前的状态是INTERRUPTED

finishCompletion方法

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

在调用get方法时,如果任务还没有执行结束,则会阻塞调用的线程,然后把调用的线程放入waiters中,这时,如果任务执行完毕,也就是调用了finishCompletion方法,waiters会依次出栈并逐个唤醒对应的线程。

get方法

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

1、这两个方法类似,首先判断状态,如果s <= COMPLETING,说明任务已经执行完毕,但set方法或setException方法还未执行结束(还未设置状态为NORMALEXCEPTIONAL),这时需要将当前线程添加到waiters中并阻塞。

2、第二种get提供了超时功能,如果在规定时间内任务还未执行完毕或者状态还是COMPLETING,则获取结果超时,抛出TimeoutException。而第一种get会一直阻塞直到state > COMPLETING
3、两个方法最终都调用report方法返回结果:如果state==NORMAL,将返回结果,否则抛异常

cancel方法

 public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // mayInterruptIfRunning参数表示是否要进行中断
        if (mayInterruptIfRunning) {
            try {
                // runner保存着当前执行任务的线程
                Thread t = runner;
                // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 设置最终状态为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

如果状态不是NEW,或者设置状态为INTERRUPTINGCANCELLED失败,则取消失败,返回false。

简单来说有一下两种情况:

1、如果当前任务还没有执行,那么state == NEW,那么会尝试设置状态,如果设置状态失败会返回false,表示取消失败;
2、如果当前任务已经被执行了,那么state > NEW,也就是!state == NEW为true,直接返回false。也就是说,如果任务一旦开始执行了(state != NEW),那么就不能被取消。
如果mayInterruptIfRunning为true,要中断当前执行任务的线程。

总结

1、FutureTask是线程安全的,在多线程下任务也只会被执行一次;
2、get方法调用时,如果任务没有结束,要阻塞当前线程,阻塞的线程将保存在一个Treiber Stack中;
3、get方法超时功能如果超时未获取成功,会抛出TimeoutException;

上一篇下一篇

猜你喜欢

热点阅读