CompletableFuture实现异步并阻塞获取返回结果,巧用CompletableFuture返回值解决性能瓶颈

2022-12-25  本文已影响0人  万事俱备就差一个程序员了

CompletableFuture实现异步并阻塞获取返回结果,巧用CompletableFuture返回值解决性能瓶颈,线程池,异步编排

参考: https://blog.csdn.net/LUOHUAPINGWIN/article/details/122222011

      https://blog.csdn.net/sunquan291/article/details/103991184

配置:

gulimall.thread.coreSize=20

gulimall.thread.maxSize=200

gulimall.thread.keepAliveTime=10

读取配置:

package com.xunqi.gulimall.order.config;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Thread pool settings bound from configuration properties under the
 * {@code gulimall.thread} prefix.
 *
 * <p>Lombok's {@code @Data} supplies the accessors for the fields below.
 *
 * @author 夏沫止水
 * @createTime 2020-06-23 20:28
 */
@ConfigurationProperties(prefix = "gulimall.thread")

// @Component is left disabled here.
// NOTE(review): presumably because the class is registered through
// @EnableConfigurationProperties instead — confirm.

@Data

public class ThreadPoolConfigProperties {

    // Bound from gulimall.thread.coreSize.
    private Integer coreSize;

    // Bound from gulimall.thread.maxSize.
    private Integer maxSize;

    // Bound from gulimall.thread.keepAliveTime.
    private Integer keepAliveTime;

}

注入线程池:

package com.xunqi.gulimall.order.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingDeque;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* @Description: 线程池配置类

* @Created: with IntelliJ IDEA.

* @author: 夏沫止水

* @createTime: 2020-06-23 20:24

**/

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)

@Configuration

public class MyThreadConfig {

    @Bean

    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {

        return new ThreadPoolExecutor(

                pool.getCoreSize(),

                pool.getMaxSize(),

                pool.getKeepAliveTime(),

                TimeUnit.SECONDS,

                new LinkedBlockingDeque<>(100000),

                Executors.defaultThreadFactory(),

                new ThreadPoolExecutor.AbortPolicy()

        );

    }

}

使用:

    // Executor handed to CompletableFuture.supplyAsync in getWxUserInfoByUid,
    // so the per-batch lookups run on this pool.
    @Autowired

    private ThreadPoolExecutor threadPoolExecutor;

@Override

    public List<WxUserInfo> getWxUserInfoByUid(String appid, List<Long> uidList) {

        // 数据太多了.分片执行

        List<List<Long>> uidListGroupList = CollectionUtil.split(uidList, 500);

        List<CompletableFuture<List<WxUserInfo>>> futures = uidListGroupList.stream().map(list -> {

            return CompletableFuture.supplyAsync(() -> {

                RestResult<List<WxUserInfo>> wxUserInfoByAppIdUid = passportFeignService.getWxUserInfoByAppIdUid(appid, list, appName);

                return wxUserInfoByAppIdUid.getData();

            }, threadPoolExecutor);

        }).collect(Collectors.toList());

        // List<WxUserInfo> collect = futures.stream().map(p -> {

        //    try {

        //        return p.get();

        //    } catch (InterruptedException e) {

        //        e.printStackTrace();

        //    } catch (ExecutionException e) {

        //        e.printStackTrace();

        //    }

        //    return null;

        // }).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());

        List<WxUserInfo> biddingList = futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());

        return biddingList;

    }

上一篇 下一篇

猜你喜欢

热点阅读