Spring+ActiveMQ 事务

2019-08-26  本文已影响0人  超人也害羞

一、思路

  1. 背景介绍
  2. spring 调用链路 + activemq事务链路介绍

二、背景介绍

写这篇文章背景是什么呢?或者说作者碰到了什么问题呢?是这样的,有个需求要在service层执行一些业务逻辑,如果失败了,则会抛出了一个RunTimeException(至于为什么会抛出RunTimeException就不纠结了),与此同时还需要向MQ发送一个Msg便于后续容错处理。而作者碰到的问题就出现了,jdbc的事务随着RunTimeException异常的抛出回滚了,JmsTemplate的事务也回滚了,导致事务发送失败。之前也没有研究过Spring是如何管理MQ的事务,以为它们是独立的,没想到在这里踩了一个坑。没办法,那就debug源码看看。
下面是spring-jms的配置,注意其中的sessionTransacted属性配置成了true,表示jms的事务由spring托管。

     <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="pubSubDomain" value="false"/>
        <property name="deliveryPersistent" value="true"/>
        <property name="sessionTransacted" value="true"/>
    </bean>

下面是Service层的方法,由spring管理jdbc的事务,其中的ToleranceProducer使用JmsTemplate发送消息。

@Resource
private ToleranceProducer toleranceProducer;
// 某个service的方法,这个方法由spring管理jdbc的事务。
public Object xxx(){
        try {
            // .... 一大坨业务          
            transactionManager.commit(transactionForOrder);
        } catch (Exception e) {
            if (e instanceof CustomFailedException) {
                Object failedObject = e.getFailedObject();
                toleranceProducer.sendMessage(failedObject);
                throw e;
            }
            return something;
        }
}

三、spring 调用链路 + activemq事务链路介绍

我们知道spring的事务管理机制也是依赖于AOP,就先从这出发看看到底做了啥。
可以看到TransactionInteracptor也是采用了代理模式,执行真正的业务逻辑之前先开启事务。继续debug看看invokeWithinTransaction中做了什么。

//来自 org.springframework.transaction.interceptor.TransactionInterceptor
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method var10001 = invocation.getMethod();
        invocation.getClass();
        // 开始失误并执行业务逻辑
        return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
    }

ok,这里可以看到spring开启一个新的事务,并调用代码对象执行业务逻辑代码,如果捕获到异常就回滚事务。看看completeTransactionAfterThrowing中发生了什么有趣的事。

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
        // 开启新事务或者从当前线程获取事务
        TransactionAttributeSource tas = this.getTransactionAttributeSource();
        TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
        PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
        String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
        Object result;
        if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
            // 忽略,这里不关心
        } else {
            // 事务开启
            TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            result = null;

            try {
                // 执行业务逻辑
                result = invocation.proceedWithInvocation();
            } catch (Throwable var17) {
                // 如果执行业务代码过程中抛出了异常那么就就行回滚
                this.completeTransactionAfterThrowing(txInfo, var17);
                throw var17;
            } finally {
                this.cleanupTransactionInfo(txInfo);
            }

            this.commitTransactionAfterReturning(txInfo);
            return result;
        }
        // 省略其他else了,这里不关心
    }

哈哈,其实也没啥,调用TransactionManager进行事务回滚。继续debug。

    protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
            }

            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    // 调用TransactionManager回滚事务
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } catch (TransactionSystemException var6) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    var6.initApplicationException(ex);
                    throw var6;
                } catch (Error | RuntimeException var7) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    throw var7;
                }
            } else {
                // don't care
            }
        }
    }

调用this.processRollback继续回滚。。。说的我都烦了,贴个栈信息自行观赏。


Spring回滚调用栈

ok,省略一大堆回滚的调用栈了,到关键的方法。

    public void afterCompletion(int status) {
        if (this.shouldUnbindAtCompletion()) {
            boolean releaseNecessary = false;
            if (this.holderActive) {
                this.holderActive = false;
                TransactionSynchronizationManager.unbindResourceIfPossible(this.resourceKey);
                this.resourceHolder.unbound();
                releaseNecessary = true;
            } else {
                releaseNecessary = this.shouldReleaseAfterCompletion(this.resourceHolder);
            }

            if (releaseNecessary) {
                // 释放事务同步器中的资源
                this.releaseResource(this.resourceHolder, this.resourceKey);
            }
        } else {
            this.cleanupResource(this.resourceHolder, this.resourceKey, status == 0);
        }

        this.resourceHolder.reset();
    }

看到resource中赫然就有我们要找的MQ连接信息。继续看在releaseResource中发生了啥?


Resource信息

一系列方法调用后,我们发现最终调用了Spring Jms包中的ConnectionFactory方法,对JMS的事务进行了回滚。

// from org.springframework.jms.connection.CachingConnectionFactory
        private void logicalClose(Session proxy) throws JMSException {
            // Preserve rollback-on-close semantics.
            if (this.transactionOpen && this.target.getTransacted()) {
                this.transactionOpen = false;
                this.target.rollback();
            }
            // Physically close durable subscribers at time of Session close call.
            for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
                Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
                if (entry.getKey().subscription != null) {
                    entry.getValue().close();
                    it.remove();
                }
            }
            // Allow for multiple close calls...
            boolean returned = false;
            synchronized (this.sessionList) {
                if (!this.sessionList.contains(proxy)) {
                    this.sessionList.addLast(proxy);
                    returned = true;
                }
            }
            if (returned && logger.isTraceEnabled()) {
                logger.trace("Returned cached Session: " + this.target);
            }
        }

到这里为什么JMS事务会随着JDBC的事务回滚了就一目了然了。但是还有一个问题,TransactionSynchronizationManager事务管理器中resource的MQ connection信息是哪儿来的?
又经过一番debug后,在JmsTransactionManager中终于找到了MQ是如何注册事务到TransactionSynchronizationManager中的。

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
        }

        ConnectionFactory connectionFactory = obtainConnectionFactory();
        JmsTransactionObject txObject = (JmsTransactionObject) transaction;
        Connection con = null;
        Session session = null;
        try {
            JmsResourceHolder resourceHolder;
            if (this.lazyResourceRetrieval) {
                resourceHolder = new LazyJmsResourceHolder(connectionFactory);
            }
            else {
                con = createConnection();
                session = createSession(con);
                if (logger.isDebugEnabled()) {
                    logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
                }
                resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
            }
            resourceHolder.setSynchronizedWithTransaction(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                resourceHolder.setTimeoutInSeconds(timeout);
            }
            txObject.setResourceHolder(resourceHolder);
                        // 注册当前MQ事务到当前线程的事务管理器中。
            TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
        }
        catch (Throwable ex) {
            if (session != null) {
                try {
                    session.close();
                }
                catch (Throwable ex2) {
                    // ignore
                }
            }
            if (con != null) {
                try {
                    con.close();
                }
                catch (Throwable ex2) {
                    // ignore
                }
            }
            throw new CannotCreateTransactionException("Could not create JMS transaction", ex);
        }
    }

OK,到此为止,作者之前碰到问题就有了答案了。但是debug过程中对spring事务倒是有研究的兴趣了,下次我们来说Spring事务的细节。

上一篇下一篇

猜你喜欢

热点阅读