Spring Boot - 异步任务
前言
有时候,前端可能提交了一个耗时任务,如果后端接收到请求后,直接执行该耗时任务,那么前端需要等待很久一段时间才能接受到响应。如果该耗时任务是通过浏览器直接进行请求,那么浏览器页面会一直处于转圈等待状态。一个简单的例子如下所示:
@RestController
@RequestMapping("async")
public class AsyncController {
@GetMapping("/")
public String index() throws InterruptedException {
// 模拟耗时操作
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
return "consuming time behavior done!";
}
}
当我们在浏览器请求localhost:8080/async/
页面时,可以看到浏览器一直处于转圈等待状态,这样体验十分不友好。
事实上,当后端要处理一个耗时任务时,通常都会将耗时任务提交到一个异步任务中进行执行,此时前端提交耗时任务后,就可直接返回,进行其他操作。
在 Java 中,开启异步任务最常用的方式就是开辟线程执行异步任务,如下所示:
@RestController
@RequestMapping("async")
public class AsyncController {
@GetMapping("/")
public String index() {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 模拟耗时操作
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return "consuming time behavior processing!";
}
}
这时浏览器请求localhost:8080/async/
,就可以很快得到响应,并且耗时任务会在后台得到执行。
一般来说,前端不会关注耗时任务结果,因此前端只需负责提交该任务给到后端即可。但是如果前端需要获取耗时任务结果,则可通过Future
等方式将结果返回,详细内容请参考后文。
事实上,在 Spring Boot 中,我们不需要手动创建线程异步执行耗时任务,因为 Spring 框架已提供了相关异步任务执行解决方案,本文主要介绍下在 Spring Boot 中执行异步任务的相关内容。
执行异步任务
Spring 3.0 时提供了一个@Async
注解,该注解用于标记要进行异步执行的方法,当在其他线程调用被@Async
注解的方法时,就会开启一个线程执行该方法。
注:@Async
注解通常用在方法上,但是也可以用作类型上,当类被@Async
注解时,表示该类中所有的方法都是异步执行的。
在 Spring Boot 中,如果要执行一个异步任务,只需进行如下两步操作:
-
使用注解
@EnableAsync
开启异步任务支持,如下所示:@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync // 开启异步调用 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
@EnableAsync
注解可以让 Spring 开启异步方法执行,它会让 Spring 扫描被其注解的包及其子包下被@Async
注解的类或方法,所以这里我们在根包下配置@EnableAsync
。 -
使用
@Async
注解标记要进行异步执行的方法,如下所示:@Service // 假设当前类是一个 Service @Slf4j public class AsyncTaskService { @Async public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: AsyncTaskService Thread = {}",Thread.currentThread().getName()); // 模拟耗时任务 Thread.sleep(TimeUnit.SECONDS.toMillis(5)); } @Async public Future<String> asyncTaskWithReturnType() throws InterruptedException { log.info("asyncTaskWithReturnType: AsyncTaskService Thread = {}",Thread.currentThread().getName()); // 模拟耗时任务 Thread.sleep(TimeUnit.SECONDS.toMillis(5)); return new AsyncResult<>("async tasks done!"); } }
上述代码使用
@Async
标记了两个异步执行方法,一个没有返回值的asyncTaskWithoutReturnType
,另一个拥有返回值asyncTaskWithReturnType
,这里需要注意的一点时,被@Async
注解的方法可以接受任意类型参数,但只能返回void
或Future
类型数据。所以当异步方法返回数据时,需要使用Future
包装异步任务结果,上述代码使用AsyncResult
包装异步任务结果,AsyncResult
间接继承Future
,是 Spring 提供的一个可用于追踪异步方法执行结果的包装类。其他常用的Future
类型还有 Spring 4.2 提供的ListenableFuture
,或者 JDK 8 提供的CompletableFuture
,这些类型可提供更丰富的异步任务操作。如果前端需要获取耗时任务结果,则异步任务方法应当返回一个
Future
类型数据,此时Controller
相关接口需要调用该Future
的get()
方法获取异步任务结果,get()
方法是一个阻塞方法,因此该操作相当于将异步任务转换为同步任务,浏览器同样会面临我们前面所讲的转圈等待过程,但是异步执行还是有他的好处的,因为我们可以控制get()
方法的调用时序,因此可以先执行其他一些操作后,最后再调用get()
方法。 -
经过前面两个步骤后,其实就已经完成了异步任务配置。到此就可以调用这些异步任务方法,如下所示:
@RestController @RequestMapping("async") @Slf4j public class AsyncController { @Autowired // 注入异步任务类 private AsyncTaskService asyncTaskService; @GetMapping("/asyncTaskWithoutReturnType") public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: Controller Thread = {}",Thread.currentThread().getName()); this.asyncTaskService.asyncTaskWithoutReturnType(); } @GetMapping("/asyncTaskWithReturnType") public String asyncTaskWithReturnType() throws Exception { log.info("asyncTaskWithReturnType: Controller Thread = {}",Thread.currentThread().getName()); Future<String> future = this.asyncTaskService.asyncTaskWithReturnType(); return future.get(); } }
请求上述两个接口,如下所示:
$ curl -X GET localhost:8080/async/asyncTaskWithoutReturnType $ curl -X GET localhost:8080/async/asyncTaskWithReturnType async tasks done!
查看日志,如下图所示:
可以看到,异步任务方法运行在于
Controller
不同的线程上。
异步任务相关限制
被@Async
注解的异步任务方法存在相关限制:
-
被
@Async
注解的方法必须是public
的,这样方法才可以被代理。 -
不能在同一个类中调用
@Async
方法,因为同一个类中调用会绕过方法代理,调用的是实际的方法。 -
被
@Async
注解的方法不能是static
。 -
@Async
不能用于被@Configuration
注解的类方法上。
注:官方文档写的是不能在@Configuration
类中使用,但本人实际测试发现,无论是将@Async
注解到@Configuration
类上,还是将@Async
注解到方法上,都是可以异步执行方法的。 -
@Async
注解不能与 Bean 对象的生命周期回调函数(比如@PostConstruct
)一起注解到同一个方法中。解决方法可参考:Spring - The @Async annotation -
异步类必须注入到 Spring IOC 容器中(也即异步类必须被
@Component
/@Service
等进行注解)。 -
其他类中使用异步类对象必须通过
@Autowired
等方式进行注入,不能手动new
对象。
自定义 Executor
默认情况下,Spring 会自动搜索相关线程池定义:要么是一个唯一TaskExecutor
Bean 实例,要么是一个名称为taskExecutor
的Executor
Bean 实例。如果这两个 Bean 实例都不存在,就会使用SimpleAsyncTaskExecutor
来异步执行被@Async
注解的方法。
综上,可以知道,默认情况下,Spring 使用的 Executor 是SimpleAsyncTaskExecutor
,SimpleAsyncTaskExecutor
每次调用都会创建一个新的线程,不会重用之前的线程。很多时候,这种实现方式不符合我们的业务场景,因此通常我们都会自定义一个 Executor 来替换SimpleAsyncTaskExecutor
。
对于自定义 Executor(自定义线程池),可以分为如下两个层级:
-
方法层级:即为单独一个或多个方法指定运行线程池,其他未指定的异步方法运行在默认线程池。如下所示:
@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync public class Application { // ... @Bean("methodLevelExecutor1") public TaskExecutor getAsyncExecutor1() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(4); // 设置最大线程数 executor.setMaxPoolSize(20); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 设置线程默认前缀名 executor.setThreadNamePrefix("Method-Level-Async1-"); return executor; } @Bean("methodLevelExecutor2") public TaskExecutor getAsyncExecutor2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(8); // 设置最大线程数 executor.setMaxPoolSize(20); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 设置线程默认前缀名 executor.setThreadNamePrefix("Method-Level-Async2-"); return executor; } }
上述特意设置了多个
TaskExecutor
,因为如果只设置一个TaskExecutor
,那么 Spring 就会默认采用该TaskExecutor
作为所有@Async
的Executor
,而设置了多个TaskExecutor
,Spring 检测到全局存在多个Executor
,就会降级使用默认的SimpleAsyncTaskExecutor
,此时我们就可以为@Async
方法配置执行线程池,其他未配置的@Async
就会默认运行在SimpleAsyncTaskExecutor
中,这就是方法层级的自定义 Executor。如下代码所示:@RestController @RequestMapping("async") @Slf4j public class AsyncController { @Autowired // 注入异步任务类 private AsyncTaskService asyncTaskService; @GetMapping("/asyncTaskWithoutReturnType") public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: Controller Thread = {}",Thread.currentThread().getName()); this.asyncTaskService.asyncTaskWithoutReturnType(); } @GetMapping("/asyncTaskWithReturnType") public String asyncTaskWithReturnType() throws Exception { log.info("asyncTaskWithReturnType: Controller Thread = {}",Thread.currentThread().getName()); Future<String> future = this.asyncTaskService.asyncTaskWithReturnType(); return future.get(); } }
请求上述接口,如下所示:
$ curl -X GET localhost:8080/async/asyncTaskWithoutReturnType $ curl -X GET localhost:8080/async/asyncTaskWithReturnType async tasks done!
请求日志如下所示:
2020-09-25 00:55:31,953 INFO [http-nio-8080-exec-1] com.yn.async.AsyncController: asyncTaskWithoutReturnType: Controller Thread = http-nio-8080-exec-1 2020-09-25 00:55:31,984 INFO [Method-Level-Async1-1] com.yn.async.AsyncTaskService: asyncTaskWithoutReturnType: AsyncTaskService Thread = Method-Level-Async1-1 2020-09-25 00:55:45,592 INFO [http-nio-8080-exec-2] com.yn.async.AsyncController: asyncTaskWithReturnType: Controller Thread = http-nio-8080-exec-2 2020-09-25 00:55:45,594 INFO [http-nio-8080-exec-2] org.springframework.aop.interceptor.AsyncExecutionAspectSupport: More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [methodLevelExecutor1, methodLevelExecutor2] 2020-09-25 00:55:45,595 INFO [SimpleAsyncTaskExecutor-1] com.yn.async.AsyncTaskService: asyncTaskWithReturnType: AsyncTaskService Thread = SimpleAsyncTaskExecutor-1
结果跟我们上述的分析一致。
-
应用层级:即全局生效的 Executor。依据 Spring 默认搜索机制,其实就是配置一个全局唯一的
TaskExecutor
实例或者一个名称为taskExecutor
的Executor
实例即可,如下所示:@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync public class Application { // ... @Bean("taskExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 int cores = Runtime.getRuntime().availableProcessors(); executor.setCorePoolSize(cores); // 设置最大线程数 executor.setMaxPoolSize(20); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 设置线程默认前缀名 executor.setThreadNamePrefix("Application-Level-Async-"); return executor; } }
上述代码定义了一个名称为
taskExecutor
的Executor
,此时@Async
方法默认就会运行在该Executor
中。其实 Spring 还提供了另一个功能更加强大的接口
AsyncConfigurer
,该接口主要是用于自定义一个Executor
配置类,提供了应用层级Executor
接口,以及对于@Async
方法异常捕获功能。如果 Spring 检测到该接口实例,会优先采用该接口自定义的Executor
。如下所示:@Configuration @EnableAsync public class AsyncConfigure implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 int cores = Runtime.getRuntime().availableProcessors(); executor.setCorePoolSize(cores); // 设置最大线程数 executor.setMaxPoolSize(20); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 设置线程默认前缀名 executor.setThreadNamePrefix("AsyncConfigure-"); // 注意,此时需要调用 initialize executor.initialize(); return executor; } }
注:使用自定义实现
AsyncConfigurer
接口的配置类的另一个好处就是无论@EnableAsync
的包层级多深,默认都会对整个项目扫描@Async
方法,这样我们就无需将@EnableAsync
注解到根包类中。
异常处理
前文介绍过,对于被@Async
注解的异步方法,只能返回void
或者Future
类型。对于返回Future
类型数据,如果异步任务方法抛出异常,则很容易进行处理,因为Future.get()
会重新抛出该异常,我们只需对其进行捕获即可。但是对于返回void
的异步任务方法,异常不会传播到被调用者线程,因此我们需要自定义一个额外的异步任务异常处理器,捕获异步任务方法抛出的异常。
自定义异步任务异常处理器的步骤如下所示:
-
首先自定义一个异常处理器类实现接口
AsyncUncaughtExceptionHandler
,如下所示:public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... objects) { System.out.println("Exception message - " + throwable.getMessage()); System.out.println("Method name - " + method.getName()); for (Object param : objects) { System.out.println("Parameter value - " + param); } } }
-
然后,创建一个自定义
Executor
异步配置类,将我们的自定义异常处理器设置到其接口上。如下所示:@Configuration @EnableAsync public class AsyncConfigure implements AsyncConfigurer { // ... @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new CustomAsyncExceptionHandler(); } }
此时异步方法如果抛出异常,就可以被我们的自定义异步异常处理器捕获得到。