spring源码------@EnableAsync注解以及@A
@[toc]
1.@EnableAsync
以及@Async
的说明
1.1 @Async
spring从3.0版本开始支持异步的方法调用,只需要在方法或者类上面加上一个@Async
注解就可以了,当注解在类上面的时候,则表示整个类都作为异步方法。但是需要注意的是,当一个方法所在类上面已经存在@Configuration
注解的时候,这个时候@Async
注解是不支持的,至于为什么会不支持因为@Configuration
注解会对当前类进行代理增强,这个时候返回的类是一个经过代理的类,这个时候的方法可能已经不是原方法了,至于代理增强的相关的东西可以看看spring源码------@Configuration跟@Component及其派生注解@Service等的区别以及spring对其代理增强的原理
这个里面有对@Configuration
注解的解析。
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
//指定对应的Executor的beanName
String value() default "";
}
1.2 @EnableAsync
spring在3.1版本的时候加上了是否开启异步方法支持的注解@EnableAsync
。这个注解是基于@Import
注解进行扩展的,关于@Import
可以看看spring源码解析------@Import注解解析与ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector区别
了解以下。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//用户定义的需要进行异步执行的注解,默认只有Async注解
Class<? extends Annotation> annotation() default Annotation.class;
//是否创建基于CGLIB的代理对象
boolean proxyTargetClass() default false;
//代理模式选择 PROXY表示jdk的代理
AdviceMode mode() default AdviceMode.PROXY;
//对应的AsyncAnnotationBeanPostProcessor的拦截顺序
int order() default Ordered.LOWEST_PRECEDENCE;
}
2. 源码分析
2.1 基于@Import
扩展的AsyncConfigurationSelector
AsyncConfigurationSelector
类继承了AdviceModeImportSelector
类,而AdviceModeImportSelector
类在前面的一篇文章spring源码------@EnableCaching,@Cacheable,@CacheEvict,@CachePut的实现原理
中提到过。这个类主要是根据@Import
注解的扩展注解的类型的mode
属性来设置代理类型。
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";
protected String getAdviceModeAttributeName() {
return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
}
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
//获取AdviceModeImportSelector中的注解泛型的Class对象
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
//从传入的importingClassMetadata对象中获取对应的Class类型的注解的内部属性
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format(
"@%s is not present on importing class '%s' as expected",
annType.getSimpleName(), importingClassMetadata.getClassName()));
}
//获取属性中的代理类型属性“mode”的值
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
//根据代理类型获取需要注入的bean
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
}
return imports;
}
}
其中selectImports
方法由子类来实现,这里进入AsyncConfigurationSelector
来看看实现的逻辑
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
//默认的是jdk PROXY的模式
case PROXY:
//返回ProxyAsyncConfiguration类,注入到容器中,配置异步方法相关的配置
return new String[] {ProxyAsyncConfiguration.class.getName()};
//使用AspectJ的模式
case ASPECTJ:
//返回AspectJ关于异步方法相关的配置类AspectJAsyncConfiguration
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
可以看到这里就是选择不同的代理模式情况下,需要注入不同的配置类。而默认情况下@EnableAsync
注解中的是PROXY
模式,这里我们也就这种模式进行分析,所以接下来就是ProxyAsyncConfiguration
类了。
2.2 配置异步方法执行相关配置的ProxyAsyncConfiguration
spring对于方法的异步执行的逻辑跟@EnableCache
注解类似的,都是基于对方法拦截完成的,这里的拦截涉及到了切点跟增强类。而ProxyAsyncConfiguration
中就配置了这两点。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//创建AsyncAnnotationBeanPostProcessor,实现了AbstractAdvisingBeanPostProcessor(内部定义了Advisor对象可以可以注册到容器中)
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//设置executor跟exceptionHandler,spring都是空实现,也就是这两个值都是null,后面会设置默认的对象,也可以自己指定实现,
bpp.configure(this.executor, this.exceptionHandler);
//获取@EnableAsync注解中的annotation属性,这个annotation表示贴有这些注解的方法就是需要拦截的进行异步执行的方法
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
//如果annotation值不是默认的,则将这个加入到AsyncAnnotationBeanPostProcessor,后面进行拦截用
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//是否对目标类进行代理
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
//设置AsyncAnnotationBeanPostProcessor的顺序,默认是最低的,这样可以在其他的 post-processors执行后在处理
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
可以看到主要逻辑如下:
- 创建
AsyncAnnotationBeanPostProcessor
, - 配置异步执行方法的执行器executor,以及错误异常处理的exceptionHandler
- 然后解析
@EnableAsync
注解的annotation
属性,或许需要拦截代理的方法,然后设置进入。 - 设置是否代理目标对象
- 设置拦截器所处的顺序
接下来就是进入AsyncAnnotationBeanPostProcessor
了解相关的配置和处理异步相关的类。
2.3 创建切点以及增强类的AsyncAnnotationBeanPostProcessor
及其父类
2.3.1 创建增强类的AsyncAnnotationBeanPostProcessor
在AsyncAnnotationBeanPostProcessor
的构造器中有一个逻辑就是将当前类会创建的增强类作为所有增强类的第一个
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public AsyncAnnotationBeanPostProcessor() {
//是否将这个advisor放在advisor列表的第一个,也就是最开始拦截
setBeforeExistingAdvisors(true);
}
}
这个属性是在其父类中被用到的,这里简单的展示以下部分的代码。
//实现了BeanPostProcessor,在bean初始化之后调用
public Object postProcessAfterInitialization(Object bean, String beanName) {
......
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
//是否将当前的Advised放在调用链的最前面
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
......
}
AsyncAnnotationBeanPostProcessor
在这里还间接的实现了BeanFactoryAware
接口的setBeanFactory
方法,而这个方法里面就创建了增强相关的advisor。
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//创建AsyncAnnotationAdvisor,
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
//将需要拦截的方法上的注解加入到advisor
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
//设置容器到advisor
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
这里的创建的AsyncAnnotationAdvisor
间接实现了PointcutAdvisor
跟Advisor
,会将内部的Advice
以及Pointcut
注入到容器中,关于这两个类可以看看6.1Spring的AOP的解析——AOP的自定义组件
了解一下。这里直接进入到AsyncAnnotationAdvisor
中查看。
2.3.2 创建增强跟切点的AsyncAnnotationAdvisor
在AsyncAnnotationAdvisor
的构造器中,就包含了advice跟pointcut的创建入口。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
//在用户自定义的拦截注解上额外加入Async到需要拦截的注解集合
asyncAnnotationTypes.add(Async.class);
try {
//增加对java的ejv的Asynchronous注解的支持
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//创建advice
this.advice = buildAdvice(executor, exceptionHandler);
//设置pointcut
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
这里主要就是将表示需要拦截的注解加入到需要对应的集合中,然后就开始对advice跟pointcut的对象的创建。这里先看advice。
2.3.3 创建拦截器AnnotationAsyncExecutionInterceptor
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//设置拦截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//设置executor跟exceptionHandler
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
这里的configure就是配置用来执行异步调用方法的执行器Executor
以及处理调用出异常的时候处理类AsyncUncaughtExceptionHandler
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//如果defaultExecutor为null,也就是没有指定Executor,设置默认的SimpleAsyncTaskExecutor(在子类AsyncExecutionInterceptor中重载getDefaultExecutor方法)
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//如果没有指定exceptionHandler默认为SimpleAsyncUncaughtExceptionHandler
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
其中getDefaultExecutor
方法子类AsyncExecutionInterceptor
进行了重载
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
//先调用父类的方法从容器中获取,
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
//如果没有设置Executor 则使用SimpleAsyncTaskExecutor
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
2.3.4 创建切点
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为TrueMethodMatcher
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
//创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为AnnotationMethodMatcher
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
这里主要包含两种,一种是AnnotationClassFilter
跟TrueMethodMatcher
结合的,另外一种是AnnotationClassFilter
跟AnnotationMethodMatcher
结合的。对于这三个类这里不进行分析,只说明其作用。作用就是对当前拦截的方法进行匹配判断,是否包含前面设置的那些需要拦截的注解,如果包含则说明,这个方法是需要进行异步执行的,没有则表示不匹配则会跳过这个方法。
2.4 方法拦截增强的逻辑
先总结一下上面那些步骤做了啥,主要就是做了下面的几点:
- 设置需要拦截的注解集合
asyncAnnotationType
- 设置方法的拦截处理器
AsyncExecutionInterceptor
, - 进行异步调用的执行器
Executor
,错误处理器SimpleAsyncUncaughtExceptionHandler
接下来就对这些进行分析。
2.4.1 拦截方法的AsyncExecutionInterceptor
这里直接进入实现了MethodInterceptor
接口的invoke
方法。
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//根据方法相关信息,获取executor,这个executor可以指定也可以是默认的
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
//进入下一个拦截器链
Object result = invocation.proceed();
if (result instanceof Future) {
//获取结果返回
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
//进入到SimpleAsyncUncaughtExceptionHandler进行错误处理
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//使用选定的执行程序实际执行给定任务的委托,使用的是CompletableFuture
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
这里主要逻辑如下:
- 从目标类中获取最匹配的需要执行的方法
- 获取异步调度方法用的executor
- 执行完拦截链获取到最终结果,中间如果出错,则最后交给
SimpleAsyncUncaughtExceptionHandler
处理 - 使用executor来完成异步调用
这里进入到determineAsyncExecutor
看看怎么选择执行器的。
2.4.2 获取方法调度用的执行器
determineAsyncExecutor
方法在MethodInterceptor
的父类AsyncExecutionAspectSupport
中。
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//缓存的AsyncTaskExecutor跟method的map集合
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
//获取贴有Async注解的方法,最终的是现在注册的AnnotationAsyncExecutionInterceptor类中,寻找Async注解中的value字段
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
//如果value有指定Executor则在容器中寻找关联的执行器
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
//使用默认的Executor,SimpleAsyncTaskExecutor
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
//转换为AsyncTaskExecutor类型
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
//保存到缓存中
this.executors.put(method, executor);
}
return executor;
}
这里主要逻辑就是先获取配置指定,如果没有指定则用默认的。而获取配置的逻辑在AsyncExecutionInterceptor
的子类AnnotationAsyncExecutionInterceptor
中实现的。
protected String getExecutorQualifier(Method method) {
//获取方法上的Async注解
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
//注解为空,则获取声明这个方法的类上面有没有这个注解
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
//返回Async注解value或者null
return (async != null ? async.value() : null);
}
其主要逻辑就是获取方法或者类上面有没有@Async
注解,有则代表是需要异步调用的方法,没有则不是,是的还需要进一步获取注解中value
属性,看有没有指定执行器并返回。
2.4.3 进行方法的异步调用——使用线程池
在AsyncExecutionInterceptor
中获取到了AsyncTaskExecutor
类型的执行器之后,就是信息异步的方法调用了。这个逻辑在doSubmit
方法中。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//使用的java8 的新的Future相关API,CompletableFuture来完成
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
//提交任务
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
//提交任务
return executor.submit(task);
}
else {
//提交任务
executor.submit(task);
return null;
}
}
可以看到,最后的所有异步调用还是离不开线程池。到这里整个@EnableAsync
注解以及@Async
注解的分析就完结了。