Spring+ActiveMQ 事务
一、思路
- 背景介绍
- spring 调用链路 + activemq事务链路介绍
二、背景介绍
写这篇文章背景是什么呢?或者说作者碰到了什么问题呢?是这样的,有个需求要在service层执行一些业务逻辑,如果失败了,则会抛出了一个RunTimeException(至于为什么会抛出RunTimeException就不纠结了),与此同时还需要向MQ发送一个Msg便于后续容错处理。而作者碰到的问题就出现了,jdbc的事务随着RunTimeException异常的抛出回滚了,JmsTemplate的事务也回滚了,导致事务发送失败。之前也没有研究过Spring是如何管理MQ的事务,以为它们是独立的,没想到在这里踩了一个坑。没办法,那就debug源码看看。
下面是spring-jms的配置,注意其中的sessionTransacted属性配置成了true,表示jms的事务由spring托管。
<bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledConnectionFactory"/>
<property name="pubSubDomain" value="false"/>
<property name="deliveryPersistent" value="true"/>
<property name="sessionTransacted" value="true"/>
</bean>
下面是Service层的方法,由spring管理jdbc的事务,其中的ToleranceProducer使用JmsTemplate发送消息。
@Resource
private ToleranceProducer toleranceProducer;
// 某个service的方法,这个方法由spring管理jdbc的事务。
public Object xxx(){
try {
// .... 一大坨业务
transactionManager.commit(transactionForOrder);
} catch (Exception e) {
if (e instanceof CustomFailedException) {
Object failedObject = e.getFailedObject();
toleranceProducer.sendMessage(failedObject);
throw e;
}
return something;
}
}
三、spring 调用链路 + activemq事务链路介绍
我们知道spring的事务管理机制也是依赖于AOP,就先从这出发看看到底做了啥。
可以看到TransactionInteracptor也是采用了代理模式,执行真正的业务逻辑之前先开启事务。继续debug看看invokeWithinTransaction中做了什么。
//来自 org.springframework.transaction.interceptor.TransactionInterceptor
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method var10001 = invocation.getMethod();
invocation.getClass();
// 开始失误并执行业务逻辑
return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}
ok,这里可以看到spring开启一个新的事务,并调用代码对象执行业务逻辑代码,如果捕获到异常就回滚事务。看看completeTransactionAfterThrowing中发生了什么有趣的事。
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
// 开启新事务或者从当前线程获取事务
TransactionAttributeSource tas = this.getTransactionAttributeSource();
TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
Object result;
if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
// 忽略,这里不关心
} else {
// 事务开启
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
result = null;
try {
// 执行业务逻辑
result = invocation.proceedWithInvocation();
} catch (Throwable var17) {
// 如果执行业务代码过程中抛出了异常那么就就行回滚
this.completeTransactionAfterThrowing(txInfo, var17);
throw var17;
} finally {
this.cleanupTransactionInfo(txInfo);
}
this.commitTransactionAfterReturning(txInfo);
return result;
}
// 省略其他else了,这里不关心
}
哈哈,其实也没啥,调用TransactionManager进行事务回滚。继续debug。
protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 调用TransactionManager回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException var6) {
this.logger.error("Application exception overridden by rollback exception", ex);
var6.initApplicationException(ex);
throw var6;
} catch (Error | RuntimeException var7) {
this.logger.error("Application exception overridden by rollback exception", ex);
throw var7;
}
} else {
// don't care
}
}
}
调用this.processRollback继续回滚。。。说的我都烦了,贴个栈信息自行观赏。
Spring回滚调用栈
ok,省略一大堆回滚的调用栈了,到关键的方法。
public void afterCompletion(int status) {
if (this.shouldUnbindAtCompletion()) {
boolean releaseNecessary = false;
if (this.holderActive) {
this.holderActive = false;
TransactionSynchronizationManager.unbindResourceIfPossible(this.resourceKey);
this.resourceHolder.unbound();
releaseNecessary = true;
} else {
releaseNecessary = this.shouldReleaseAfterCompletion(this.resourceHolder);
}
if (releaseNecessary) {
// 释放事务同步器中的资源
this.releaseResource(this.resourceHolder, this.resourceKey);
}
} else {
this.cleanupResource(this.resourceHolder, this.resourceKey, status == 0);
}
this.resourceHolder.reset();
}
看到resource中赫然就有我们要找的MQ连接信息。继续看在releaseResource中发生了啥?
Resource信息
一系列方法调用后,我们发现最终调用了Spring Jms包中的ConnectionFactory方法,对JMS的事务进行了回滚。
// from org.springframework.jms.connection.CachingConnectionFactory
private void logicalClose(Session proxy) throws JMSException {
// Preserve rollback-on-close semantics.
if (this.transactionOpen && this.target.getTransacted()) {
this.transactionOpen = false;
this.target.rollback();
}
// Physically close durable subscribers at time of Session close call.
for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
if (entry.getKey().subscription != null) {
entry.getValue().close();
it.remove();
}
}
// Allow for multiple close calls...
boolean returned = false;
synchronized (this.sessionList) {
if (!this.sessionList.contains(proxy)) {
this.sessionList.addLast(proxy);
returned = true;
}
}
if (returned && logger.isTraceEnabled()) {
logger.trace("Returned cached Session: " + this.target);
}
}
到这里为什么JMS事务会随着JDBC的事务回滚了就一目了然了。但是还有一个问题,TransactionSynchronizationManager事务管理器中resource的MQ connection信息是哪儿来的?
又经过一番debug后,在JmsTransactionManager中终于找到了MQ是如何注册事务到TransactionSynchronizationManager中的。
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
}
ConnectionFactory connectionFactory = obtainConnectionFactory();
JmsTransactionObject txObject = (JmsTransactionObject) transaction;
Connection con = null;
Session session = null;
try {
JmsResourceHolder resourceHolder;
if (this.lazyResourceRetrieval) {
resourceHolder = new LazyJmsResourceHolder(connectionFactory);
}
else {
con = createConnection();
session = createSession(con);
if (logger.isDebugEnabled()) {
logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
}
resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
}
resourceHolder.setSynchronizedWithTransaction(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
resourceHolder.setTimeoutInSeconds(timeout);
}
txObject.setResourceHolder(resourceHolder);
// 注册当前MQ事务到当前线程的事务管理器中。
TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
}
catch (Throwable ex) {
if (session != null) {
try {
session.close();
}
catch (Throwable ex2) {
// ignore
}
}
if (con != null) {
try {
con.close();
}
catch (Throwable ex2) {
// ignore
}
}
throw new CannotCreateTransactionException("Could not create JMS transaction", ex);
}
}
OK,到此为止,作者之前碰到问题就有了答案了。但是debug过程中对spring事务倒是有研究的兴趣了,下次我们来说Spring事务的细节。