SpringBoot线程池ThreadPoolExecutor
SpringBoot框架@Async注解文章:SpringBoot异步调用@Async
SpringBoot线程池ThreadPoolTaskExecutor文章:SpringBoot线程池ThreadPoolTaskExecutor
ThreadPoolExecutor是JDK中的JUC中的线程池技术
SpringBoot线程池ThreadPoolTaskExecutor代码实现
service层
- 创建一个service层的接口AsyncService,如下:
public interface AsyncService {
/**
* 执行异步任务
* */
void executeAsync1() throws InterruptedException;
/**
* 执行异步任务
* */
void executeAsync2() throws InterruptedException;
}
- 对应的AsyncServiceImpl,实现如下:
/**
* 异步线程service
* @author jeffrey_hjf
*/
@Service
@Log
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("asyncExecutor")
public void executeAsync1() throws InterruptedException {
System.out.println("MsgServer send A thread name->" + Thread.currentThread().getName());
Long startTime = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(2);
Long endTime = System.currentTimeMillis();
System.out.println("MsgServer send A 耗时:" + (endTime - startTime));
}
@Override
@Async("asyncExecutor")
public void executeAsync2() throws InterruptedException {
System.out.println("MsgServer send B thread name->" + Thread.currentThread().getName());
Long startTime = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(2);
Long endTime = System.currentTimeMillis();
System.out.println("MsgServer send B耗时:" + (endTime - startTime));
}
}
线程池配置
创建一个配置类ThreadPoolExecutorConfig,用来定义如何创建一个ThreadPoolExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类,如下所示:
/**
* @author jeffrey_hjf
*/
@Configuration
public class ThreadPoolExecutorConfig {
/**
* 获得Java虚拟机可用的处理器个数 + 1
*/
private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize = THREADS;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize = 2 * THREADS;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity = 1024;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix = "async-service-";
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
// -%d不要少
.setNameFormat(namePrefix + "%d")
.setDaemon(true)
.build();
/**
*
* @return
*/
@Bean("asyncExecutor")
public Executor asyncExecutor() {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
5, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
threadFactory, (r, executor) -> {
//打印日志,添加监控等
System.out.println("task is rejected!");
});
}
}
controller层
创建一个controller为Hello,里面定义一个http接口,做的事情是调用Service层的服务,如下:
/**
* @ClassName UserController
* @Author jeffrey_hjf
* @Description User
**/
@RestController
@RequestMapping("/api")
@Log
public class UserController {
@Autowired
private AsyncService asyncService;
/**
* ThreadPoolExecutor线程池
* @return
*/
@GetMapping("/executeThreadPoolExecutor")
public String executeThreadPoolExecutor() throws Exception {
System.out.println("主线程 name -->" + Thread.currentThread().getName());
asyncService.executeAsync1();
asyncService.executeAsync2();
return "Hello World";
}
}
执行效果
控制台看见日志如下:
主线程 name -->http-nio-9090-exec-1
MsgServer send A thread name->async-service-0
MsgServer send B thread name->async-service-1
MsgServer send A 耗时:2000
MsgServer send B耗时:2000
如上日志所示,我们可以看到controller的执行线程是”nio-8080-exec-1”,这是tomcat的执行线程,而service层的日志显示线程名为“async-service-0”,显然已经在我们配置的线程池中执行了,并且每次请求中,controller的起始和结束日志都是连续打印的,表明每次请求都快速响应了,而耗时的操作都留给线程池中的线程去异步执行;
SpringBoot线程池扩展ThreadPoolExecutor代码实现
虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,改写的方法:beforeExecute、afterExecute、terminated。这些方法可以添加日志、计时、监控或统计信息收集等功能。
扩展ThreadPoolExecutor类
public class ThreadPoolExecutorExtend extends ThreadPoolExecutor {
private final ThreadLocal startTime = new ThreadLocal();
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try{
long endTime = System.currentTimeMillis();
long useTime = endTime - (long)startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(useTime);
System.out.println("afterExecute " + r);
}finally{
super.afterExecute(r, t);
}
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
System.out.println("beforeExecute " + r);
startTime.set(System.currentTimeMillis());
}
@Override
protected void terminated() {
try{
System.out.println("terminated avg time " + totalTime.get() + " " + numTasks.get());
}finally{
super.terminated();
}
}
}
修改ThreadPoolExecutorConfig配置类
修改ThreadPoolExecutorConfig.java的asyncExecutorExtend方法,将new ThreadPoolExecutor改为new ThreadPoolExecutorExtend,如下所示:
@Bean("asyncExecutorExtend")
public Executor asyncExecutorExtend() {
return new ThreadPoolExecutorExtend(corePoolSize, maxPoolSize,
5, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
threadFactory, (r, executor) -> {
//打印日志,添加监控等
System.out.println("task is rejected!");
});
}
执行效果
日志如下:
主线程 name -->http-nio-9090-exec-1
beforeExecute java.util.concurrent.FutureTask@372c5560
MsgServer send A thread name->async-service-0
MsgServer send A 耗时:2000
afterExecute java.util.concurrent.FutureTask@372c5560
SpringBoot框架@Async注解文章:SpringBoot异步调用@Async
SpringBoot线程池ThreadPoolTaskExecutor文章:SpringBoot线程池ThreadPoolTaskExecutor