Spring Boot Thread Pool: ThreadPoolExecutor

2019-08-26  jeffrey_hjf

Related article on the @Async annotation: Spring Boot asynchronous calls with @Async
Related article on ThreadPoolTaskExecutor: Spring Boot thread pool ThreadPoolTaskExecutor

ThreadPoolExecutor is the thread pool implementation in the JDK's java.util.concurrent (JUC) package.

Spring Boot thread pool: ThreadPoolExecutor implementation

Service layer
  1. Create a service-layer interface, AsyncService, as follows:
public interface AsyncService {
    /**
     * Runs an asynchronous task
     */
    void executeAsync1() throws InterruptedException;

    /**
     * Runs an asynchronous task
     */
    void executeAsync2() throws InterruptedException;
}
  2. The corresponding implementation, AsyncServiceImpl, is as follows:
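import lombok.extern.java.Log; // assumes @Log is the Lombok annotation
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Asynchronous service running on the thread pool
 * @author jeffrey_hjf
 */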
@Service
@Log
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncExecutor")
    public void executeAsync1() throws InterruptedException {
        System.out.println("MsgServer send A thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);

        Long endTime = System.currentTimeMillis();
        System.out.println("MsgServer send A 耗时:" + (endTime - startTime));
    }

    @Override
    @Async("asyncExecutor")
    public void executeAsync2() throws InterruptedException {
        System.out.println("MsgServer send B thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);
        Long endTime = System.currentTimeMillis();
        System.out.println("MsgServer send B耗时:" + (endTime - startTime));
    }
}
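Thread pool configuration

Create a configuration class, ThreadPoolExecutorConfig, that defines how the ThreadPoolExecutor is built. It needs two annotations: @Configuration marks it as a configuration class, and @EnableAsync turns on Spring's @Async processing so that the pool is used for asynchronous calls, as shown below: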

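import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author jeffrey_hjf
 */
@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {
    /**
     * Number of processors available to the JVM, plus 1
     */
    private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;

    // Note: @Value injection overwrites the initializers below, and startup fails
    // if a key is missing. For a real default, declare it in the placeholder,
    // e.g. ${async.executor.thread.queue_capacity:1024}.
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize = THREADS;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize = 2 * THREADS;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity = 1024;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix = "async-service-";

    private ThreadFactory threadFactory;

    /**
     * Built after injection so that the configured name prefix is actually used;
     * a field initializer would run before @Value is applied.
     * (On Spring Boot 3+ the annotation lives in jakarta.annotation.)
     */
    @PostConstruct
    void initThreadFactory() {
        threadFactory = new ThreadFactoryBuilder()
                // do not drop the %d: it numbers the threads
                .setNameFormat(namePrefix + "%d")
                .setDaemon(true)
                .build();
    }

    /**
     * @return the executor referenced by @Async("asyncExecutor")
     */
    @Bean("asyncExecutor")
    public Executor asyncExecutor() {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                threadFactory, (r, executor) -> {
                    // log the rejection, add monitoring, etc.
                    System.out.println("task is rejected!");
                });
    }
}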
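
How corePoolSize, maxPoolSize, and queueCapacity interact decides when the rejection handler in the configuration fires. The following is a minimal plain-JDK sketch, not the article's Spring setup: the class name RejectionDemo, the method countRejected, and the tiny pool sizes are invented for illustration. Every task blocks on a latch so the pool stays saturated while tasks are submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionDemo {

    /** Submits blocking tasks to a small pool and returns how many were rejected. */
    static int countRejected(int core, int max, int queueCapacity, int tasks)
            throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // hypothetical handler: only counts the rejected tasks
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // core=1, max=2, queue=1, five tasks
        System.out.println("rejected = " + countRejected(1, 2, 1, 5)); // prints "rejected = 2"
    }
}
```

With one core thread, a queue of one, and a maximum of two threads, the first task occupies the core thread, the second waits in the queue, the third forces a second thread because the queue is full, and the remaining two go to the rejection handler. Extra threads are only created once the queue is full, which is why a large queueCapacity such as 1024 means maxPoolSize is rarely reached.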
Controller layer

Create a controller, UserController, that defines an HTTP endpoint whose only job is to call the service layer, as follows:

/**
 * @ClassName UserController
 * @Author jeffrey_hjf
 * @Description User
 **/

@RestController
@RequestMapping("/api")
@Log
public class UserController {

    @Autowired
    private AsyncService asyncService;

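    /**
     * Runs both service methods on the ThreadPoolExecutor thread pool
     * @return a fixed response, sent without waiting for the async tasks
     */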
    @GetMapping("/executeThreadPoolExecutor")
    public String executeThreadPoolExecutor() throws Exception {
        System.out.println("主线程 name -->" + Thread.currentThread().getName());
        asyncService.executeAsync1();
        asyncService.executeAsync2();
        return "Hello World";
    }
}
Result

The console shows the following log (主线程 means "main thread"; 耗时 means "elapsed time", in milliseconds):

主线程 name -->http-nio-9090-exec-1
MsgServer send A thread name->async-service-0
MsgServer send B thread name->async-service-1
MsgServer send A 耗时:2000
MsgServer send B耗时:2000

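As the log shows, the controller runs on the thread "http-nio-9090-exec-1", which is a Tomcat request thread, while the service-layer logs show the thread names "async-service-0" and "async-service-1", so the service methods clearly ran in the thread pool we configured. The controller does not wait for the two 2-second tasks: each request returns quickly, and the time-consuming work is left to the pool threads to run asynchronously.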
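
The hand-off described here can be reproduced without Spring. This is a rough sketch under stated assumptions: AsyncHandOffDemo and runTwoTasks are hypothetical names, the thread factory is hand-written rather than Guava's ThreadFactoryBuilder, and the sleep is shortened to 200 ms. It only reuses the "async-service-" prefix from the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncHandOffDemo {

    /** Runs two slow tasks on a named pool and returns the worker thread names. */
    static List<String> runTwoTasks() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        // Plain-JDK stand-in for the article's thread factory
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "async-service-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        ExecutorService pool = new ThreadPoolExecutor(
                2, 4, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(16), factory);
        Callable<String> slowTask = () -> {
            TimeUnit.MILLISECONDS.sleep(200); // simulated slow work
            return Thread.currentThread().getName();
        };
        try {
            Future<String> a = pool.submit(slowTask);
            Future<String> b = pool.submit(slowTask);
            // submit() returns at once; the caller thread is free to continue here
            System.out.println("caller thread -> " + Thread.currentThread().getName());
            List<String> names = new ArrayList<>();
            names.add(a.get()); // block only when the result is actually needed
            names.add(b.get());
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTwoTasks()); // prints "[async-service-0, async-service-1]"
    }
}
```

Because the pool has two core threads, each submission starts its own worker, which matches the two distinct thread names in the article's log.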

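Spring Boot thread pool: extending ThreadPoolExecutor

We are now using a thread pool, but we still cannot see its state at a given moment: how many threads are running tasks, and how many tasks are waiting in the queue? To address this I created a subclass of ThreadPoolExecutor that overrides beforeExecute, afterExecute, and terminated. These hooks are the place to add logging, timing, monitoring, or statistics collection.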
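
ThreadPoolExecutor also exposes getters that answer the two questions above directly. The sketch below is illustrative only: PoolStatusDemo, snapshot, and demo are hypothetical names, and the pool sizes are chosen so the numbers are predictable. Note that the JDK documents getActiveCount as an approximate value; it is stable here only because the tasks are held on a latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatusDemo {

    /** Formats the pool's current size, busy threads, queued and completed tasks. */
    static String snapshot(ThreadPoolExecutor pool) {
        return "poolSize=" + pool.getPoolSize()
                + ", active=" + pool.getActiveCount()
                + ", queued=" + pool.getQueue().size()
                + ", completed=" + pool.getCompletedTaskCount();
    }

    static String demo() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // hold the worker so the state is stable
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // both workers are now inside a task
            return snapshot(pool);
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints "poolSize=2, active=2, queued=3, completed=0"
    }
}
```

A snapshot like this could be logged from a scheduled job or from the hooks, while the hooks themselves are better suited to per-task timing.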

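Extending the ThreadPoolExecutor class
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadPoolExecutorExtend extends ThreadPoolExecutor {
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();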

    public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.currentTimeMillis();
            long useTime = endTime - (long) startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(useTime);
            System.out.println("afterExecute " + r);
        } finally {
            // clear the per-thread value so it does not linger on the pooled thread
            startTime.remove();
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println("beforeExecute " + r);
        startTime.set(System.currentTimeMillis());
    }

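    @Override
    protected void terminated() {
        try {
            long tasks = numTasks.get();
            // average time per task in ms; guard against division by zero
            long avg = tasks == 0 ? 0 : totalTime.get() / tasks;
            System.out.println("terminated avg time " + avg + "ms, tasks " + tasks);
        } finally {
            super.terminated();
        }
    }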
}
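
To see in what order the three hooks fire relative to the task itself, here is a compact stand-alone sketch. It is not the class above: HookOrderDemo and runOneTask are hypothetical names, and an anonymous subclass records event names in a list instead of timing anything.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookOrderDemo {

    /** Runs a single task and returns the hook and task events in the order they occurred. */
    static List<String> runOneTask() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(4)) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("beforeExecute"); // runs on the worker thread, before the task
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                events.add("afterExecute"); // runs on the same worker thread, after the task
                super.afterExecute(r, t);
            }

            @Override
            protected void terminated() {
                events.add("terminated"); // runs once, when the pool has fully shut down
                super.terminated();
            }
        };
        pool.execute(() -> events.add("task"));
        pool.shutdown(); // already submitted tasks still run
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOneTask()); // prints "[beforeExecute, task, afterExecute, terminated]"
    }
}
```

Since beforeExecute and afterExecute run on the worker thread that executes the task, a ThreadLocal is a natural place to carry the start time from one hook to the other.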
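Modifying the ThreadPoolExecutorConfig configuration class

Add an asyncExecutorExtend bean method to ThreadPoolExecutorConfig.java that creates a ThreadPoolExecutorExtend instead of a ThreadPoolExecutor, as shown below. For the service methods to run on this pool, their @Async annotation must reference the new bean name, i.e. @Async("asyncExecutorExtend").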

    @Bean("asyncExecutorExtend")
    public Executor asyncExecutorExtend() {
        return new ThreadPoolExecutorExtend(corePoolSize, maxPoolSize,
                5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                threadFactory, (r, executor) -> {
                    // log the rejection, add monitoring, etc.
                    System.out.println("task is rejected!");
                });
    }
Result

The log is as follows:

主线程 name -->http-nio-9090-exec-1
beforeExecute java.util.concurrent.FutureTask@372c5560
MsgServer send A thread name->async-service-0
MsgServer send A 耗时:2000
afterExecute java.util.concurrent.FutureTask@372c5560
