js css htmljava 基础并发和多线程

Java异步任务编排—CompletableFuture(二)

2023-02-12  本文已影响0人  雪飘千里

CompletableFuture API

1 创建异步任务 API

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:

举个栗子:

public static void main(String[] args) {
    //可以自定义线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    //runAsync的使用
    CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
    //supplyAsync的使用
    CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                System.out.print("supplyAsync,为了联盟");
                return "哈哈哈哈哈"; }, executor);
    //runAsync的future没有返回值,输出null
    System.out.println(runFuture.join());
    //supplyAsync的future,有返回值
    System.out.println(supplyFuture.join());
    executor.shutdown(); // 线程池需要关闭
}



//输出
runAsync,为了部落
null
supplyAsync,为了联盟哈哈哈哈哈

2 依赖关系

3 组合关系

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
//想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用 thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

applyToEither / acceptEither / runAfterEither 都表示将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务

//第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{
                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
            catch (Exception e){
                return "第一个任务异常";
            }
            return "第一个异步任务";
        });

        //第二个异步任务
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第一个任务还在睡觉,这是第二个任务";}
                        );
                
        CompletableFuture acceptEither =  second.acceptEitherAsync(first, result ->System.out.println(result+"==acceptEither"));
        CompletableFuture applyToEither = second.applyToEitherAsync(first,result->{
            System.out.println(result+"==applyToEither");
            return result;
        });
        CompletableFuture runAfterEither =  second.runAfterEitherAsync(first, () ->System.out.println("hello"));
CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2   = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture  = CompletableFuture.allOf(future1, future2, future3);
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

allOf局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);
 CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
            }
        );

        CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
        CompletableFuture<Object> combinedFuture  = CompletableFuture.anyOf(future1, future3);

        System.out.println(combinedFuture.get());

        System.out.println(future1.get());
        System.out.println(future3.get());
        
        //结果
        World
        Hello
        World

4 结果处理 异常捕获

     CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
     CompletableFuture<String> future3   = future1.whenComplete((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
        });

    System.out.println(future3.get());
    CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

   System.out.println(future3.get());
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    throw new RuntimeException();
                }
        );

        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "歪歪歪?你的程序异常啦";
        });

        System.out.println(exceptionFuture.get());

5 超时处理

JDK 8 版本的CompletableFuture 没有timeout机制,timeout机制是指,如果forkjoin-pool(或者自定义线程池)中一个线程在规定时间内没有返回,那么就结束掉,而不是继续执行直到获取结果,比如main线程200ms内返回,但forkjoin-pool(或者自定义线程池)中某个执行线程执行400ms才返回,而其返回值根本没有被使用到。

实现方案:启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),然后用 acceptEither() 或者 applyToEither 看是先计算完成还是先超时:

public class FutureUtil {

    /**
     * cpu 核心数
     */
    private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    // 最大超时时间
    private static final int TIMEOUT_VALUE = 1500;
    // 时间单位
    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;


    /**
     * Singleton delay scheduler, used only for starting and * cancelling tasks.
     */
    public static final class Delayer {

        static final ScheduledThreadPoolExecutor delayer;

        /**
         * 异常线程,不做请求处理,只抛出异常
         */
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }

        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureScheduler");
                return t;
            }
        }
    }

    /**
     * 根据服务器cpu自定义线程池
     */
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVALIABLE_PROCESSORS,
            3 * AVALIABLE_PROCESSORS,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * 有返回值的异步
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
        return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
    }

    /**
     * 有返回值的异步 - 可设置超时时间
     * @param timeout
     * @param unit
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
        return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 无返回值的异步
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(Runnable runnable){
        return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
    }

    /**
     * 无返回值的异步 - 可设置超时时间
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
        return CompletableFuture.runAsync(runnable,threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 统一处理异步结果
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(CompletableFuture... futures){
        return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
    }

    /**
     * 统一处理异步结果 - 可设置超时时间
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
        return CompletableFuture.allOf(futures)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 异步超时处理
     * @param timeout
     * @param unit
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    public static <T> CompletableFuture<T> timeoutAfter() {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
        return result;
    }

}

使用demo

 CompletableFuture<String> future1    = FutureUtil.supplyAsync(10,TimeUnit.MILLISECONDS,() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
        });

        CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

        System.out.println(future3.get());
image.png

在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。实现原理跟上面是一样的。

6 线程阻塞问题

要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。

CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。

同步方法(即不带Async后缀的方法)有两种情况。

异步方法(即带Async后缀的方法):

例如:

ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
    //业务操作
    return "";
}, threadPool1);
//此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value -> {
    System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
    return value + "1";
});
//使用ForkJoinPool中的共用线程池CommonPool
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
});
//使用指定线程池
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
}, threadPool1);

7 线程池死锁问题

前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

线程池循环引用会导致死锁

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。

8 异步RPC调用注意不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

上一篇下一篇

猜你喜欢

热点阅读