技术

completablefuture的多数据流的异步操作

2018-06-12  本文已影响15人  老柿子

场景:

假设这种场景:要同时处理多条数据,而且不阻塞调用方,且所有的结果处理完之后,再执行指定的其他的处理。不阻塞我们可以采用异步处理即可,但是这里面还有就是全部执行完之后还有后处理,而且是所有的流都执行完之后。那么这里completablefuture中有一个这样的函数allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

这个就是用于多个流的合并,但是真正将List<CompletableFuture>转变成一个CompletableFuture还需要再尽心封装一层:

/**
 * 全流式处理的融合
 * @param futures
 * @param <T>
 * @return
 */
public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}

就可以愉快的使用了

测试如下:

**
 * 多数据流的异步操作
 * 也用于多流输入的情况下的合并,这样可以将多流进行合并的输入
 */
@Test
public void allOf(){
    CompletableFuture c1 = CompletableFuture.supplyAsync(()->{
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        show("111");
        return "111";
    });
    CompletableFuture c2 = CompletableFuture.supplyAsync(()->{
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        show("222");
        return "222";
    });
    CompletableFuture c3 = CompletableFuture.supplyAsync(()->{
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        show("333");
        return "333";
    });

    CompletableFuture c4 = sequence(Arrays.asList(c1, c2, c3));
    c4.thenAccept(System.out::println);

    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
上一篇下一篇

猜你喜欢

热点阅读