异步处理restful请求

2019-09-28  本文已影响0人  不二不二熊
一、起因

众所周知,toncat线程数量有限,在接收到http请求的时候,为了提高系统的吞吐量,很多时候我们都会选择使用异步来进行处理,从而使得不阻塞主线程。

二、异步处理的两种方式

1. callabel调用

@Slf4j
@RestController
@RequestMapping("/async")
public class AsyncController {
    /**
     * <一句话功能描述>:异步处理服务 
     * <功能详细描述>:
     * @Param:
     * @Return: java.util.concurrent.Callable
     */
    @GetMapping("/order")
    public Callable order(){
        log.info("主线程开始");
        Callable<String> callable = new Callable() {
            @Override
            public String call() throws Exception {
                log.info("副线程开始");
                Thread.sleep(1000);
                log.info("副线程结束");
                return "success";
            }
        };
        log.info("主线程结束");
        return callable;
    }
}

2. DeferredResult,多个系统间调用

@Component
@Data
public class DeferredResultHold {
    private Map<String,DeferredResult<String>>map = new HashMap<>();
}
@Slf4j
@Component
public class OrderQueue {
    private String placeOrder;
    private String completeOrder;

    public String getPlaceOrder() {
        return placeOrder;
    }

    public String getCompleteOrder() {
        return completeOrder;
    }

    public void setPlaceOrder(String placeOrder) throws Exception {
        new Thread(() -> {
            log.info("下单开始:{}", placeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.completeOrder = placeOrder;
            log.info("下单成功:{}", placeOrder);
        }).start();
    }

    public void setCompleteOrder(String completeOrder) {
        this.completeOrder = completeOrder;
    }
}
@Component
public class DeferredResultListen implements CommandLineRunner {
    @Autowired
    private OrderQueue orderQueue;
    @Autowired
    private DeferredResultHold deferredResultHold;

    @Override
    public void run(String... args) throws Exception {
        new Thread(() -> {
            while (true) {
                if (StringUtils.isNotBlank(orderQueue.getCompleteOrder())) {
                    //调用setResult通知已经完成
                    deferredResultHold.getMap().get(orderQueue.getCompleteOrder())
                    .setResult("do success");
                    orderQueue.setCompleteOrder(null);
                } else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}
@Slf4j
@RestController
@RequestMapping("/deferredResult")
public class OrderController {
    @Autowired
    private OrderQueue orderQueue;
    @Autowired
    private DeferredResultHold deferredResultHold;

    @GetMapping("/order")
    public DeferredResult<String> order() throws Exception {
        log.info("主线程开始");
        String orderNumber = RandomStringUtils.randomNumeric(8);
        orderQueue.setPlaceOrder(orderNumber);
        DeferredResult<String> deferredResult = new DeferredResult<>();
        deferredResultHold.getMap().put(orderNumber, deferredResult);
        log.info("主线程开始");
        return deferredResult;
    }
}

3. 全局配置,自定义连接池,不使用默认配置的连接池

@Configuration
public class WebConfig {

    /**
     * <一句话功能描述>:异步处理请求的配置
     * <功能详细描述>:
     *
     * @Param:
     * @Return: org.springframework.web.servlet.config.annotation.WebMvcConfigurer
     */
    @Bean
    public WebMvcConfigurer webMvcConfigurer2() {
        return new WebMvcConfigurerAdapter() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                //自定义配置连接池
                configurer.setTaskExecutor(threadPoolTaskExecutor());
            }
        };
    }
    /**
     * <一句话功能描述>:创建线程池
     * <功能详细描述>:
     * @Param:
     * @Return: org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
        //核心连接数
        t.setCorePoolSize(10);
        //最大连接数
        t.setMaxPoolSize(50);
        //线程名前缀(可自定义,标识更清楚)
        t.setThreadNamePrefix("asyn");
        return t;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读