Guava——ListenableFuture

2018-04-02  本文已影响275人  jiangmo

缘由

To simplify matters, Guava extends the Future interface of the JDK with ListenableFuture
We strongly advise that you always use ListenableFuture instead of Future in all of your code, because:

Interface

A ListenableFuture allows you to register callbacks to be executed once the computation is complete, or if the computation is already complete, immediately. This simple addition makes it possible to efficiently support many operations that the basic Future interface cannot support.

The basic operation added by ListenableFuture is addListener(Runnable, Executor), which specifies that when the computation represented by this Future is done, the specified Runnable will be run on the specified Executor.

Adding Callbacks

Most users will prefer to use Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor), or the version which defaults to using MoreExecutors.directExecutor(), for use when the callback is fast and lightweight. A FutureCallback<V> implements two methods:

Creation

Corresponding to the JDK ExecutorService.submit(Callable) approach to initiating an asynchronous computation, Guava provides the ListeningExecutorService interface, which returns a ListenableFuture wherever ExecutorService would return a normal Future. To convert an ExecutorService to a ListeningExecutorService, just useMoreExecutors.listeningDecorator(ExecutorService).

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Alternatively, if you're converting from an API based on FutureTask, Guava offers ListenableFutureTask.create(Callable<V>) and ListenableFutureTask.create(Runnable, V). Unlike the JDK, ListenableFutureTask is not meant to be extended directly.

If you prefer an abstraction in which you set the value of the future rather than implementing a method to compute the value, consider extending AbstractFuture<V> or using SettableFuture directly.

If you must convert a Future provided by another API to an ListenableFuture, you may have no choice but to use the heavyweight JdkFutureAdapters.listenInPoolThread(Future) to convert a Future to a ListenableFuture.
Whenever possible, it is preferred to modify the original code to return a ListenableFuture.

Application

The most important reason to use ListenableFuture is that it becomes possible to have complex chains of asynchronous operations.

* An AsyncFunction<A, B> provides one method, ListenableFuture<B> apply(A input). It can be used to asynchronously transform a value.

ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
  new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
      return dataService.read(rowKey);
    }
  };
ListenableFuture<QueryResult> queryFuture =
    Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);

Many other operations can be supported efficiently with a ListenableFuture that cannot be supported with a Future alone. Different operations may be executed by different executors, and a single ListenableFuture can have multiple actions waiting upon it.

When several operations should begin as soon as another operation starts -- "fan-out" -- ListenableFuture just works: it triggers all of the requested callbacks. With slightly more work, we can "fan-in," or trigger a ListenableFuture to get computed as soon as several other futures have all finished: see the implementation of Futures.allAsList for an example.

Avoid nested Futures

In cases where code calls a generic interface and returns a Future, it's possible to end up with nested Futures. For example:

executorService.submit(new Callable<ListenableFuture<Foo>() {
  @Override
  public ListenableFuture<Foo> call() {
    return otherExecutorService.submit(otherCallable);
  }
});

would return a ListenableFuture<ListenableFuture<Foo>>.

This code is incorrect, because if a cancel on the outer future races with the completion of the outer future, that cancellation will not be propagated to the inner future.
It's also a common error to check for failure of the other future using get() or a listener, but unless special care is taken an exception thrown fromotherCallable would be suppressed.
To avoid this, all of Guava's future-handling methods (and some from the JDK) have Async versions that safely unwrap this nesting - transform(ListenableFuture<A>, Function<A, B>, Executor) and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor), or ExecutorService.submit(Callable) and submitAsync(AsyncCallable<A>, Executor), etc.

CheckedFuture

Guava also provides a CheckedFuture<V, X extends Exception> interface. A CheckedFutureis a ListenableFuture that includes versions of the get methods that can throw a checked exception. This makes it easier to create a future that executes logic which can throw an exception. To convert a ListenableFuture to a CheckedFuture, useFutures.makeChecked(ListenableFuture<V>, Function<Exception, X>).

main class

Future局限性

Future 具有局限性。在实际应用中,当需要下载大量图片或视频时,可以使用多线程去下载,提交任务下载后,可以从多个Future中获取下载结果,由于Future获取任务结果是阻塞的,所以将会依次调用Future.get()方法,这样的效率会很低。很可能第一个下载速度很慢,则会拖累整个下载速度。
Future主要功能在于获取任务执行结果和对异步任务的控制。但如果要获取批量任务的执行结果,从上面的例子我们已经可以看到,单使用 Future 是很不方便的

在实际的使用中建议使用Guava ListenableFuture来实现异步非阻塞,目的就是多任务异步执行,通过回调的方方式来获取执行结果而不需轮询任务状态。

Test Code

使用callback

public static void testRateLimiter() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newCachedThreadPool());

        RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个任务被提交
        List<ListenableFuture<Integer>> listfutures = Lists.newArrayList();
        ListenableFuture<Integer> tmp = null;
        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞
            tmp = executorService.submit(new Task(i));
            tmp.addListener(new Runnable() {
                @Override
                public void run() {
                    System.out.println("add Listener");
                }
            }, executorService);

            Futures.addCallback(tmp, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer result) {
                    System.out.println("suc"+result);
                }

                @Override
                public void onFailure(Throwable t) {
                    System.out.println("fail"+t.toString());
                }
            });

            listfutures.add(tmp);

        }

        listfutures.forEach(e-> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("e = " + e.get());
                System.out.println("e = " + e.get());
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            } catch (ExecutionException e1) {
                e1.printStackTrace();
            }
        });
    }

    static class Task implements Callable<Integer> {
        private int number;
        public Task(int i){
            this.number = i;
        }
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("call execute.." + number);
            return number;
        }
    }

使用链式future

那如果需要多重回调呢?

方法 描述
transform 加一个回调函数
allAsList 返回一个ListenableFuture ,该ListenableFuture 返回的result是一个List,List中的值是每个ListenableFuture的返回值,假如传入的其中之一fails或者cancel,这个Future fails 或者canceled
successAsList 返回一个ListenableFuture ,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Failed或者cancel,则用null替代
public static void testLinkedFutureLisener() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        final ListeningExecutorService poolService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
        ListenableFuture<String> futureBase = poolService.submit(new Task("task1"));
        Futures.addCallback(futureBase, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("onSuccess result = " + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("onFailure result = " + t.toString());

            }
        });

        // 链式1

        ListenableFuture<String> base_1 = Futures.transform(futureBase, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("base_1回调线程正在执行...input:"+input);
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("base_1回调线程 done");

                        return input + " & base_1回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        ListenableFuture<String> base_2 = Futures.transform(futureBase, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("base_2回调线程正在执行...input:"+input);
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("base_2回调线程 done");

                        return input + " & base_2回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        ListenableFuture<String> first = Futures.transform(base_2, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("first回调线程正在执行...input:"+input);
                        try {
                            String resBase1 =  base_1.get();
                            System.out.println("resBase1 = " + resBase1);
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("first 回调线程 done");

                        return input + " & first回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        poolService.shutdown();
    }

// 运行结果:
task1 doing
task1done
onSuccess result = task1
base_2回调线程正在执行...input:task1
base_1回调线程正在执行...input:task1
base_1回调线程 done
base_2回调线程 done
first回调线程正在执行...input:task1 & base_2回调线程的结果 
resBase1 = task1 & base_1回调线程的结果 
first 回调线程 done

Ref:
https://github.com/google/guava/wiki/ListenableFutureExplained
https://blog.csdn.net/pistolove/article/details/51232004

上一篇下一篇

猜你喜欢

热点阅读