多线程Java后端

11、Spring 多线程 ThreadPoolTaskExec

2019-12-11  本文已影响0人  俊果果

继续上集SSM集成swagger 和 log4j,这次需要实现在service里面并行插入1000条数据,在全部完成后返回结果

一、添加 'ThreadPoolTaskExecutor' Bean

1、新增配置类ExcutorConfig

@EnableAsync
@Configuration
public class ExcutorConfig {
    private static Logger logger = Logger.getLogger(ExcutorConfig.class);

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start executor -->");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //设置核心线程数
        executor.setCorePoolSize(50);
        //设置最大线程数
        executor.setMaxPoolSize(300);
        //设置队列大小
        executor.setQueueCapacity(300);
        //配置线程池的前缀
        executor.setThreadNamePrefix("async-service-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //设置空闲时间
        executor.setKeepAliveSeconds(60);
        //进行加载
        executor.initialize();
        return executor;
    }
}

这里故意把线程池的数目设置的比较大
注意: 这里需要在 spring-mvc的配置中把config类的报名注册到component-scan里面,修改后如下:

image.png

2、如果不想通过代码,则在spring-service中新增如下配置即可

<bean id="asyncServiceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
         <!-- 核心线程数 -->
        <property name="corePoolSize" value="50" />
        <!-- 最大线程数 -->
        <property name="maxPoolSize" value="300" />
        <!-- 队列最大长度 >=mainExecutor.maxSize -->
        <property name="queueCapacity" value="300" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.  -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
</bean>

二、编写service

1、修改RoleService和实现类,添加如下接口

@Override
    public int insertRole(Role role) {
        return roleMapper.insert(role);
    }

接受一个 Role 对象,直接插入到数据表中

2、新增一个TaskExecuterTestService,用于实现并行插入N条记录

接口

public interface TaskExecuterTestService {

     void insertRoles(List<Role> roles);
}

实现类

@Service
public class TaskExecuterTestServiceImpl implements TaskExecuterTestService {

    private Logger logger = Logger.getLogger(this.getClass());

    @Resource(name = "asyncServiceExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void insertRoles(List<Role> roles) {
        CountDownLatch countDownLatch = new CountDownLatch(1000);
        for (int i = 0; i < roles.size(); i++) {
            Role role = roles.get(i);
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RoleService roleService = applicationContext.getBean(RoleService.class);
                        int ret = roleService.insertRole(role);
                        System.out.println("插入Role[" + role.getRoleName()+"]结果: " + ret + ",    当前线程id:  " + Thread.currentThread().getId());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();  //这个不管是否异常都需要数量减,否则会被堵塞无法结束
                    }
                }
            });
        }
        try {
            countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
            // 这样就可以在下面拿到所有线程执行完的集合结果
            System.out.println("all roles insert done..........................................................");
        } catch (Exception e) {
            logger.error("阻塞异常");
        }
    }
}

注意事项

三、编写Controller

新增一个TaskExecuterTestController用作入口

@Controller
@RequestMapping("/task")
@Api(value = "/task", tags = {"TaskExecuter测试"})
public class TaskExecuterTestController {

    @Autowired
    private TaskExecuterTestService taskExecuterTestService;

    @RequestMapping(value = "/doTest", method = RequestMethod.GET)
    @ResponseBody
    public String doTest() {
        StopWatch sc = new StopWatch();
        sc.start();
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
        List<Role> roles = new ArrayList<Role>();
        int totalCount = 100000;
        for (int i = 0; i < totalCount; i++) {
            Role role = new Role();
            role.setId(UUID.randomUUID().toString());
            role.setRoleName("testRole_" + i);
            role.setNote(ft.format(new Date()));
            roles.add(role);
        }
        taskExecuterTestService.insertRoles(roles);
        sc.stop();
        return "测试完成,插入数据 "+totalCount+" 条,总耗时 " + sc.getTotalTimeMillis() + " 毫秒";
    }
}

逻辑很简单,生成100000个role对象,然后调用service并行插入数据库,返回总耗时

四、测试

1、Swagger直接运行

初始数据库记录如下:


image.png

2、Swagger运行结果

image.png
插入数据10万条,总耗时 25 s

3、IDEA log

image.png

4、数据库记录

image.png

五、本次修改代码变更

Github-Commit spring多线程 TaskExecuter测试

上一篇下一篇

猜你喜欢

热点阅读