SpringSpring

spring源码------@EnableAsync注解以及@A

2019-11-25  本文已影响0人  szhlcy

@[toc]

1.@EnableAsync以及@Async的说明

1.1 @Async

 spring从3.0版本开始支持异步的方法调用,只需要在方法或者类上面加上一个@Async注解就可以了,当注解在类上面的时候,则表示整个类都作为异步方法。但是需要注意的是,当一个方法所在类上面已经存在@Configuration注解的时候,这个时候@Async注解是不支持的,至于为什么会不支持因为@Configuration注解会对当前类进行代理增强,这个时候返回的类是一个经过代理的类,这个时候的方法可能已经不是原方法了,至于代理增强的相关的东西可以看看spring源码------@Configuration跟@Component及其派生注解@Service等的区别以及spring对其代理增强的原理
这个里面有对@Configuration注解的解析。

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    //指定对应的Executor的beanName
    String value() default "";

}
1.2 @EnableAsync

 spring在3.1版本的时候加上了是否开启异步方法支持的注解@EnableAsync。这个注解是基于@Import注解进行扩展的,关于@Import可以看看spring源码解析------@Import注解解析与ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector区别
了解以下。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    //用户定义的需要进行异步执行的注解,默认只有Async注解
    Class<? extends Annotation> annotation() default Annotation.class;

    //是否创建基于CGLIB的代理对象
    boolean proxyTargetClass() default false;


    //代理模式选择 PROXY表示jdk的代理
    AdviceMode mode() default AdviceMode.PROXY;
    
    //对应的AsyncAnnotationBeanPostProcessor的拦截顺序
    int order() default Ordered.LOWEST_PRECEDENCE;

}

2. 源码分析

2.1 基于@Import扩展的AsyncConfigurationSelector

AsyncConfigurationSelector类继承了AdviceModeImportSelector类,而AdviceModeImportSelector类在前面的一篇文章spring源码------@EnableCaching,@Cacheable,@CacheEvict,@CachePut的实现原理
中提到过。这个类主要是根据@Import注解的扩展注解的类型的mode属性来设置代理类型。

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {    
    public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

    protected String getAdviceModeAttributeName() {
        return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
    }

    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        //获取AdviceModeImportSelector中的注解泛型的Class对象
        Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
        //从传入的importingClassMetadata对象中获取对应的Class类型的注解的内部属性
        AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (attributes == null) {
            throw new IllegalArgumentException(String.format(
                    "@%s is not present on importing class '%s' as expected",
                    annType.getSimpleName(), importingClassMetadata.getClassName()));
        }
        //获取属性中的代理类型属性“mode”的值
        AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
        //根据代理类型获取需要注入的bean
        String[] imports = selectImports(adviceMode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
        }
        return imports;
    }
}

 其中selectImports方法由子类来实现,这里进入AsyncConfigurationSelector来看看实现的逻辑

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            //默认的是jdk PROXY的模式
            case PROXY:
                //返回ProxyAsyncConfiguration类,注入到容器中,配置异步方法相关的配置
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            //使用AspectJ的模式
            case ASPECTJ:
                //返回AspectJ关于异步方法相关的配置类AspectJAsyncConfiguration
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

 可以看到这里就是选择不同的代理模式情况下,需要注入不同的配置类。而默认情况下@EnableAsync注解中的是PROXY模式,这里我们也就这种模式进行分析,所以接下来就是ProxyAsyncConfiguration类了。

2.2 配置异步方法执行相关配置的ProxyAsyncConfiguration

 spring对于方法的异步执行的逻辑跟@EnableCache注解类似的,都是基于对方法拦截完成的,这里的拦截涉及到了切点跟增强类。而ProxyAsyncConfiguration中就配置了这两点。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        //创建AsyncAnnotationBeanPostProcessor,实现了AbstractAdvisingBeanPostProcessor(内部定义了Advisor对象可以可以注册到容器中)
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        //设置executor跟exceptionHandler,spring都是空实现,也就是这两个值都是null,后面会设置默认的对象,也可以自己指定实现,
        bpp.configure(this.executor, this.exceptionHandler);
        //获取@EnableAsync注解中的annotation属性,这个annotation表示贴有这些注解的方法就是需要拦截的进行异步执行的方法
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        //如果annotation值不是默认的,则将这个加入到AsyncAnnotationBeanPostProcessor,后面进行拦截用
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        //是否对目标类进行代理
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        //设置AsyncAnnotationBeanPostProcessor的顺序,默认是最低的,这样可以在其他的 post-processors执行后在处理
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

 可以看到主要逻辑如下:

  1. 创建AsyncAnnotationBeanPostProcessor
  2. 配置异步执行方法的执行器executor,以及错误异常处理的exceptionHandler
  3. 然后解析@EnableAsync注解的annotation属性,或许需要拦截代理的方法,然后设置进入。
  4. 设置是否代理目标对象
  5. 设置拦截器所处的顺序

 接下来就是进入AsyncAnnotationBeanPostProcessor了解相关的配置和处理异步相关的类。

2.3 创建切点以及增强类的AsyncAnnotationBeanPostProcessor及其父类
2.3.1 创建增强类的AsyncAnnotationBeanPostProcessor

 在AsyncAnnotationBeanPostProcessor的构造器中有一个逻辑就是将当前类会创建的增强类作为所有增强类的第一个

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
        public AsyncAnnotationBeanPostProcessor() {
        //是否将这个advisor放在advisor列表的第一个,也就是最开始拦截
        setBeforeExistingAdvisors(true);
    }
}

 这个属性是在其父类中被用到的,这里简单的展示以下部分的代码。

//实现了BeanPostProcessor,在bean初始化之后调用
public Object postProcessAfterInitialization(Object bean, String beanName) {
        ......
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                //是否将当前的Advised放在调用链的最前面
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }
        ......
}

AsyncAnnotationBeanPostProcessor在这里还间接的实现了BeanFactoryAware接口的setBeanFactory方法,而这个方法里面就创建了增强相关的advisor。

    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        //创建AsyncAnnotationAdvisor,
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        //将需要拦截的方法上的注解加入到advisor
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        //设置容器到advisor
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }

 这里的创建的AsyncAnnotationAdvisor间接实现了PointcutAdvisorAdvisor,会将内部的Advice以及Pointcut注入到容器中,关于这两个类可以看看6.1Spring的AOP的解析——AOP的自定义组件
了解一下。这里直接进入到AsyncAnnotationAdvisor中查看。

2.3.2 创建增强跟切点的AsyncAnnotationAdvisor

 在AsyncAnnotationAdvisor的构造器中,就包含了advice跟pointcut的创建入口。

    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        //在用户自定义的拦截注解上额外加入Async到需要拦截的注解集合
        asyncAnnotationTypes.add(Async.class);
        try {
            //增加对java的ejv的Asynchronous注解的支持
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        //创建advice
        this.advice = buildAdvice(executor, exceptionHandler);
        //设置pointcut
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

 这里主要就是将表示需要拦截的注解加入到需要对应的集合中,然后就开始对advice跟pointcut的对象的创建。这里先看advice。

2.3.3 创建拦截器AnnotationAsyncExecutionInterceptor
    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        //设置拦截器
        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        //设置executor跟exceptionHandler
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }

 这里的configure就是配置用来执行异步调用方法的执行器Executor以及处理调用出异常的时候处理类AsyncUncaughtExceptionHandler

    public void configure(@Nullable Supplier<Executor> defaultExecutor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        //如果defaultExecutor为null,也就是没有指定Executor,设置默认的SimpleAsyncTaskExecutor(在子类AsyncExecutionInterceptor中重载getDefaultExecutor方法)
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        //如果没有指定exceptionHandler默认为SimpleAsyncUncaughtExceptionHandler
        this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    }

 其中getDefaultExecutor方法子类AsyncExecutionInterceptor进行了重载

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        //先调用父类的方法从容器中获取,
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        //如果没有设置Executor 则使用SimpleAsyncTaskExecutor
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }
2.3.4 创建切点
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            //创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为TrueMethodMatcher
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            //创建Pointcut,会设置classFilter为AnnotationClassFilter,methodMatcher为AnnotationMethodMatcher
            Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
            if (result == null) {
                result = new ComposablePointcut(cpc);
            }
            else {
                result.union(cpc);
            }
            result = result.union(mpc);
        }
        return (result != null ? result : Pointcut.TRUE);
    }

 这里主要包含两种,一种是AnnotationClassFilterTrueMethodMatcher结合的,另外一种是AnnotationClassFilterAnnotationMethodMatcher结合的。对于这三个类这里不进行分析,只说明其作用。作用就是对当前拦截的方法进行匹配判断,是否包含前面设置的那些需要拦截的注解,如果包含则说明,这个方法是需要进行异步执行的,没有则表示不匹配则会跳过这个方法。

2.4 方法拦截增强的逻辑

 先总结一下上面那些步骤做了啥,主要就是做了下面的几点:

  1. 设置需要拦截的注解集合asyncAnnotationType
  2. 设置方法的拦截处理器AsyncExecutionInterceptor
  3. 进行异步调用的执行器Executor,错误处理器SimpleAsyncUncaughtExceptionHandler

 接下来就对这些进行分析。

2.4.1 拦截方法的AsyncExecutionInterceptor

 这里直接进入实现了MethodInterceptor接口的invoke方法。

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        //根据方法相关信息,获取executor,这个executor可以指定也可以是默认的
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
        Callable<Object> task = () -> {
            try {
                //进入下一个拦截器链
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    //获取结果返回
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                //进入到SimpleAsyncUncaughtExceptionHandler进行错误处理
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                //进入到SimpleAsyncUncaughtExceptionHandler进行错误处理
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
        //使用选定的执行程序实际执行给定任务的委托,使用的是CompletableFuture
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

 这里主要逻辑如下:

  1. 从目标类中获取最匹配的需要执行的方法
  2. 获取异步调度方法用的executor
  3. 执行完拦截链获取到最终结果,中间如果出错,则最后交给SimpleAsyncUncaughtExceptionHandler处理
  4. 使用executor来完成异步调用

 这里进入到determineAsyncExecutor看看怎么选择执行器的。

2.4.2 获取方法调度用的执行器

determineAsyncExecutor方法在MethodInterceptor的父类AsyncExecutionAspectSupport中。

    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        //缓存的AsyncTaskExecutor跟method的map集合
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            //获取贴有Async注解的方法,最终的是现在注册的AnnotationAsyncExecutionInterceptor类中,寻找Async注解中的value字段
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                //如果value有指定Executor则在容器中寻找关联的执行器
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                //使用默认的Executor,SimpleAsyncTaskExecutor
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            //转换为AsyncTaskExecutor类型
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            //保存到缓存中
            this.executors.put(method, executor);
        }
        return executor;
    }

 这里主要逻辑就是先获取配置指定,如果没有指定则用默认的。而获取配置的逻辑在AsyncExecutionInterceptor的子类AnnotationAsyncExecutionInterceptor中实现的。

    protected String getExecutorQualifier(Method method) {
        //获取方法上的Async注解
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
            //注解为空,则获取声明这个方法的类上面有没有这个注解
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
        //返回Async注解value或者null
        return (async != null ? async.value() : null);
    }

 其主要逻辑就是获取方法或者类上面有没有@Async注解,有则代表是需要异步调用的方法,没有则不是,是的还需要进一步获取注解中value属性,看有没有指定执行器并返回。

2.4.3 进行方法的异步调用——使用线程池

 在AsyncExecutionInterceptor中获取到了AsyncTaskExecutor类型的执行器之后,就是信息异步的方法调用了。这个逻辑在doSubmit方法中。

    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        //使用的java8 的新的Future相关API,CompletableFuture来完成
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            //提交任务
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            //提交任务
            return executor.submit(task);
        }
        else {
            //提交任务
            executor.submit(task);
            return null;
        }
    }

 可以看到,最后的所有异步调用还是离不开线程池。到这里整个@EnableAsync注解以及@Async注解的分析就完结了。

上一篇下一篇

猜你喜欢

热点阅读