JAVA语言系列:组合式异步编程

2018-10-15  本文已影响0人  wshj

1. 导论

同步API和异步API:同步/异步关注的是消息通知的机制。

  1. 轮询:即监听被调用者的状态,调用者需要每隔一定时间检查一次,效率会很低。
  2. 回调:当被调用者执行完成后,会调用调用者提供的回调函数。

阻塞和非阻塞:阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态、

并行和并发

异步和多线程:异步是目的,而多线程是实现异步的一个手段。多线程需要考虑线程上下文切换带来的负担,并需要考虑死锁的可能。


2. Future 接口:

目的:实现异步计算:把调用线程从潜在耗时的操作中解放出来,让它能继续执行其他工作,不再需要等待耗时操作完成。
原理:返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。
使用

  ExecutorService executor = Executors.newCachedThreadPool();
  Future<Double> future = executor.submit(new Callable<Double>() {
        public Double call() {
        return doSomeLongComputation();
  }});
  doSomethingElse();
  try { 
    // 获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
      Double result = future.get(1, TimeUnit.SECONDS);
  } catch (ExecutionException ee) {
    // 计算抛出一个异常
  } catch (InterruptedException ie) {
    // 当前线程在等待过程中被中断
  } catch (TimeoutException te) {
    // 在Future对象完成之前超过已过期
  }

局限性:很难表述Future结果之间的依赖性,如:


3. CompletableFuture:Java 8 提供的Future 实现

使用CompletableFuture实现异步方法: 将一个耗时的产品价格查询异步化

public Future<Double> getPriceAsync(String product) {
  // 创建CompletableFuture对象,它会包含计算的结果
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread( () -> {
    // 假设calculatePrice是一个耗时任务
    double price = calculatePrice(product);
    // 需长时间计算的任务结束并得出结果时,设置Future的返回值
    futurePrice.complete(price);
  }).start();
  return futurePrice;
}

上述代码问题:异常被限制,如果异步执行计算过程中产生了错误,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

解决

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread( () -> {
  try {
    double price = calculatePrice(product);
    futurePrice.complete(price);
  } catch (Exception ex) {
  futurePrice.completeExceptionally(ex);
  }
  }).start();
  return futurePrice;
}

更加简洁地使用:使用工厂方法supplyAsync创建CompletableFuture

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

4. 多个异步任务

需求:来自4个商店的指定产品的异步查询,返回4个商店的产品价格List<String> findPrices(String product); CPU为4个线程

实现一:使用顺序流,查询耗时大概为4*delay

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

实现二:使用并行流,查询耗时为 delay

public List<String> findPrices(String product) {
  return shops.parallelStream()
    .map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

实现三:使用CompletableFuture。查询耗时为 2* delay。join方法类似于get,但不会抛出检测到的异常。注意这里这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。先将CompletableFutures对象聚集到一个列表中,让对象们可以在等待其他对象完成操作之前就能启动。

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures =
    shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
    .collect(Collectors.toList());
  return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

总结

改进CompletableFuture:使用定制的执行器。

《JAVA并发编程实战》建议:
线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
❑NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
❑UCPU是期望的CPU利用率(该值应该介于0和1之间)
❑W/C是等待时间与计算时间的比率

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
  new ThreadFactory() {
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setDaemon(true);//使用守护线程——这种方式不会阻止程序的关停
      return t;
  }
});
// 传递执行器
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);

并行流还是CompletableFuture


5. 多个异步任务的流水线操作

两个相关的异步任务的组合

需求:假设需要在获取到产品价格后还需要进一步获取折扣价格,这是两个异步的远程任务,如何组合?

实现一:流水线方式,耗时随着商店增多线性增长。

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> shop.getPrice(product))  //延迟1秒
    .map(Quote::parse)
    .map(Discount::applyDiscount)  //延迟1秒
    .collect(toList());
}

实现二:同步组合和异步组合

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures =
  shops.stream()
  .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
  .map(future -> future.thenApply(Quote::parse))
  .map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() ->  Discount.applyDiscount(quote), executor)))
  .collect(toList());
  return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

thenApply方法:由于这里不涉及IO操作,因此采用同步执行。thenApply方法用于对第一步中CompletableFuture连接同步方法。这意味着CompletableFuture最终结束运行时,希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture<String>对象转换为对应的CompletableFuture<Quote>对象。
thenCompose方法:这里涉及到远程操作,因此希望能够异步执行。thenCompose方法允许对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。意味着可以创建两个CompletableFutures对象,对第一个CompletableFuture 对象调用thenCompose , 并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。
thenComposeAsync方法:以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。不带Async的方法和它的前一个任务一样,在同一个线程中运行。

两个不相关的异步任务的组合

需求:不希望等到第一个任务完全结束才开始第二项任务。
实现:合并两个独立的CompletableFuture对象

Future<Double> futurePriceInUSD =
  CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(
    CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
    (price, rate) -> price * rate
);

thenCombine方法:接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。
thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

上一篇 下一篇

猜你喜欢

热点阅读