java多线程

Guava 官方文档:Concurrency(一)

2020-03-27  本文已影响0人  changhr2013

ListenableFuture

并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以显著的简化并发。为了简化问题,Guava 使用 ListenableFuture 扩展了JDK 的 Future 接口。

我们强烈建议你在所有代码中始终使用 ListenableFuture 而不是 Future,因为:

Interface

传统的 Future 表示异步计算的结果:即 可能已经可能尚未完成计算结果 的计算。一个 Future 可以作为正在进行中的计算的句柄,是服务向我们提供结果的承诺。

ListenableFuture 允许你注册在计算完成后或在计算已经完成时立即执行的回调方法(callbacks)。这个简单的改进使其可以有效地支持许多 JDK 的 Future 接口无法支持的操作。

ListenableFuture 添加的基本操作是 addListener(Runnable, Executor),它指定当完成此 Future 表示的计算时,指定的 Runnable 将在指定的 Executor 上运行。

Adding Callbacks

大多数用户应该会更喜欢使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor). 这个 FutureCallback<V> 接口需要实现两个方法:

Creation

对应于JDK ExecutorService.submit(Callable) 方法来启动异步计算,Guava 提供了 ListeningExecutorService 接口,该接口在 ExecutorService 返回普通 Future 的所有地方都返回了 ListenableFuture。要将 ExecutorService 转换为 ListeningExecutorService,可以使用 MoreExecutors.listeningDecorator(ExecutorService)

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(
    new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
Futures.addCallback(
    explosion,
    new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    },
    service);

另外,如果您要从基于 FutureTask 的 API 进行转换,则 Guava 提供了 ListenableFutureTask.create(Callable <V>)ListenableFutureTask.create(Runnable,V)。与 JDK 不同,ListenableFutureTask 不能直接继承。

如果您更喜欢使用抽象的方式来设置 Future 的值,而不是想实现接口中的方法,可以考虑继承抽象类 AbstractFuture <V> 或直接使用 SettableFuture

如果你必须将其他 API 提供的 Future 转换为 ListenableFuture,那么没有什么好的办法,只能使用重量级的 JdkFutureAdapters.listenInPoolThread(Future)Future 转换为 ListenableFuture。有可能的话,最好修改原始代码直接返回 ListenableFuture

Application

使用 ListenableFuture 的最重要原因是可以拥有复杂的异步操作链。

ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
  new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
      return dataService.read(rowKey);
    }
  };
ListenableFuture<QueryResult> queryFuture =
    Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);

ListenableFuture 可以有效地支持许多其他操作,而 Future 不能单独支持。不同的 Executors 可以执行不同的操作,并且单个 ListenableFuture 可以有多个操作在等待它。

只要一个操作开始,其他的一些操作也会立即开始执行 —— “fan-out” —— ListenableFuture 能够满足这样的场景:它将触发所有请求的回调。进一步的,它同时可以满足 “fan-in” 场景,在其它的 Futures 全部计算完成后立即触发 ListenableFuture 获取计算结果:有关示例,可以参考 Futures.allAsList 的实现。

Method Description See also
transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor)* 返回一个新的 ListenableFuture,其结果是将给定的 AsyncFunction 应用于给定的 ListenableFuture 的结果。 transformAsync(ListenableFuture<A>, AsyncFunction<A, B>)
transform(ListenableFuture<A>, Function<A, B>, Executor) 返回一个新的 ListenableFuture,其结果是将给定 Function 应用于给定 ListenableFuture 的结果。 transform(ListenableFuture<A>, Function<A, B>)
allAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture,其值是一个 List 集合,该集合按顺序包含每个输入 Future 的值。如果有任何输入的 Future 失败或被取消,则该 Future 失败或被取消。 allAsList(ListenableFuture<V>...)
successfulAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture,其值是一个 List 集合,该集合按顺序包含每个成功输入 Future 的值。与失败或取消的 Future 相对应的值将会被替换为 null successfulAsList(ListenableFuture<V>...)
List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed.

ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);

Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

Avoid nested Futures(避免内嵌的 Future)

在代码调用通用接口并返回 Future 的情况下,最终可能会出现嵌套 Future 的情况。例如:

executorService.submit(new Callable<ListenableFuture<Foo>() {
  @Override
  public ListenableFuture<Foo> call() {
    return otherExecutorService.submit(otherCallable);
  }
});

以上代码将返回一个 ListenableFuture<ListenableFuture<Foo>>。这段代码是不正确的,因为如果外层的 Future 在 complete 之前调用了 cancel 方法,外层的 cancel 就无法传播给内层的 Future,导致内层 Future 无法被取消。另外在内层 Future 调用 get 方法或者使用监听器处理结果时,除非特别的小心,否则很容易出现没有显式处理内层 Future 抛出的异常导致异常被外层 Future 忽略掉。为了避免这种情况,所有的 Guava 提供的 Future 处理方法都有一个异步版本安全的解开了这种嵌套:例如 transform(ListenableFuture<A>, Function<A, B>, Executor) and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor), or ExecutorService.submit(Callable) and submitAsync(AsyncCallable<A>, Executor), 等等。

上一篇 下一篇

猜你喜欢

热点阅读