FutureTask源码解析
简介
FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。
使用FutureTask的优势有:
1、可以获取线程执行后的返回结果;
2、提供了超时控制功能。
缺陷:
1、虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果
2、想完成一些复杂的任务可能就比较难。比如下面一些例子:
①将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
②当Future集合中某个任务最快结束时,返回结果。
③等待Future结合中的所有任务都完成。
实现的接口
实现了Runnable接口和Future接口:
例子
public class FutureTaskTest {
public static void main(String[] args) {
Callable<Integer> call = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("process ...");
Thread.sleep(3000);
return 1;
}
};
FutureTask<Integer> task = new FutureTask<>(call);
Thread thread = new Thread(task);
thread.start();
Thread thread1 = new Thread(task);
thread1.start();
System.out.println("do something...");
try {
Integer result = task.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
输出:
process ...
do something...
1
上面代码在在任务执行时,不需要一直等待其运行结束返回结果,而是可以先去处理其他的事情,然后再获取返回结果。
原理
FutureTask构造器
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
可以看出FutureTask提供两种方式来执行任务:
第一种是直接通过callable。
第二种构造方法传入一个Runnable对象和一个返回值对象,然后把他们封装为callable(RunnableAdapter
),所以本质上还是通过callable来调用的。另因为Runnable是没有返回值的,所以要通过result参数在执行完之后返回结果。
核心成员变量
volatile int state;//表示对象状态,volatile关键字保证了内存可见性。futureTask中定义了7种状态,代表了7种不同的执行状态
private static final int NEW = 0; //任务新建和执行中
private static final int COMPLETING = 1; //任务将要执行完毕
private static final int NORMAL = 2; //任务正常执行结束
private static final int EXCEPTIONAL = 3; //任务异常
private static final int CANCELLED = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED = 6; //任务线程已中断
run方法
public void run() {
/*
* 首先判断状态,如果不是初始状态,说明任务已经被执行或取消;
* runner是FutureTask的一个属性,用于保存执行任务的线程,
* 如果不为空则表示已经有线程正在执行,这里用CAS来设置,失败则返回。
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里调用目标方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//设置返回结果对象
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 无论是否执行成功,把runner设置为null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 如果被中断,则说明调用的cancel(true),
// 这里要保证在cancel方法中把state设置为INTERRUPTED
// 否则可能在cancel方法中还没执行中断,造成中断的泄露
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
//设置返回结果对象
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//当前任务执行完成(不管成功还是失败),唤醒等待队列中的下一个任务
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
总结一下run方法的执行过程
1、只有state为NEW
的时候才执行任务;
2、执行前要设置runner为当前线程,使用CAS来设置是为了防止竞争;
3、如果任务执行成功,任务状态从NEW转换为COMPLETING
,如果执行正常,设置最终状态为NORMAL
;如果执行中出现了异常,设置最终状态为EXCEPTIONAL
;
4、任务不管成功或者失败都唤醒并删除Treiber Stack
中的所有节点;
5、如果调用了cancel(true)方法进行了中断,要确保在run方法执行结束前的状态是INTERRUPTED
。
finishCompletion方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
在调用get方法时,如果任务还没有执行结束,则会阻塞调用的线程,然后把调用的线程放入waiters中,这时,如果任务执行完毕,也就是调用了finishCompletion方法,waiters会依次出栈并逐个唤醒对应的线程。
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
1、这两个方法类似,首先判断状态,如果s <= COMPLETING
,说明任务已经执行完毕,但set方法或setException
方法还未执行结束(还未设置状态为NORMAL
或EXCEPTIONAL
),这时需要将当前线程添加到waiters中并阻塞。
2、第二种get提供了超时功能,如果在规定时间内任务还未执行完毕或者状态还是COMPLETING
,则获取结果超时,抛出TimeoutException
。而第一种get会一直阻塞直到state > COMPLETING
。
3、两个方法最终都调用report
方法返回结果:如果state==NORMAL
,将返回结果,否则抛异常
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// mayInterruptIfRunning参数表示是否要进行中断
if (mayInterruptIfRunning) {
try {
// runner保存着当前执行任务的线程
Thread t = runner;
// 中断线程
if (t != null)
t.interrupt();
} finally { // final state
// 设置最终状态为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
如果状态不是NEW
,或者设置状态为INTERRUPTING
或CANCELLED
失败,则取消失败,返回false。
简单来说有一下两种情况:
1、如果当前任务还没有执行,那么state == NEW
,那么会尝试设置状态,如果设置状态失败会返回false,表示取消失败;
2、如果当前任务已经被执行了,那么state > NEW
,也就是!state == NEW
为true,直接返回false。也就是说,如果任务一旦开始执行了(state != NEW
),那么就不能被取消。
如果mayInterruptIfRunning
为true,要中断当前执行任务的线程。
总结
1、FutureTask是线程安全的,在多线程下任务也只会被执行一次;
2、get方法调用时,如果任务没有结束,要阻塞当前线程,阻塞的线程将保存在一个Treiber Stack中;
3、get方法超时功能如果超时未获取成功,会抛出TimeoutException;