FutureTask
这篇文章写的都是一些比较基础的内容,也就是从API层面解释一下我们平时用的比较多的东西
在Java中使用多线程,本质上还是对Thread对象的操作。线程池只是为了方便对线程的管理,避免频繁的创建和销毁线程带来不必要的系统开销,内部通过指定的线程数和阻塞队列实现。
基本使用
创建一个Thread对象的时候一般会传递一个Runnable对象,任务逻辑就写在Runnable的run方法中。感觉这个Runnable的名字取得不太好,如果叫Task是不是会更好一些呢?
new Thread(()-> doXX() ).start();
获取返回值
上面的那种方式使用起来是挺简单,但会遇到一些问题,比如:能获取返回值不?
通过全局变量
像上面这样是没办法获取返回值的,所以我们需要做一些处理,比如,将结果赋值给一个全局变量
private static int result;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println("处理业务逻辑");
result = 1000;
}).start();
Thread.sleep(1000);
System.out.println(result);
}
result
就是一个全局变量,当任务执行完成之后,更新这个值。这其实都不能算是返回值,但有时候也能用:不需要立即知道任务的执行结果,在访问全部变量的时候,只需要获取它的值就好了。比如通过定时任务去更新缓存,不需要关注任务什么时候执行完成,我需要的只是缓存的值,任务执行了就获取最新的值,没有执行就获取旧值。
通过空轮询
那假如我就是想现在获取返回值咋办?因为我要用这个返回值作为下面逻辑的输入。那或许可以通过轮询的方式检测全局变量来达到目的?
while(result == null){
}
除了白白浪费CPU,好像也行啊?但我现在考虑的只是两个线程,如果有多个线程该对全局变量修改该怎么办呢?那用ThreadLocal?算了,就此打住吧
通过简单封装
或许可以封装一下?再封装之前,先考虑几个问题
- 任务的逻辑定义在哪里?
如果用Runnable,就无法返回值,所以可以定义一个有返回值的@FunctionalInterface接口,叫Task
- 返回的值存到哪里?怎么返回?Thread没有相关的方法,扩展一下?
public static void main(String[] args) throws InterruptedException {
CallableThread callableThread = new CallableThread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "ccccc";
});
callableThread.start();
System.out.println("开始时间 " + LocalDateTime.now());
System.out.println(callableThread.get());
System.out.println("结束时间 " + LocalDateTime.now());
}
class CallableThread<T> extends Thread {
private Task<T> task;
private T result;
private volatile boolean finished = false;
public CallableThread(Task<T> task) {
this.task = task;
}
@Override
public void run() {
synchronized (this) {
result = task.call();
finished = true;
notifyAll();
}
}
public T get() {
synchronized (this) {
while (!finished) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return result;
}
}
}
@FunctionalInterface
interface Task<T> {
T call();
}
这样貌似也可以,但是不太好。Thread本来只是用于处理和线程相关的事情,现在将它和逻辑(Task)绑定在一起,如果有多个任务想共用一个Thread,那返回值怎么处理?
是否可以将这部分逻辑抽出来,放到一个新类当中?
public static void main(String[] args) throws InterruptedException {
MyRunnable<String> myRunnable = new MyRunnable(() -> {
// 模拟耗时的业务操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "我是结果";
});
System.out.println("开始时间 " + LocalDateTime.now());
new Thread(myRunnable).start();
System.out.println("result: " + myRunnable.get());
System.out.println("结束时间 " + LocalDateTime.now());
}
class MyRunnable<T> implements Runnable {
private Task<T> task;
private T result;
private volatile boolean finished = false;
public MyRunnable(Task<T> task) {
this.task = task;
}
@Override
public void run() {
synchronized (this) {
result = task.call();
finished = true;
notifyAll();
}
}
public T get() {
synchronized (this) {
while (!finished) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return result;
}
}
}
这不是和java里面的Future有点像吗?确实有点像
Future模式
Future
里面有几个比较核心的概念
- Future:抽象出
获取任务返回值
、获取任务执行状态
等常用方法的接口 - Callable:类似于上面的 Task
- FutureTask:类似于上面的 MyRunnable
下面看一个例子
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> future = new FutureTask<>(() -> {
Thread.sleep(3000);
System.out.println(System.currentTimeMillis());
return "hehehh";
});
new Thread(future).start();
System.out.println("Start Get Result : " + System.currentTimeMillis());
System.out.println("Get Result : " + future.get() + System.currentTimeMillis());
}
Future
Future
接口除了提供获取返回值的接口,还提供了一些其他的接口,根据名字大概也可以猜到什么意思,不过多解释了。实在不行看看源码吧,这样子就很愉快了。
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
FutureTask
FutureTask
同时实现了Runnable
和Future
接口,
任务状态
在FutureTask
中,任务的不同状态通过state
变量来表示,状态有以下几种:
/*
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
任务执行
因为FutureTask
本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:
- 先判断状态,如果不为
NEW
或者通过cas更新runner
失败,则直接返回 - 执行
Callable#call
方法,根据执行结果,设置状态,
如果执行成功:先将state设置成COMPLETING
,然后保存返回的结果保存到属性outcome
,再将state设置成NORMAL
,最后通过LockSupport.unpark(t)
解除阻塞的线程;
如果执行失败:先将state设置成COMPLETING
,然后异常信息保存到属性outcome
,再将state设置成EXCEPTIONAL
,最后通过LockSupport.unpark(t)
解除阻塞的线程;
如何阻塞
当我们通过FutureTask#get
方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过LockSupport
阻塞的,这个推荐看看博客吧。我也是看博客的,自己也解释的没人家好,嗯,就是这样的
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回
if (s > COMPLETING) {
if (q != null)
q.thread = null; // 这应该是help GC吧?
return s;
}
// 如果正在收尾阶段,交出CPU, 等下次循环
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
// 通过UNSAFE 设置 waiters
else if (!queued)
// 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this); // 阻塞当前线程
}
}
上面我们看到了有一个waiters
,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用FutureTask#get
的情况,每调用一次FutureTask#get
就会生成一个WaitNode
节点,然后将它添加到单向链表的头部
那什么时候用到这个链表呢?在任务执行完成的时候,会执行finishCompletion
方法,主要就是从头节点依次往下遍历,获取节点的thread
属性,然后执行LockSupport.unpark(thread)
解除阻塞
回调如何处理
相对之前的那种方式来说,FutureTask
已经很好用了,直接通过FutureTask#get
方法就可以获取返回值了,确实蛮方便的。
不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的MyRunnable
代码再扩展一下,例如
public MyRunnable addListener(Consumer c) {
// 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池
while (!finished) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
c.accept(result);
}).start();
return this;
}
我们给MyRunnable
添加了一个addListener
方法,接收一个Consumer
作为入参,当任务执行完成之后就执行这段逻辑,如下:
public static void main(String[] args) throws InterruptedException {
MyRunnable<String> myRunnable = new MyRunnable(() -> {
// 模拟耗时的业务操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "我是结果";
});
System.out.println("开始时间 " + LocalDateTime.now());
new Thread(myRunnable).start();
myRunnable.addListener(result -> {
System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务");
result = result + " ggggg";
System.out.println(result);
});
}
ListenableFuture
ListenableFuture
是guava
包里面的,对Future
进行了增强,ListenableFuture
继承了Future
,新增了一个添加回调的方法
/**
* @param listener the listener to run when the computation is complete 回调逻辑
* @param executor the executor to run the listener in 回调在哪个线程池执行
*/
void addListener(Runnable listener, Executor executor);
ListenableFutureTask
继承了FutureTask
并且是实现了ListenableFuture
接口,看一个简单例子
public static void main(String[] args) throws InterruptedException {
ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
System.out.println("执行任务开始 " + LocalDateTime.now());
Thread.sleep(3000);
System.out.println("执行任务完成 " + LocalDateTime.now());
return "结果";
});
futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor());
new Thread(futureTask).start();
}
源码分析
原理就是将所有回调维护在一个单向链表中,也就是ExecutionList
,然后通过重写``FutureTask#done`方法,在任务完成之后执行回调逻辑
// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表
private final ExecutionList executionList = new ExecutionList();
// ListenableFutureTask#addListener
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
// ExecutionList#add
public void add(Runnable runnable, Executor executor) {
// 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true
// 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失
synchronized (this) {
// 如果任务还没有执行完成,就将当前节点添加到头节点
if (!executed) {
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
// 如果任务执行完成,就开始执行回调
executeListener(runnable, executor);
}
// ExecutionList#executeListener
private static void executeListener(Runnable runnable, Executor executor) {
try {
// 直接将任务交给线程池
executor.execute(runnable);
} catch (RuntimeException e) {
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
}
}
// ExecutionList.RunnableExecutorPair
private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
@Nullable RunnableExecutorPair next;
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}
ListenableFutureTask
是怎么知道任务是否执行完成了呢?
在FutureTask#finishCompletion
方法中,解除阻塞的线程之后,还会执行一个done
方法,不过该方法在FutureTask
没有任何逻辑,可以把它当作是一个模板方法,而ListenableFutureTask
实现了该方法,如下:
// ListenableFutureTask#done
protected void done() {
executionList.execute();
}
// ExecutionList#execute
public void execute() {
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
// 首先将executed置为true
executed = true;
// runnables代表链表的头节点
list = runnables;
runnables = null; // allow GC to free listeners even if this stays around for a while.
}
RunnableExecutorPair reversedList = null;
// 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置
while (list != null) {
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
// 遍历链表,依次执行回调逻辑
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}
FutureCallback
通过ListenableFutureTask
,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现,回调方法无法使用任务的返回值
,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实guava
包中也给了我们相关的接口。先看一个例子:
public static void main(String[] args) throws InterruptedException {
ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {
System.out.println("执行任务开始 " + LocalDateTime.now());
Thread.sleep(3000);
System.out.println("执行任务完成 " + LocalDateTime.now());
return "结果";
});
Futures.addCallback(futureTask, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("执行成功: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("执行失败");
}
});
new Thread(futureTask).start();
}
源码分析
FutureCallback
接口里面有两个方法,分别对应任务执行成功逻辑和任务失败逻辑
void onSuccess(@Nullable V result);
void onFailure(Throwable t);
Futures
可以堪称是一个门面类,里面封装了一些操作
// Futures#addCallback
public static <V> void addCallback(
ListenableFuture<V> future, FutureCallback<? super V> callback) {
// 这里使用了DirectExecutor线程池,即直接在当前线程执行
addCallback(future, callback, directExecutor());
}
// Futures#addCallback
public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {
Runnable callbackListener =
new Runnable() {
@Override
public void run() {
final V value;
try {
value = getDone(future);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
return;
} catch (RuntimeException e) {
callback.onFailure(e);
return;
} catch (Error e) {
callback.onFailure(e);
return;
}
callback.onSuccess(value);
}
};
// 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法
future.addListener(callbackListener, executor);
}
// Futures#getDone
public static <V> V getDone(Future<V> future) throws ExecutionException {
checkState(future.isDone(), "Future was expected to be done: %s", future);
return getUninterruptibly(future);
}
public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
boolean interrupted = false;
try {
while (true) {
try {
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback
方法,不过在使用上却方便了好多。
与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用Future#get
方法会阻塞当前线程,而guava
是在回调中执行这部逻辑,类似于一种通知机制,所以不会阻塞当前线程。
ListenableFutureTask
其实Spring里面也有一个ListenableFutureTask
,实现上和guava
大同小异,也是继承了FutureTask
并且实现了自己的ListenableFuture
接口,通过重写FutureTask#done
方法,在该方法中获取返回值然后执行回调逻辑
public static void main(String[] args) {
ListenableFutureTask future = new ListenableFutureTask(() -> "结果");
future.addCallback(new ListenableFutureCallback() {
@Override
public void onSuccess(Object result) {
System.out.println("callback " + result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("执行失败 ");
}
});
new Thread(future).start();
}
核心源码
它的Callback是保存在两个Queue里面的:successCallbacks
,failureCallbacks
,数据结构是LinkedList
private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();
private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();
重写的done方法如下,逻辑很简单,就不解释了
protected void done() {
Throwable cause;
try {
T result = get();
this.callbacks.success(result);
return;
}catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex;
}
this.callbacks.failure(cause);
}
CompletableFuture
可能是之前的Future
功能太少了,所以Java8推出了CompletableFuture
,功能强大,除了上面说的那些功能,还有很多其他的功能,反正就是吊炸天。而且从DUBBO 2.7
开始异步处理都是通过CompletableFuture
来实现。
CompletableFuture
ForkJoinPoll
总结
总结下来就发现,那些很好用的API,真的是封装的好啊。所以,设计模式真的很重要啊,老铁。。。。。。