Java8 CompletableFuture 组合式异步编程

2020-03-28  本文已影响0人  巴巴11

问题:

工具:
Future接口,尤其是它的新版实现CompletableFuture

Future接口,异步操作
要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return doSomeLongComputation();
}});
doSomethingElse();
try {
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
// 计算抛出一个异常
} catch (InterruptedException ie) {
// 当前线程在等待过程中被中断
} catch (TimeoutException te) {
// 在Future对象完成之前超过已过期
}

Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果

CompletableFuture和Future的关系就跟Stream和Collection的关系一样。

阻塞式调用
非阻塞式调用

实现异步 API,非阻塞式调用

Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。
这意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。

public class Shop {
    // 同步API,阻塞式调用
    public double getPrice(String product) {
        return calculatePrice(product);
    }
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread( () -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }


    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime
+ " msecs");
// 执行更多任务,比如查询其他商店
doSomethingElse();
// 在计算商品价格的同时
try {
    double price = futurePrice.get();
    System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
    throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");

// 抛出CompletableFuture内的异常
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
    double price = calculatePrice(product);
    futurePrice.complete(price);
} catch (Exception ex) {
    futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}

Runtime.getRuntime().availableProcessors() 核心数

使用工厂方法supplyAsync创建CompletableFuture:

// CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。
public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");

改进
CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在
Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。

public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " +
shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}

private final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});

对多个异步任务进行流水线操作

public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}

Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);

# 一个模拟生成0.5秒至2.5秒随机延迟的方法
private static final Random random = new Random();
public static void randomDelay() {
int delay = 500 + random.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}


long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream("myPhone27S")
.map(f -> f.thenAccept(
s -> System.out.println(s + " (done in " +
((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");

总结:

执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。

应该尽可能地为客户提供异步API。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标。

CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。

将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。

如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个。

你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序。

你可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表
中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
image.png image.png image.png
上一篇 下一篇

猜你喜欢

热点阅读