11、Spring 多线程 ThreadPoolTaskExec
2019-12-11 本文已影响0人
俊果果
继续上集SSM集成swagger 和 log4j,这次需要实现在service里面并行插入1000条数据,在全部完成后返回结果
一、添加 'ThreadPoolTaskExecutor' Bean
1、新增配置类ExcutorConfig
@EnableAsync
@Configuration
public class ExcutorConfig {
private static Logger logger = Logger.getLogger(ExcutorConfig.class);
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
logger.info("start executor -->");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置核心线程数
executor.setCorePoolSize(50);
//设置最大线程数
executor.setMaxPoolSize(300);
//设置队列大小
executor.setQueueCapacity(300);
//配置线程池的前缀
executor.setThreadNamePrefix("async-service-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//设置空闲时间
executor.setKeepAliveSeconds(60);
//进行加载
executor.initialize();
return executor;
}
}
这里故意把线程池的数目设置的比较大
注意: 这里需要在 spring-mvc的配置中把config类的报名注册到
component-scan里面
,修改后如下:
2、如果不想通过代码,则在spring-service
中新增如下配置即可
<bean id="asyncServiceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="50" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="300" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="300" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="60" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
二、编写service
1、修改RoleService
和实现类,添加如下接口
@Override
public int insertRole(Role role) {
return roleMapper.insert(role);
}
接受一个 Role
对象,直接插入到数据表中
2、新增一个TaskExecuterTestService
,用于实现并行插入N条记录
接口
public interface TaskExecuterTestService {
void insertRoles(List<Role> roles);
}
实现类
@Service
public class TaskExecuterTestServiceImpl implements TaskExecuterTestService {
private Logger logger = Logger.getLogger(this.getClass());
@Resource(name = "asyncServiceExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private ApplicationContext applicationContext;
@Override
public void insertRoles(List<Role> roles) {
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < roles.size(); i++) {
Role role = roles.get(i);
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RoleService roleService = applicationContext.getBean(RoleService.class);
int ret = roleService.insertRole(role);
System.out.println("插入Role[" + role.getRoleName()+"]结果: " + ret + ", 当前线程id: " + Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); //这个不管是否异常都需要数量减,否则会被堵塞无法结束
}
}
});
}
try {
countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
// 这样就可以在下面拿到所有线程执行完的集合结果
System.out.println("all roles insert done..........................................................");
} catch (Exception e) {
logger.error("阻塞异常");
}
}
}
注意事项
- 因为想要每个最终执行插入的逻辑由不同的数据库交互
service
去做,所以这里需要手动在每个Task
里面去获取独立的service bean
image.png - 使用
CountDownLatch
类来实现主线程的等待,在所有子线程工作完成前,主线程会一直等待在下面位置:
image.png
CountDownLatch
类的详解可以参考文章CountDownLatch详解 - 这里用的是
@Resource
来注入的ThreadPoolTaskExecutor
bean,是采用的类配置方式;若使用的xml,可以直接用@Autowired
即可
三、编写Controller
新增一个TaskExecuterTestController
用作入口
@Controller
@RequestMapping("/task")
@Api(value = "/task", tags = {"TaskExecuter测试"})
public class TaskExecuterTestController {
@Autowired
private TaskExecuterTestService taskExecuterTestService;
@RequestMapping(value = "/doTest", method = RequestMethod.GET)
@ResponseBody
public String doTest() {
StopWatch sc = new StopWatch();
sc.start();
SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
List<Role> roles = new ArrayList<Role>();
int totalCount = 100000;
for (int i = 0; i < totalCount; i++) {
Role role = new Role();
role.setId(UUID.randomUUID().toString());
role.setRoleName("testRole_" + i);
role.setNote(ft.format(new Date()));
roles.add(role);
}
taskExecuterTestService.insertRoles(roles);
sc.stop();
return "测试完成,插入数据 "+totalCount+" 条,总耗时 " + sc.getTotalTimeMillis() + " 毫秒";
}
}
逻辑很简单,生成100000个role对象,然后调用service并行插入数据库,返回总耗时
四、测试
1、Swagger
直接运行
初始数据库记录如下:
image.png
2、Swagger
运行结果
image.png
插入数据10万条,总耗时
25 s