Spring事务——事务原理源码分析

2023-02-21  本文已影响0人  小波同学

前言

先贴一张从网上找到的Spring事务图,因为源码比较长,结合图看的话,不容易看到后边忘记


以SpringBoot为例,看下SpringBoot是怎么做的。

第一部分:Spring生成事务代理增强类源码分析


一、/META-INF/spring.factories

要想实现某个功能,首先要做的肯定是把bean加载到容器中,Spring通过[spring.factories的方式可以加载一些特定的bean

关于事务的bean请看下图:

TransactionAutoConfiguration、JtaAutoConfiguration

二、TransactionAutoConfiguration

@Configuration
// 类路径下包含PlatformTransactionManager本类生效
@ConditionalOnClass(PlatformTransactionManager.class)
// 在以下四个类本加载后生效
@AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
        DataSourceTransactionManagerAutoConfiguration.class,
        Neo4jDataAutoConfiguration.class })
// 确保前缀为 spring.transaction 的事务有关属性配置项被加载到 bean TransactionProperties
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {

    
    @Bean
    // 仅在该 bean 不存在时才定义
    @ConditionalOnMissingBean
    public TransactionManagerCustomizers platformTransactionManagerCustomizers(
            ObjectProvider<PlatformTransactionManagerCustomizer<?>> customizers) {
        return new TransactionManagerCustomizers(
                customizers.orderedStream().collect(Collectors.toList()));
    }

    @Configuration
    // 当能够唯一确定一个PlatformTransactionManager bean时才生效
    @ConditionalOnSingleCandidate(PlatformTransactionManager.class)
    public static class TransactionTemplateConfiguration {

        private final PlatformTransactionManager transactionManager;

        public TransactionTemplateConfiguration(
                PlatformTransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }

        @Bean
        // 仅在该 bean 不存在时才生效
        @ConditionalOnMissingBean
        public TransactionTemplate transactionTemplate() {
            return new TransactionTemplate(this.transactionManager);
        }

    }

    // 内部类
    @Configuration
    @ConditionalOnBean(PlatformTransactionManager.class)
    @ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
    public static class EnableTransactionManagementConfiguration {

        @Configuration
        // 非常重要的事务注解,下一步要进入这个注解看
        // 在属性 spring.aop.proxy-target-class 被明确设置为 false 时启用注解 
        @EnableTransactionManagement(proxyTargetClass = false)
        @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false", matchIfMissing = false)
        public static class JdkDynamicAutoProxyConfiguration {

        }

        @Configuration
        // 非常重要的事务注解,下一步要进入这个注解看
        // 在属性 spring.aop.proxy-target-class 缺失或者被明确设置为 true 时启用注解 
        @EnableTransactionManagement(proxyTargetClass = true)
        // matchIfMissing 为true,因为默认是没有配置的,所以spring默认走的是cglib代理
        @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true", matchIfMissing = true)
        public static class CglibAutoProxyConfiguration {

        }

    }

}

三、@EnableTransactionManagement

到这里我们第一个问题已经有答案了,AdviceMode 默认使用的是PROXY,同一个类中方法调用,拦截器不会生效,所以我们在同一类中即使调用一个事务方法,方法拦截器不会生效,事务就不会生效。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({TransactionManagementConfigurationSelector.class})
public @interface EnableTransactionManagement {

    // proxyTargetClass = false表示是JDK动态代理支持接口代理。true表示是Cglib代理支持子类继承代理。
    boolean proxyTargetClass() default false;

    // 事务通知模式(切面织入方式),默认代理模式(同一个类中方法互相调用拦截器不会生效),可以选择增强型AspectJ
    AdviceMode mode() default AdviceMode.PROXY;

    // 连接点上有多个通知时,排序,默认最低。值越大优先级越低。
    int order() default 2147483647;
}

这里通过@Import导入了TransactionManagementConfigurationSelector类

四、TransactionManagementConfigurationSelector

如上图所示,TransactionManagementConfigurationSelector继承自AdviceModeImportSelector实现了ImportSelector接口

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
    public TransactionManagementConfigurationSelector() {
    }

    protected String[] selectImports(AdviceMode adviceMode) {
        switch(adviceMode) {
        // 默认会走这里,载入了AutoProxyRegistrar、ProxyTransactionManagementConfiguration2个类
        case PROXY:
            return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
        case ASPECTJ:
            return new String[]{this.determineTransactionAspectClass()};
        default:
            return null;
        }
    }

    //。。。。。。
}

4.1、AutoProxyRegistrar

实现了ImportBeanDefinitionRegistrar接口,重写registerBeanDefinitions方法,源码如下:

最终调用的是:registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);源码自己看下,因为关联关系不大,就不展开了,这个方法的目的是注册InfrastructureAdvisorAutoProxyCreator的BeanDefinitio

public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {

    private final Log logger = LogFactory.getLog(getClass());

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        boolean candidateFound = false;
        Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
        for (String annType : annTypes) {
            AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
            if (candidate == null) {
                continue;
            }
            Object mode = candidate.get("mode");
            Object proxyTargetClass = candidate.get("proxyTargetClass");
            if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                    Boolean.class == proxyTargetClass.getClass()) {
                candidateFound = true;
                // 默认代理模式,进入到这个方法
                if (mode == AdviceMode.PROXY) {
                    // 最终调用的是:registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);基础构建增强自动代理构造器
                    AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                    if ((Boolean) proxyTargetClass) {
                        AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                        return;
                    }
                }
            }
        }
        if (!candidateFound && logger.isInfoEnabled()) {
            String name = getClass().getSimpleName();
        }
    }

}

4.2、InfrastructureAdvisorAutoProxyCreator

Infrastructure:基础设施

看下类继承结构,这里需要首先了解下Spring容器的加载过程,了解BeanPostProcessor接口的作用

因为间接实现了InstantiationAwareBeanPostProcessor,AbstractAutoProxyCreator(InfrastructureAdvisorAutoProxyCreator继承了它)重写了接口两个方法,对bean进行了一些操作:

4.3、AbstractAutoProxyCreator

public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
        implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
        
    /**
     * 在创建Bean的流程中还没调用构造器来实例化Bean的时候进行调用(实例化前后)
     * AOP解析切面以及事务解析事务注解都是在这里完成的
     */
    @Override
    public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) {
        //获取BeanClass的缓存key
        Object cacheKey = getCacheKey(beanClass, beanName);

        if (!StringUtils.hasLength(beanName) || !this.targetSourcedBeans.contains(beanName)) {
            //advisedBeans保存了所有已经做过动态代理的Bean
            // 如果被解析过则直接返回
            if (this.advisedBeans.containsKey(cacheKey)) {
                return null;
            }
            // 1. 判断当前bean是否是基础类型:是否实现了Advice,Pointcut,Advisor,AopInfrastructureBean这些接口或是否是切面(@Aspect注解)
            // 2. 判断是不是应该跳过 (AOP解析直接解析出我们的切面信息,
            // 而事务在这里是不会解析的)
            if (isInfrastructureClass(beanClass) || shouldSkip(beanClass, beanName)) {
                //添加进advisedBeans ConcurrentHashMap<k=Object,v=Boolean>标记是否需要增强实现,
                // 这里基础构建bean不需要代理,都置为false,供后面postProcessAfterInitialization实例化后使用。
                this.advisedBeans.put(cacheKey, Boolean.FALSE);
                return null;
            }
        }


        //获取用户自定义的targetSource, 如果存在则直接在对象实例化之前进行代理创建,
        // 避免了目标对象不必要的实例化
        TargetSource targetSource = getCustomTargetSource(beanClass, beanName);
        //如果有自定义targetSource就要这里创建代理对象
        //这样做的好处是被代理的对象可以动态改变,而不是值针对一个target对象(可以对对象池中对象进行代理,可以每次创建代理都创建新对象
        if (targetSource != null) {
            if (StringUtils.hasLength(beanName)) {
                this.targetSourcedBeans.add(beanName);
            }
            //获取Advisors, 这个是交给子类实现的
            Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(beanClass, beanName, targetSource);
            Object proxy = createProxy(beanClass, beanName, specificInterceptors, targetSource);
            this.proxyTypes.put(cacheKey, proxy.getClass());
            //返回代理的对象
            return proxy;
        }

        return null;
    }

    @Override
    public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
        if (bean != null) {
            Object cacheKey = getCacheKey(bean.getClass(), beanName);
            //当 Bean 被循环引用, 并且被暴露了,
            // 则会通过 getEarlyBeanReference 来创建代理类;
            // 通过判断 earlyProxyReferences 中
            // 是否存在 beanName 来决定是否需要对 target 进行动态代理
            if (this.earlyProxyReferences.remove(cacheKey) != bean) {
                //该方法将会返回代理类
                return wrapIfNecessary(bean, beanName, cacheKey);
            }
        }
        return bean;
    }

    /**
     * Spring实现Bean代理的核心方法。wrapIfNecessary在两处会被调用,一处是getEarlyBeanReference,
     * 另一处是postProcessAfterInitialization
     */
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        //已经被处理过
        // 1.判断当前bean是否在targetSourcedBeans缓存中存在(已经处理过),如果存在,则直接返回当前bean
        if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
            return bean;
        }
        //不需要被织入逻辑的
        // 2.在advisedBeans缓存中存在,并且value为false,则代表无需处理
        if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
            return bean;
        }
        //是不是基础的bean 是不是需要跳过的
        // 3.bean的类是aop基础设施类 || bean应该跳过,则标记为无需处理,并返回
        if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
            this.advisedBeans.put(cacheKey, Boolean.FALSE);
            return bean;
        }

        // Create proxy if we have advice.
        // 返回匹配当前Bean的所有Advice\Advisor\Interceptor
        Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
        // 5.如果存在增强器则创建代理
        if (specificInterceptors != DO_NOT_PROXY) {
            this.advisedBeans.put(cacheKey, Boolean.TRUE);
            //创建Bean对应的代理,SingletonTargetSource用于封装实现类的信息
            // 5.1 创建代理对象:这边SingletonTargetSource的target属性存放的就是我们原来的bean实例(也就是被代理对象),
            // 用于最后增加逻辑执行完毕后,通过反射执行我们真正的方法时使用(method.invoke(bean, args))
            Object proxy = createProxy(
                    bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
            // 5.2 创建完代理后,将cacheKey -> 代理类的class放到缓存
            this.proxyTypes.put(cacheKey, proxy.getClass());
            // 返回代理对象
            return proxy;
        }
        //该Bean是不需要进行代理的,下次就不需要重复生成了
        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }
}   

4.4 createProxy

public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
        implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {

    protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
            @Nullable Object[] specificInterceptors, TargetSource targetSource) {
        //如果beanFactory是ConfigurableListableBeanFactory的类型,暴露目标类
        if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
            AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
        }
        //创建一个ProxyFactory,当前ProxyCreator在创建代理时将需要用到的字段赋值到ProxyFactory中去
        ProxyFactory proxyFactory = new ProxyFactory();
        //将 当前的AnnotationAwareAspectJAutoProxyCreator 对象的属性赋值给ProxyFactory对象
        proxyFactory.copyFrom(this);
        // 处理 proxyTargetClass 属性
        // 如果希望使用 CGLIB 来代理接口,可以配置
        // proxy-target-class="true",这样不管有没有接口,都使用 CGLIB 来生成代理:
        // <aop:config proxy-target-class="true"></aop:config>
        if (!proxyFactory.isProxyTargetClass()) {
            // 检查preserveTargetClass属性,判断beanClass是应该基于类代理还是基于接口代理
            if (shouldProxyTargetClass(beanClass, beanName)) {
                // 如果是基于类代理,则将proxyTargetClass赋值为true
                proxyFactory.setProxyTargetClass(true);
            }
            else {
                // 评估bean的代理接口
                // 1. 有接口的,调用一次或多次:proxyFactory.addInterface(ifc);
                // 2. 没有接口的,调用:proxyFactory.setProxyTargetClass(true);
                evaluateProxyInterfaces(beanClass, proxyFactory);
            }
        }

        // 这个方法主要来对前面传递进来的横切逻辑实例进行包装
        // 注意:如果 specificInterceptors 中有 Advice 和 Interceptor,它们也会被包装成 Advisor
        // 方法会整理合并得到最终的advisors (毕竟interceptorNames还指定了一些拦截器的)
        // 至于调用的先后顺序,通过方法里的applyCommonInterceptorsFirst参数可以进行设置,
        // 若applyCommonInterceptorsFirst为true,interceptorNames属性指定的Advisor优先调用。默认为true
        Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
        // 将advisors添加到proxyFactory
        proxyFactory.addAdvisors(advisors);
        // 设置要代理的类,将targetSource赋值给proxyFactory的targetSource属性,之后可以通过该属性拿到被代理的bean的实例
        proxyFactory.setTargetSource(targetSource);
        // 这个方法是交给子类的,子类可以继续去定制此proxyFactory
        customizeProxyFactory(proxyFactory);

        // 用来控制proxyFactory被配置之后,是否还允许修改通知。默认值为false(即在代理被配置之后,不允许修改代理类的配置)
        proxyFactory.setFrozen(this.freezeProxy);
        // 设置preFiltered的属性值,默认是false。子类:AbstractAdvisorAutoProxyCreator修改为true
        // preFiltered字段意思为:是否已为特定目标类筛选Advisor
        // 这个字段和DefaultAdvisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice获取所有的Advisor有关
        //CglibAopProxy和JdkDynamicAopProxy都会调用此方法,然后递归执行所有的Advisor
        if (advisorsPreFiltered()) {
            proxyFactory.setPreFiltered(true);
        }

        // 使用proxyFactory获取代理
        return proxyFactory.getProxy(getProxyClassLoader());
    }
}

五、看到这里,大家可能对createProxy 有些疑问,我们还没有定义Advisor呀,Advisor是哪来的?

我们再回到第四步看一下,之前4.1、4.2、4.3、4.4看的全都是AutoProxyRegistrar源码,下边分析下ProxyTransactionManagementConfiguration是干嘛的。

第一个方法就是定义事务增强器的,返回的BeanFactoryTransactionAttributeSourceAdvisor对象其实是间接实现了Advisor接口的,大家可以看下类图。

最终,加载了一个事务的Advisor,这个里边配置了Advice:TransactionInterceptor,真正方法执行时会拦截到。

注意:BeanFactoryTransactionAttributeSourceAdvisor实现的是PointcutAdvisor接口,既可以设置Advice,又可以设置Pointcut。
但是下面代码中只配置了Adice,而pointcut是由BeanFactoryTransactionAttributeSourceAdvisor类完成的配置。

@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
    public ProxyTransactionManagementConfiguration() {
    }


    /**
     * 定义事务增强器 注册了Advisor
     */
    @Bean(
        name = {"org.springframework.transaction.config.internalTransactionAdvisor"}
    )
    //声明的role为基础类(Spring内部使用的类)
    //int ROLE_INFRASTRUCTURE = 2;
    @Role(2)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        advisor.setTransactionAttributeSource(this.transactionAttributeSource());
        //配置Advice 
        // 这里的定义了事务拦截器
        advisor.setAdvice(this.transactionInterceptor());
        if (this.enableTx != null) {
            advisor.setOrder((Integer)this.enableTx.getNumber("order"));
        }

        return advisor;
    }


    /**
     * 定义基于注解的事务属性资源
     */
    @Bean
    @Role(2)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }



    /**
     * 定义事务拦截器 注册了Advice
     */
    @Bean
    @Role(2)
    public TransactionInterceptor transactionInterceptor() {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(this.transactionAttributeSource());
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }

        return interceptor;
    }
}
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

    @Nullable
    private TransactionAttributeSource transactionAttributeSource;

    //pointcut 是TransactionAttributeSourcePointcut 的子类,实现了TransactionAttributeSource 方法。
    //该方法的作用,是用户可以配置`解析器`。
    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
        @Override
        @Nullable
        protected TransactionAttributeSource getTransactionAttributeSource() {
            return transactionAttributeSource;
        }
    };

    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionAttributeSource = transactionAttributeSource;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

}

六、Spring解析事务注解

6.1 方法匹配

如何判断bean方法符合规则?判断是否能获取事务属性对象。

自动代理器会遍历bean的所有方法,判断是否与MethodMatcher匹配。实际上会调用TransactionAttributeSourcePointcut的matches方法。

abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

    @Override
    public boolean matches(Method method, Class<?> targetClass) {
        if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
                PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
                PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
            return false;
        }
        //若TransactionAttributeSource 对象不为空,那么获取方法上的事务属性对象。
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    }


    /**
     * 抽象方法,但是注册的Advisor中的pointcut是TransactionAttributeSourcePointcut 子类。
     * 实际上获取的是TransactionAttributeSource 对象
     */
    @Nullable
    protected abstract TransactionAttributeSource getTransactionAttributeSource();

}

6.2 事务属性获取

解析事务注解,获取事务属性对象。

实际上TransactionAttributeSource为3.1配置的AnnotationTransactionAttributeSource对象。
当然这个类实现的只是特有的方法,而算法骨干由其父类AbstractFallbackTransactionAttributeSource实现。

注意getTransactionAttribute方法返回null,则证明pointcut不匹配bean的某方法。

public abstract class AbstractFallbackTransactionAttributeSource implements TransactionAttributeSource {

    @Override
    @Nullable
    public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        //若是Object的方法,直接返回null
        if (method.getDeclaringClass() == Object.class) {
            return null;
        }

        // 首先看缓存是否存在
        Object cacheKey = getCacheKey(method, targetClass);
        TransactionAttribute cached = this.attributeCache.get(cacheKey);
        if (cached != null) {
            // Value will either be canonical value indicating there is no transaction attribute,
            // or an actual transaction attribute.
            if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                return null;
            }
            else {
                return cached;
            }
        }
        else {
            //获取TransactionAttribute 对象的核心方法(如下所示)
            TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
            //填充缓存
            if (txAttr == null) {
                this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
            }
            else {
                String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                if (txAttr instanceof DefaultTransactionAttribute) {
                    ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                }
                this.attributeCache.put(cacheKey, txAttr);
            }
            return txAttr;
        }
    }
    
    @Nullable
    protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        // 该方法是否要求必须是public级别(因为自动代理会对所有方法进行代匹配)
        if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
            return null;
        }

        //获取明确类的方法(处理桥接方法)
        Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

        //由子类实现(获取方法上的事务配置)
        TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
        if (txAttr != null) {
            return txAttr;
        }

        //由子类实现(获取类上的事务配置)
        txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }

        //获取原始方法上的事务配置
        if (specificMethod != method) {
            // Fallback is to look at the original method.
            txAttr = findTransactionAttribute(method);
            if (txAttr != null) {
                return txAttr;
            }
            // Last fallback is the class of the original method.
            txAttr = findTransactionAttribute(method.getDeclaringClass());
            if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return txAttr;
            }
        }

        return null;
    }
}   

下图的两个抽象方法,由子类实现。


还需要注意的一个细节,AnnotationTransactionAttributeSource是其唯一子类。


public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
        implements Serializable {


    private final Set<TransactionAnnotationParser> annotationParsers;


    public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        if (jta12Present || ejb3Present) {
            this.annotationParsers = new LinkedHashSet<>(4);
            this.annotationParsers.add(new SpringTransactionAnnotationParser());
            if (jta12Present) {
                this.annotationParsers.add(new JtaTransactionAnnotationParser());
            }
            if (ejb3Present) {
                this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
            }
        }
        else {
            //通用模式下,为SpringTransactionAnnotationParser解析器
            this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
        }
    }


    @Override
    @Nullable
    protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
        return determineTransactionAttribute(clazz);
    }

    @Override
    @Nullable
    protected TransactionAttribute findTransactionAttribute(Method method) {
        return determineTransactionAttribute(method);
    }

    //遍历解析器,获取事务注解
    @Nullable
    protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
        for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
            TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
            if (attr != null) {
                return attr;
            }
        }
        return null;
    }
}

parseTransactionAnnotation()方法,遍历方法/类上的事务注解。获取到TransactionAttribute对象。

public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {

    @Override
    @Nullable
    public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
        AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
                element, Transactional.class, false, false);
        if (attributes != null) {
            return parseTransactionAnnotation(attributes);
        }
        else {
            return null;
        }
    }
}

若MethodMatcher返回true,则证明该方法可以被事务增强器去增强。将Advisor加入到List中,并开始处理下一个Advisor。最终获取到所有可切入的Advisor。去创建代理对象。

总结

第二部分:Spring 事务执行


上面讲到 TransactionManagementConfigurationSelector加载了ProxyTransactionManagementConfiguration,里边注册了一个bean:TransactionInterceptor,实现了MethodInterceptor(Spring方法拦截器),事务就是在这个类中进行处理的,首先看一下类图


因为实现了MethodInterceptor,看下invoke()方法,方法拦截的核心代码。

一、invoke()

调用了父类TransactionAspectSupport的invokeWithinTransaction()方法

invokeWithinTransaction方法的大致流程是这样的:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        // 调用TransactionAspectSupport的 invokeWithinTransaction方法
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        // 如果transaction attribute为空,该方法就是非事务(非编程式事务)
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // 标准声明式事务:如果事务属性为空 或者 非回调偏向的事务管理器
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // 这里就是一个环绕增强,在这个proceed前后可以自己定义增强实现
                // 实现类 ReflectiveMethodInvocation
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 根据事务定义的,该异常需要回滚就回滚,否则提交事务
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                //清空当前事务信息,重置为老的
                cleanupTransactionInfo(txInfo);
            }
            //返回结果之前提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        // 从此往下是编程式事务,暂时不做了解
        else {
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        return invocation.proceedWithInvocation();
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });

                // Check result state: It might indicate a Throwable to rethrow.
                if (throwableHolder.throwable != null) {
                    throw throwableHolder.throwable;
                }
                return result;
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }
        }
    }
}

总结

到此,Spring事务的流程就完了,上边提到的几个方法感兴趣的可以去翻一下源码

第三部分:PlatformTransactionManager


四、PlatformTransactionManager

jdbc主要用到的管理器是DataSourceTransactionManager,实现了PlatformTransactionManager,分析一下三个方法。


public interface PlatformTransactionManager {

    // 获取事务状态
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;

    // 提交
    void commit(TransactionStatus status) throws TransactionException;

    // 回滚
    void rollback(TransactionStatus status) throws TransactionException;

}

五、getTransaction获取事务

Spring事务的传播行为实现就在这里,上图。


public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction();

        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // 事务属性为空,使用默认的属性
            definition = new DefaultTransactionDefinition();
        }

        // 如果当前已经存在事务
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            // 根据不同传播机制不同处理
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // 超时不能小于默认值
        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                // 是否需要开启同步
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 开启新事务
                doBegin(transaction, definition);
                //准备同步
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,
            // 创建“空”事务:没有实际事务,但可能是同步。
            // 警告:定义了隔离级别,但并没有真实的事务初始化,
            // 隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
}

这里分成了两部分:

handleExistingTransaction()源码如下:

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    /**
     * Create a TransactionStatus for an existing transaction.
     */
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        // 1.NERVER(不支持当前事务;如果当前事务存在,抛出异常)报错
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        // 2.NOT_SUPPORTED(不支持当前事务,现有同步将被挂起)挂起当前事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        // 3.REQUIRES_NEW挂起当前事务,创建新事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            // 挂起当前事务
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        // 4.NESTED嵌套事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            
            // 是否支持保存点:非JTA事务走这个分支。
            // AbstractPlatformTransactionManager默认是true,
            // JtaTransactionManager复写了该方法false,
            // DataSourceTransactionmanager没有复写,
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                // 创建保存点
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        // 到这里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,
        // 存在事务加入事务即可,prepareTransactionStatus第三个参数就是是否需要新事务。false代表不需要新事务
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
}

这里有两个核心方法

参考:
https://blog.csdn.net/StringBuff/article/details/111812282

https://blog.csdn.net/StringBuff/article/details/112094055

https://www.cnblogs.com/chenxingyang/p/15559352.html

上一篇下一篇

猜你喜欢

热点阅读