springboot线程池

2022-10-07  本文已影响0人  sunpy

什么时候在项目中使用线程池


线程池基础知识


https://www.jianshu.com/p/f5e2c8b6ed75

springboot中使用自定义线程池


  1. 配置线程池 application.yml
# Sizing parameters for the custom thread pool.
# NOTE(review): presumably bound to ThreadParamModel (e.g. via @ConfigurationProperties);
# the binding class is not shown here - confirm the prefix matches.
thread :
  param:
    # Core pool size handed to the executor.
    corePoolSize: 5
    # Maximum pool size handed to the executor.
    maxPoolSize: 5
    # Keep-alive for idle threads, in seconds.
    keepAliveSeconds: 10
    # Capacity of the task queue.
    queueCapacity: 500
    # Whether core threads may also time out when idle.
    allowCoreThreadTimeOut: false
  1. 编写线程池配置类
/**
 * Enables {@code @Async} processing and publishes the application's custom
 * thread pool under the bean name {@code asyncExecutor}.
 */
@EnableAsync
@Data
@Configuration
public class ThreadConfig {

    @Autowired
    private ThreadParamModel threadParam;

    /**
     * Creates, configures and initializes the executor used by
     * {@code @Async("asyncExecutor")} methods.
     *
     * @return the initialized task executor
     */
    @Bean("asyncExecutor")
    public Executor asyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        applySettings(pool);
        // Initialize explicitly before handing the instance out.
        pool.initialize();
        return pool;
    }

    /** Copies every value from {@code threadParam} onto the given executor. */
    private void applySettings(ThreadPoolTaskExecutor pool) {
        // Pool sizing: core threads, upper bound, and queue capacity.
        pool.setCorePoolSize(threadParam.getCorePoolSize());
        pool.setMaxPoolSize(threadParam.getMaxPoolSize());
        pool.setQueueCapacity(threadParam.getQueueCapacity());

        // Idle-thread lifetime, and whether it also applies to core threads.
        pool.setKeepAliveSeconds(threadParam.getKeepAliveSeconds());
        pool.setAllowCoreThreadTimeOut(threadParam.isAllowCoreThreadTimeOut());

        // Rejected tasks are run directly on the thread that called execute().
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
  1. 业务方法中使用线程池
    service层:
/**
 * Batch-inserts the given users on the {@code asyncExecutor} pool.
 *
 * <p>Each user gets a generated id, create/update timestamps and a
 * {@code createBy} tag built from this machine's datacenter and worker ids.
 * After the insert the method sleeps for 10 seconds to simulate slow work.
 *
 * @param userBOList users to insert
 * @throws CommonException declared by the service contract
 */
@Async(value = "asyncExecutor")
@Override
public void insertUserForBatch(List<UserBO> userBOList) throws CommonException {
    log.info("==================" + Thread.currentThread().getName());
    log.info("================== 开始时间:" + TimeUtil.getLocalDateTime());
    Long datacenterId = machine.getDatacenterId();
    Long workerId = machine.getWorkerId();
    String createBy = "machine-" +
            datacenterId +
            "-" +
            workerId;
    SnowFlakeIdUtil idUtil = new SnowFlakeIdUtil(workerId, datacenterId);

    List<User> userList = CopyUtil.copyList(userBOList, User.class);

    userList.forEach(user -> {
        user.setUserId(idUtil.genNextId());
        user.setCreateTime(TimeUtil.getLocalDateTime());
        user.setUpdateTime(TimeUtil.getLocalDateTime());
        user.setCreateBy(createBy);
    });

    userMapper.insertForBatch(userList);

    try {
        // Simulated long-running work.
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the owning pool can observe the
        // interruption (e.g. during shutdown) instead of losing it.
        Thread.currentThread().interrupt();
        log.warn("insertUserForBatch was interrupted while sleeping", e);
    }

    log.info("================== 结束时间:" + TimeUtil.getLocalDateTime());
}

controller层:

/**
 * Demo endpoint: builds 100 sample users (suffixes 200..299), passes them to
 * {@code userService.insertUserForBatch} and returns a result carrying only
 * the current time.
 *
 * @return a result model with its time field set
 * @throws CommonException propagated from the service call
 */
@PostMapping("/insert/batch")
public ResultModel<String> insertBatch() throws CommonException {
    List<UserBO> batch = new ArrayList<>();

    int suffix = 200;
    while (suffix < 300) {
        UserBO sample = new UserBO();
        sample.setUserName("张三" + suffix);
        sample.setPassword("123456");
        batch.add(sample);
        suffix++;
    }

    userService.insertUserForBatch(batch);

    // The timestamp is taken after the service call has been issued.
    ResultModel<String> response = new ResultModel<>();
    response.setTime(TimeUtil.getNowTime());
    return response;
}

测试:

日志:

当发生阻塞的时候主线程直接返回了,没有造成项目阻塞等待。

老方法:使用jdk线程池类实现


封装工具类:

/**
 * Wrapper around a single, lazily created {@link ThreadPoolExecutor} whose
 * sizing comes from {@code ThreadParamModel}.
 *
 * <p>Tasks that cannot be accepted (queue full and pool at maximum size) are
 * logged and then rejected with a {@code RejectedExecutionException}, so a
 * caller is never left holding a {@link Future} that will not complete.
 */
@Slf4j
@Component
public class ThreadService {

    @Autowired
    private ThreadParamModel threadParamModel;

    // Lazily created pool. Volatile so the unsynchronized first check in
    // getThreadPool() only ever sees a fully configured instance.
    private volatile ThreadPoolExecutor executor;

    // Private constructor: prevents direct instantiation by other code.
    private ThreadService() {}

    /**
     * Returns the shared pool, creating it on first use.
     *
     * <p>The pool is fully configured in a local variable and published to the
     * field last, so concurrent callers never observe a half-configured pool.
     *
     * @return the shared thread pool
     * @throws IllegalArgumentException from the JDK if core-thread timeout is
     *         enabled while the configured keep-alive time is zero
     */
    public ThreadPoolExecutor getThreadPool() {
        ThreadPoolExecutor pool = executor;
        if (pool == null) {
            synchronized (ThreadService.class) {
                pool = executor;
                if (pool == null) {
                    pool = new ThreadPoolExecutor(
                            // Core pool size
                            threadParamModel.getCorePoolSize(),
                            // Maximum pool size
                            threadParamModel.getMaxPoolSize(),
                            // Keep-alive time for idle threads
                            threadParamModel.getKeepAliveSeconds(),
                            // Unit of the keep-alive time
                            TimeUnit.SECONDS,
                            // Bounded work queue
                            new LinkedBlockingDeque<>(threadParamModel.getQueueCapacity()),
                            // Thread factory
                            Executors.defaultThreadFactory(),
                            // Rejection policy: log the pool state, then abort.
                            new ThreadPoolExecutor.AbortPolicy() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                                    log.warn("线程等待队列已满,当前运行线程总数:{},活动线程数:{},等待运行任务数:{}",
                                            e.getPoolSize(),
                                            e.getActiveCount(),
                                            e.getQueue().size());
                                    // Logging alone would silently drop the task and leave
                                    // any Future returned by submit() pending forever.
                                    super.rejectedExecution(r, e);
                                }
                            }
                    );

                    // Honour the configured flag rather than a hard-coded value.
                    pool.allowCoreThreadTimeOut(threadParamModel.isAllowCoreThreadTimeOut());
                    executor = pool;
                }
            }
        }
        return pool;
    }

    /**
     * Submits a task and returns a handle to its result.
     *
     * @param callable the task
     * @return a future for the task's result
     * @throws java.util.concurrent.RejectedExecutionException if the pool cannot accept the task
     */
    public <T> Future<T> submit(Callable<T> callable) {
        return getThreadPool().submit(callable);
    }

    /**
     * Submits a task whose result is not needed.
     *
     * @param runnable the task
     * @throws NullPointerException if {@code runnable} is null
     * @throws java.util.concurrent.RejectedExecutionException if the pool cannot accept the task
     */
    public void execute(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException();
        }
        getThreadPool().execute(runnable);
    }

    /**
     * Returns the current number of threads in the pool.
     */
    public int getSize() {
        return getThreadPool().getPoolSize();
    }

    /**
     * Returns the number of threads currently reported as active by the pool.
     */
    public int getActiveCount() {
        return getThreadPool().getActiveCount();
    }

    /**
     * Removes the given task from the work queue if it is still queued.
     * Does nothing when the pool has not been created yet.
     *
     * <p>NOTE(review): tasks passed to {@link #submit(Callable)} are wrapped by
     * the executor before queuing, so presumably only objects passed to
     * {@link #execute(Runnable)} can be matched here - confirm against callers.
     */
    public void cancel(Runnable runnable) {
        if (executor != null) {
            getThreadPool().getQueue().remove(runnable);
        }
    }
}

业务方法调用:

// Injected wrapper around the JDK thread pool; used below to run work off the calling thread.
@Autowired
private ThreadService threadService;

/**
 * Batch-inserts the given users on the calling thread, then hands the
 * simulated slow follow-up work (a 10 second sleep) to {@code threadService}.
 *
 * @param userBOList users to insert
 * @throws CommonException declared by the service contract
 */
@Override
public void insertUserForBatch(List<UserBO> userBOList) throws CommonException {
    log.info("==================" + Thread.currentThread().getName());
    log.info("================== 开始时间:" + TimeUtil.getLocalDateTime());
    Long datacenterId = machine.getDatacenterId();
    Long workerId = machine.getWorkerId();
    String createBy = "machine-" +
            datacenterId +
            "-" +
            workerId;
    SnowFlakeIdUtil idUtil = new SnowFlakeIdUtil(workerId, datacenterId);

    List<User> userList = CopyUtil.copyList(userBOList, User.class);

    userList.forEach(user -> {
        user.setUserId(idUtil.genNextId());
        user.setCreateTime(TimeUtil.getLocalDateTime());
        user.setUpdateTime(TimeUtil.getLocalDateTime());
        user.setCreateBy(createBy);
    });

    userMapper.insertForBatch(userList);

    threadService.execute(() -> {
        try {
            // Simulated long-running work.
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the worker thread instead of
            // discarding the interruption.
            Thread.currentThread().interrupt();
            log.warn("insertUserForBatch background task was interrupted", e);
        }

        log.info("================== 结束时间:" + TimeUtil.getLocalDateTime());
    });

}

测试:

获取异步线程池的返回值


使用service层Future获取返回值:

/**
 * Runs on the {@code asyncExecutor} pool, sleeps for 10 seconds to simulate
 * slow work, and reports the elapsed whole seconds.
 *
 * @return a future holding the elapsed seconds as a string
 */
@Async(value = "asyncExecutor")
@Override
public Future<String> executeAsync() {
    log.info("==================" + Thread.currentThread().getName());
    LocalDateTime start = TimeUtil.getLocalDateTime();
    log.info("================== 开始时间:" + start);

    try {
        // Simulated long-running work.
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of discarding the interruption.
        Thread.currentThread().interrupt();
        log.warn("executeAsync was interrupted while sleeping", e);
    }

    LocalDateTime end = TimeUtil.getLocalDateTime();
    long time = ChronoUnit.SECONDS.between(start, end);
    log.info("================== 结束时间:" + end);

    return new AsyncResult<>(String.valueOf(time));
}

controller层:

/**
 * Starts {@code userService.executeAsync()}, fills in time and message, then
 * blocks on the returned future to put its value into the response.
 *
 * @return the populated result model
 * @throws ExecutionException if the asynchronous task failed
 * @throws InterruptedException if this thread is interrupted while waiting
 */
@PostMapping("/r/r2")
public ResultModel<String> api2() throws CommonException, ExecutionException, InterruptedException {
    ResultModel<String> response = new ResultModel<>();
    Future<String> pending = userService.executeAsync();

    // Time and message are set before waiting, so the time reflects dispatch.
    response.setTime(TimeUtil.getNowTime());
    response.setMsg("执行成功");

    // Blocks until the asynchronous task has produced its value.
    String elapsed = pending.get();
    response.setRes(elapsed);
    return response;
}

测试:

异步多线程组合


service层:

/**
 * Runs on the {@code asyncExecutor} pool, sleeps for 10 seconds to simulate
 * slow work, and reports the elapsed whole seconds.
 *
 * @return a completed future holding the elapsed seconds as a string
 */
@Async(value = "asyncExecutor")
@Override
public CompletableFuture<String> executeCompletableAsync() {
    LocalDateTime start = TimeUtil.getLocalDateTime();
    log.info("================== 开始时间:" + start);

    try {
        // Simulated long-running work.
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of discarding the interruption.
        Thread.currentThread().interrupt();
        log.warn("executeCompletableAsync was interrupted while sleeping", e);
    }

    LocalDateTime end = TimeUtil.getLocalDateTime();
    long time = ChronoUnit.SECONDS.between(start, end);
    log.info("================== 结束时间:" + end);

    return CompletableFuture.completedFuture(String.valueOf(time));
}

controller层:

/**
 * Starts three {@code executeCompletableAsync()} calls, waits for all of them,
 * and returns the sum of their reported durations.
 *
 * @return the populated result model; {@code res} is the summed seconds
 * @throws ExecutionException if any of the asynchronous tasks failed
 * @throws InterruptedException if this thread is interrupted while waiting
 */
@PostMapping("/r/r2")
public ResultModel<String> api2() throws CommonException, ExecutionException, InterruptedException {
    ResultModel<String> resultModel = new ResultModel<>();
    CompletableFuture<String> future1 = userService.executeCompletableAsync();
    CompletableFuture<String> future2 = userService.executeCompletableAsync();
    CompletableFuture<String> future3 = userService.executeCompletableAsync();

    // Time and message are set before waiting, so the time reflects dispatch.
    resultModel.setTime(TimeUtil.getNowTime());
    resultModel.setMsg("执行成功");

    // allOf() only builds a combined future; it has to be awaited to have any
    // effect. Once it completes, the individual get() calls return at once.
    CompletableFuture.allOf(future1, future2, future3).get();

    int res = Integer.parseInt(future1.get()) + Integer.parseInt(future2.get()) + Integer.parseInt(future3.get());
    resultModel.setRes(String.valueOf(res));
    return resultModel;
}

测试:

多线程访问同一个对象

上一篇 下一篇

猜你喜欢

热点阅读