Java 并发: 从任务中产生返回值
通常我们会通过 Thread
类创建执行线程并执行提交的任务(Runnable
)
new Thread(new Runnable() {
@Override
public void run() {
}
}).start();
但是这种方式有个缺陷,它没有返回值,有时候,我们想知道查询任务是否完成,或者我们想在任务执行完毕后立即通知我们,很显单纯的 Thread
类做不到。 然而, Executor
, Callable
, FutureTask
可以帮我们实现,不过,最终执行任务的方式还是 Thread
+ Runnable
,这些类只不过包装了一些逻辑而已。
Callable
public interface Callable<V> {
V call() throws Exception;
}
通常我们说,如果需要返回任务执行的结果,任务需要实现 Callable
接口,而不是 Runnable
接口,这个说法并不是很精确,因为线程中执行的任务永远是 Runnable
,而 Callable
只是在 Runnable
的 run()
方法中调用而已,这个可以在后面的源码分析中可知。
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
代表一个异步任务的结果,可以通过 get()
来查询结果,但是会阻塞当前线程,直到结果返回。也可以通过 cancel()
来取消一个任务。 也可以通过 isCancelled()
和 isDone()
查询任务的状态。
其中 cancel()
方法比较特殊,它会尝试取消当前任务的执行。如果当前任务已经完成,或者已经被取消,或者由于某些原因不能被取消,cancel()
方法就会返回 false
。 而如果 cancel()
方法返回 true
,并且当前任务还没有启动,这个任务将永远不会运行。 而如果这个任务已经启动了,cancel(boolean mayInterruptIfRunning)
的参数 mayInterruptIfRunning
决定了执行当前任务的线程时候被中断,从而来停止任务。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {}
Future
实现了 RunnableFuture
接口,而 RunnableFuture
正如它的名字一样,继承了 Runnable
和 Future
两个接口。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
例子
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(new Task());
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(10);
return "Task completed!";
}
}
执行结果
Task completed!
执行结果很简单,但是 get()
方法是会阻塞主线程 10 秒的,有时候,例如在 Android 中,阻塞主线程这么长时间,这是不可接受的。那么有没有一种方法,能够在任务执行完毕后,自动通知我们呢? 当然有,往下看。
原理
ExecutorService
接口的 submit(Callable)
方法是在 AbstractExecutorService
中实现的。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
submit()
方法首先把 Callable
对象转换为了 RunnableFuture
接口实现类对象,在这里用的是 FutureTask
, 最终也是返回这个 FutureTask
对象,只是最后被转为了更为通用的 Future
接口,因此例子中的 submit()
返回结果也可以强制转换为 FutureTask
,只是我们一般不这样使用它,这个事情后面再说。从这里可以看出,最终执行的任务还是 Runnable
。
由于 RunnableFuture
继承自 Runnable
接口,因此能被 Executor
的 execute(Runnable)
方法执行。当调用 execute(ftask)
提交任务后,就会执行 FutureTask
的 run()
方法,主要源码如下
public void run() {
// ...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// STE1: 调用 Callable 接口的 call() 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// STEP2: 设置结果
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 这就是 get() 方法返回的结果
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
// ...
// STEP3: 任务执行完毕后调用 FutureTask 的 done() 方法
done();
callable = null;
}
protected void done() { }
通过我对源码的注释,可以看到 FutureTask.run()
方法,实际做了三件事
- 调用
Callable
的run()
方法,并获取结果。 - 用
outcome
保存返回的结果。通过Future
接口的get()
方法就可以获取这个结果 - 任务执行完毕后,调用
FutureTask
的done()
方法。
FutureTask 的正确使用
从刚才的源码实现中可以发现,提交的是 Callable
对象,而最终执行的是 FutureTask
对象。那么我们完全可以自己用 Callable
创建一个 FutureTask
对象,然后用 Executor
的 execute(Runnable)
方法提交任务。那么,这么做有什么好处呢?因为我们可以复写 FutureTask
中的 done()
方法,从而在任务执行完毕后回调。
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
MyFutureTask futureTask = new MyFutureTask(new Task());
System.out.println(Thread.currentThread().getName() + ": start task");
service.execute(futureTask);
}
}
class MyFutureTask extends FutureTask<String> {
public MyFutureTask(Callable<String> callable) {
super(callable);
}
@Override
protected void done() {
try {
System.out.println(Thread.currentThread().getName() + ": " + get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
;
}
}
class Task implements Callable<String> {
@Override
public String call() throws Exception {
return "Task completed!";
}
}
执行结果如下
main: start task
pool-1-thread-1: Task completed!
根据上面讲的原理,这里的结果并不意外。 从这个例子可以看出,FutureTask
的 done()
方法能够在任务执行完毕后第一时间回调,这就不需要去调用会阻塞当前线程的get()
方法。
另外,从结果看 done()
方法并不是在主线程中执行,如果是像在 Android 中,是不能在工作线程中直接更新 UI 的。
其实我们的代码可以写得再简单点,只是观赏性并不好
FutureTask<String> futureTask = new FutureTask<String>(new Task()){
@Override
protected void done() {
try {
System.out.println(Thread.currentThread().getName() + ": " + get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
};
直接在创建 FutureTask
对象的时候,复写它的 done()
方法。
ExecutorService.submit()
查看 API 发现,ExecutorService
的 submit()
方法可以传入一个 Runnable
对象作为参数。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
通过上面的原理分析可知,submit(Runnable task)
方法的任务返回结果是 null
,而 submit(Runnable task, T result)
返回的结果就是参数 result
.
Runnable 转 Callable
有时候代码中只用了 Runnable
来执行任务,如果我们想给这个任务一个返回结果,就需要把 Runnable
转为 Callable
,然后用 ExecutorService.submit()
来执行。 Executors
提供了静态方法用来转换
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
如果看懂了上面的原理,这里就很简单了 ~