记使用spring事务产生的问题

2021-04-13  本文已影响0人  dy_only

看源码大部分都要从开启某种服务的注解上看,找到源头才能继续。
所以看事务的源码就在Application启动类上找到注解@EnableTransactionManagement

本次整理是出于在实际应用中出现的问题,进行排查问题时顺便总结的。有错误的地方请指正。

遇到的具体问题是这样的:

有几个要素:

1.系统中有全局的线程,使用切面匹配的所有service。
2.数据库连接池最大活动数量是50,最大超时时间是一分钟。
3.系统中有一个全局连接池,最大活动的线程数量也是50

问题发生的现场

1.在线程池中放了50+的任务,这些任务都配置了@Transactional(propagation = Propagation.REQUIRES_NEW)
2.切面会先创建50个事务,此时获取了50个数据库连接。
3.在执行方法时,扫描到注解中配置了Propagation.REQUIRES_NEW,所以要将外部事务挂起,开启新事务。
4.问题来了,新事务在获取数据库连接时,可用资源不足,进行等待。
5.一分钟后个别任务超时,其余任务有可能获得连接继续执行。

解决方案

1.慎用Propagation.REQUIRES_NEW
2.合理配置线程池与数据库连接池配置
3.做压力测试

爆出的异常信息

maxWaitThreadCount 50 , current wait Thread count 50

下面是分享

EnableTransactionManagement中使用@Import注解可以将TransactionManagementConfigurationSelector实现的selectImports方法返回对象交给SpringIOC管理(后续再深究如何交给spring的)。默认返回的是AutoProxyRegistrar与ProxyTransactionManagementConfiguration接下来关注这两个类做了些什么事情。


EnableTransactionManagement(编译后) ProxyTransactionManagementConfiguration(编译后)

AutoProxyRegistrar

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    boolean candidateFound = false;
    //获取启动类注解
    Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
    for (String annType : annTypes) {
        //拿到注解中的属性
        AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (candidate == null) {
            continue;
        }
        Object mode = candidate.get("mode");
        Object proxyTargetClass = candidate.get("proxyTargetClass");
        if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                Boolean.class == proxyTargetClass.getClass()) {
            //符合开启事务的注解 进行标记候选
            candidateFound = true;
            if (mode == AdviceMode.PROXY) {
                AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                if ((Boolean) proxyTargetClass) {
                    AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                    return;
                }
            }
        }
    }
一点log....
}

ProxyTransactionManagementConfiguration

@Role(BeanDefinition.ROLE_INFRASTRUCTURE)涉及到AOP的功能,后续再进行分析吧。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
        //创建切面用的
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        //组装对Transaction的增强,包括readOnly、timeout、rollbackFor、rollbackForClassName等
        advisor.setTransactionAttributeSource(transactionAttributeSource());
        advisor.setAdvice(transactionInterceptor());
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor() {
        //事务处理的核心类
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource());
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }
}

经过服务的启动,spring自己将各种组件帮我们初始化好,配置好。接下来看看在运行时它是如何管理事务的。

TransactionInterceptor 事务拦截器

AOP可以使用自定义注解(切点)+interceptor(增强Advice)构成织入(DefaultPointcutAdvisor)来实现。
TransactionInterceptor实现了MethodInterceptor中的invoke方法,所以当代理对象执行目标方法时,会执行invoke方法,在invoke方法中最重要的是invokeWithinTransaction。

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    //获取注解参数
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    //确定事务管理器
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    //进行事务处理的方法全限定名 path.class.method
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        //创建数据库链接,获取事务,修改AutoCommit为false,此处为处理数据库资源的核心代码,最重要的是tm.getTransaction方法
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            //进行接下来的调用,如没啥意外会直接调用目标方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        //AbstractPlatformTransactionManager 提交事务,包括回滚
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
略....
}

核心方法:AbstractPlatformTransactionManager.getTransaction 让我们继续深入

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    //获取一个数据库连接
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
    //判断这个链接是否已经存在事务了,如果存在事务则进行特殊处理,这里要注意TransactionDefinition.PROPAGATION_REQUIRES_NEW的情况
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //核心方法
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

其中doBegin是实际执行的方法,构建了事务的具体实现。需要注意的是txObject.getConnectionHolder().getConnection();方法,这个方法在资源不够的情况下会循环不断的去获取数据库连接,导致程序超时。

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        //此处会去数据库中去获取连接,会不断的去获取
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        //关闭自动提交
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        //开启事务设置参数
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

获取完事务后,会执行retVal = invocation.proceedWithInvocation();进行后续的调用,最后调用commitTransactionAfterReturning(txInfo);进行事务提交或回滚的操作。

上一篇 下一篇

猜你喜欢

热点阅读