springboot线程池
2022-10-07 本文已影响0人
sunpy
什么时候在项目中使用线程池
- 项目中有的任务耗时长,造成项目一直阻塞运行中。
- 多线程任务之间不需要协同的关系。
- 将耗时的任务异步执行,保证主线程的任务可以一直执行下去。
- 譬如我们再导入大量数据,批量插入的时候,可以采用线程池方式。
线程池基础知识
https://www.jianshu.com/p/f5e2c8b6ed75
springboot中使用自定义线程池
- 配置线程池 appliction.yml
thread :
param:
corePoolSize: 5
maxPoolSize: 5
keepAliveSeconds: 10
queueCapacity: 500
allowCoreThreadTimeOut: false
- 编写线程池配置类
@EnableAsync
@Data
@Configuration
public class ThreadConfig {
@Autowired
private ThreadParamModel threadParam;
@Bean("asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数量
executor.setCorePoolSize(threadParam.getCorePoolSize());
// 配置最大线程数
executor.setMaxPoolSize(threadParam.getMaxPoolSize());
// 配置队列容量
executor.setQueueCapacity(threadParam.getQueueCapacity());
// 配置空闲线程存活时间
executor.setKeepAliveSeconds(threadParam.getKeepAliveSeconds());
// 是否允许核心线程超时
executor.setAllowCoreThreadTimeOut(threadParam.isAllowCoreThreadTimeOut());
// 设置拒绝策略,直接在execute方法的调用线程中运行被拒绝的任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
- 业务方法中使用线程池
service层:
@Async(value = "asyncExecutor")
@Override
public void insertUserForBatch(List<UserBO> userBOList) throws CommonException {
log.info("==================" + Thread.currentThread().getName());
log.info("================== 开始时间:" + TimeUtil.getLocalDateTime());
Long datacenterId = machine.getDatacenterId();
Long workerId = machine.getWorkerId();
String createBy = "machine-" +
datacenterId +
"-" +
workerId;
SnowFlakeIdUtil idUtil = new SnowFlakeIdUtil(workerId, datacenterId);
List<User> userList = CopyUtil.copyList(userBOList, User.class);
userList.forEach(user -> {
user.setUserId(idUtil.genNextId());
user.setCreateTime(TimeUtil.getLocalDateTime());
user.setUpdateTime(TimeUtil.getLocalDateTime());
user.setCreateBy(createBy);
});
userMapper.insertForBatch(userList);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("================== 结束时间:" + TimeUtil.getLocalDateTime());
}
controller层:
@PostMapping("/insert/batch")
public ResultModel<String> insertBatch() throws CommonException {
List<UserBO> userBOList = new ArrayList<>();
for (int i = 200 ; i < 300 ; i++) {
UserBO userBO = new UserBO();
userBO.setUserName("张三" + i);
userBO.setPassword("123456");
userBOList.add(userBO);
}
userService.insertUserForBatch(userBOList);
ResultModel<String> resultModel = new ResultModel<>();
resultModel.setTime(TimeUtil.getNowTime());
return resultModel;
}
测试:
日志:
当发生阻塞的时候主线程直接返回了,没有造成项目阻塞等待。
老方法:使用jdk线程池类实现
封装工具类:
@Slf4j
@Component
public class ThreadService {
@Autowired
private ThreadParamModel threadParamModel;
// 线程池执行器
private volatile ThreadPoolExecutor executor;
// 私有化构造子,阻止外部直接实例化对象
private ThreadService() {}
/**
* 获取单例的线程池对象--单例的双重校验
*
* @return 线程池
*/
public ThreadPoolExecutor getThreadPool() {
if (executor == null) {
synchronized (ThreadService.class) {
if (executor == null) {
// 获取处理器数量
//int cpuNum = Runtime.getRuntime().availableProcessors();
// 根据cpu数量,计算出合理的线程并发数
//int maximumPoolSize = cpuNum * 2 + 1;
executor = new ThreadPoolExecutor(
// 核心线程数
threadParamModel.getCorePoolSize(),
// 最大线程数
threadParamModel.getMaxPoolSize(),
// 活跃时间
threadParamModel.getKeepAliveSeconds(),
// 活跃时间单位
TimeUnit.SECONDS,
// 线程队列
new LinkedBlockingDeque<>(threadParamModel.getQueueCapacity()),
// 线程工厂
Executors.defaultThreadFactory(),
// 队列已满,而且当前线程数已经超过最大线程数时的异常处理策略(这里可以自定义拒绝策略)
new ThreadPoolExecutor.AbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.warn("线程等待队列已满,当前运行线程总数:{},活动线程数:{},等待运行任务数:{}",
e.getPoolSize(),
e.getActiveCount(),
e.getQueue().size());
}
}
);
executor.allowCoreThreadTimeOut(false);
}
}
}
return executor;
}
/**
* 向线程池提交一个任务,返回线程结果
*
* @param callable 任务
* @return 处理结果
*/
public <T> Future<T> submit(Callable<T> callable) {
return getThreadPool().submit(callable);
}
/**
* 向线程池提交一个任务,不关心处理结果
*
* @param runnable 任务
*/
public void execute(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
getThreadPool().execute(runnable);
}
/**
* 获取当前线程池线程数量
*/
public int getSize() {
return getThreadPool().getPoolSize();
}
/**
* 获取当前活动的线程数量
*/
public int getActiveCount() {
return getThreadPool().getActiveCount();
}
/**
* 从线程队列中移除对象
*/
public void cancel(Runnable runnable) {
if (executor != null) {
getThreadPool().getQueue().remove(runnable);
}
}
}
业务方法调用:
@Autowired
private ThreadService threadService;
@Override
public void insertUserForBatch(List<UserBO> userBOList) throws CommonException {
log.info("==================" + Thread.currentThread().getName());
log.info("================== 开始时间:" + TimeUtil.getLocalDateTime());
Long datacenterId = machine.getDatacenterId();
Long workerId = machine.getWorkerId();
String createBy = "machine-" +
datacenterId +
"-" +
workerId;
SnowFlakeIdUtil idUtil = new SnowFlakeIdUtil(workerId, datacenterId);
List<User> userList = CopyUtil.copyList(userBOList, User.class);
userList.forEach(user -> {
user.setUserId(idUtil.genNextId());
user.setCreateTime(TimeUtil.getLocalDateTime());
user.setUpdateTime(TimeUtil.getLocalDateTime());
user.setCreateBy(createBy);
});
userMapper.insertForBatch(userList);
threadService.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("================== 结束时间:" + TimeUtil.getLocalDateTime());
});
}
测试:
获取异步线程池的返回值
使用service层Future获取返回值:
@Async(value = "asyncExecutor")
@Override
public Future<String> executeAsync() {
log.info("==================" + Thread.currentThread().getName());
LocalDateTime start = TimeUtil.getLocalDateTime();
log.info("================== 开始时间:" + start);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalDateTime end = TimeUtil.getLocalDateTime();
long time = ChronoUnit.SECONDS.between(start, end);
log.info("================== 结束时间:" + end);
return new AsyncResult<>(String.valueOf(time));
}
controller层:
@PostMapping("/r/r2")
public ResultModel<String> api2() throws CommonException, ExecutionException, InterruptedException {
ResultModel<String> resultModel = new ResultModel<>();
Future<String> future = userService.executeAsync();
resultModel.setTime(TimeUtil.getNowTime());
resultModel.setMsg("执行成功");
resultModel.setRes(future.get());
return resultModel;
}
测试:
异步多线程组合
service层:
@Async(value = "asyncExecutor")
@Override
public CompletableFuture<String> executeCompletableAsync() {
LocalDateTime start = TimeUtil.getLocalDateTime();
log.info("================== 开始时间:" + start);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalDateTime end = TimeUtil.getLocalDateTime();
long time = ChronoUnit.SECONDS.between(start, end);
log.info("================== 结束时间:" + end);
return CompletableFuture.completedFuture(String.valueOf(time));
}
controller层:
@PostMapping("/r/r2")
public ResultModel<String> api2() throws CommonException, ExecutionException, InterruptedException {
ResultModel<String> resultModel = new ResultModel<>();
CompletableFuture<String> future1 = userService.executeCompletableAsync();
CompletableFuture<String> future2 = userService.executeCompletableAsync();
CompletableFuture<String> future3 = userService.executeCompletableAsync();
CompletableFuture.allOf(future1, future2, future3);
resultModel.setTime(TimeUtil.getNowTime());
resultModel.setMsg("执行成功");
int res = Integer.parseInt(future1.get()) + Integer.parseInt(future2.get()) + Integer.parseInt(future3.get());
resultModel.setRes(String.valueOf(res));
return resultModel;
}
测试:
多线程访问同一个对象