多线程-源码解析RunnableFuture
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
译文:可运行的Future。 成功执行run方法会导致Future的完成,并允许访问其结果。
从源码可知,RunnableFuture继承了Future和Runnable,所以其就具有了可以运行在线程中,能够取消并且能够异步获取到运行结果。接下来学习一下它的一个具体实现类FutureTask
概论
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
* <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V>
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
译文:可取消的异步计算。 此类提供Future的基本实现,其中包含启动和取消计算,查询计算是否完成以及返回计算结果的方法。 只有在计算完成后才能返回结果; 如果计算尚未完成,则get方法将阻塞。 一旦计算完成,除非使用runAndReset调用计算,否则无法重新启动或取消计算。
FutureTask主要成员
public class FutureTask<V> implements RunnableFuture<V> {
/*
* FutureTask中定义了一个state变量,用于记录任务执行的相关状态 ,状态的变化过程如下
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
//主流程状态
private static final int NEW = 0; //当FutureTask实例刚刚创建到callbale的call方法执行完成前,处于此状态
private static final int COMPLETING = 1; //callable的call方法执行完成或出现异常时,首先进行此状态
private static final int NORMAL = 2;//callable的call方法正常结束时,进入此状态,将outcom设置为正常结果
private static final int EXCEPTIONAL = 3;//callable的call方法异常结束时,进入此状态,将outcome设置为抛出的异常
//取消任务执行时可能处于的状态
private static final int CANCELLED= 4;// FutureTask任务尚未执行,即还在任务队列的时候,调用了cancel方法,进入此状态
private static final int INTERRUPTING = 5;// FutureTask的run方法已经在执行,收到中断信号,进入此状态
private static final int INTERRUPTED = 6;// 任务成功中断后,进入此状态
private Callable<V> callable;//需要执行的任务,提示:如果提交的是Runnable对象,会先转换为Callable对象,这是构造方法参数
private Object outcome; //任务运行的结果
private volatile Thread runner;//执行此任务的线程
//等待该FutureTask的线程链表,对于同一个FutureTask,如果多个线程调用了get方法,对应的线程都会加入到waiters链表中,同时当FutureTask执行完成后,也会告知所有waiters中的线程
private volatile WaitNode waiters;
......
}
FutureTask的成员变量并不复杂,主要记录以下几部分信息:
1、状态
2、任务(callable)
3、结果(outcome)
4、等待线程(waiters)
构造方法
Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
FutureTask的方法
run()方法
FutureTask执行任务的方法当然还是run方法:
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
handlePossibleCancellationInterrupt(s);
}
}
run方法的大概逻辑如下:
1、如果状态不为new或者运行线程runner失败,说明当前任务已经被其他线程启动或者已经被执行过,直接返回false
2、调用call方法执行核心任务逻辑。如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程
3、如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING
set()方法
除非已经设置或取消了该Future,否则将此Future的结果设置为给定值
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
finishCompletion()
删除并发送所有等待线程的信号,调用done(),并使callable无效。
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
////通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒等待的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion的逻辑也比较简单:
1、遍历waiters链表,取出每一个节点:每个节点都代表一个正在等待该FutureTask结果(即调用过get方法)的线程
2、通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成
get()方法
get方法很简单,主要就是调用awaitDone方法:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
awaitDone()
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion or at timeout
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) { //如果state状态大于COMPLETING 则说明任务执行完成,或取消
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) { //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) { //构建节点
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)//把当前节点入栈
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
//如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
//如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);//阻塞当前线程nanos秒
}
else
LockSupport.park(this);//阻塞当前线程
}
}
整个方法的大致逻辑主要分为以下几步:
1>如果当前状态值大于COMPLETING,说明已经执行完成或者取消,直接返回
2>如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快
3>如果当前线程是首次进入循环,为当前线程创建wait节点加入到waiters链表中
4>根据是否定时将当前线程挂起(LockSupport.parkNanos LockSupport.park)来阻塞当前线程,直到超时或者线程被finishCompletion方法唤醒
5>当线程挂起超时或者被唤醒后,重新循环执行上述逻辑
cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
//根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
//唤醒所有在get()方法等待的线程
finishCompletion();
}
return true;
}
public class FutureTaskDemo {
public static void main(String[] args) {
FutureTask<String> futureTask=new FutureTask<>(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Hello world");
Thread thread = new Thread(futureTask,"Thread Future");
thread.start();
try {
String result=futureTask.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
运行时线程堆栈信息
D:\android\progect\IdeaDemo>jps
10624 Launcher
1296 FutureTaskDemo
5360 Jps
12872
D:\android\progect\IdeaDemo>jstack 1296
2021-05-13 00:03:51
Full thread dump Java HotSpot(TM) Client VM (25.202-b08 mixed mode, sharing):
"Thread Future" #9 prio=5 os_prio=0 tid=0x15a1d800 nid=0x2040 waiting on condition [0x15cef000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.idea.future.FutureTaskDemo$1.run(FutureTaskDemo.java:13)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
"main" #1 prio=5 os_prio=0 tid=0x02a0dc00 nid=0x18a8 waiting on condition [0x00d4f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x04cfd7e8> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.idea.future.FutureTaskDemo.main(FutureTaskDemo.java:22)
由于我在run方法里面设置了相当于耗时的操作,从堆栈信息可以得知,"Thread Future"的状态是TIMED_WAITING,而主线程"main"则处于WAITING的状态
实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;
又因为实现了 Future 接口,所以也能用来获得任务的执行结果。
提交给 ThreadPoolExecutor 去执行
下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es =
Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
直接被 Thread 执行
FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
// 创建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
总结
利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。
利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。