Spring4.x 手动事务,监听处理未关闭事务的几点思路,文末
简书 慢黑八
转载请注明原创出处,谢谢!
如果读完觉得有收获的话,欢迎点赞加关注
背景
由于某项目独特的特色需要手动开启事务。然而,在手动开启事务后,事务能否正常结束 commit or rollback
就出现了各式各样的不确定情况。如果commit or rollback
未执行或执行失败,将会导致该事务持有的数据库连接无法正常归还到连接池中。高并发场景下的现象就是连接池中的可用连接越来越少,最后导致获取连接超时
的异常。
以下为手动事务工具类
@Service
public class TransactionTool {
//spring注入事务管理对象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事务定义
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 传播范围
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
return transactionStatus;
}
}
下面是开启事务的业务处理逻辑
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代码手动开启事务
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..业务逻辑
transactionManager.commit(transactionStatus);
}catch(Exception e){
transactionManager.rollback(transactionStatus);
}finally{
//略掉一些分库分表的特殊处理
}
}
}
主要导致事务没有正常结束的三种场景
- 场景 1、处理
业务逻辑
时,抛出的是Error
而不是Exception
,catch
接不住,导致rollback
不能正常执行,这也意味着事务无法正常回滚,造成连接泄露。 - 场景 2、处理
业务逻辑
时,未执行到commit
就return
了,这样也会导致了该事务没有正常结束,connection
没有正常归还连接池,造成泄露。 - 场景3、同一个方法中事务双开,双关,按照以下顺序执行
开启事务1(requires_new
)-> 然后开事务2(requires_new
) -> 之后提交事务1(commit
) -> 在提交事务2(commit
)
事务上下文状态切换如下:
TS=TransactionStatus TE=TransactionEvent T=Transaction
步骤 | 事务操作 | TransactionSynchronizationManager | 挂起\执行 |
---|---|---|---|
1 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)、TE1 | 挂起 NULL |
2 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con2)、TE2 | 挂起T1,TE1 |
3 | commit(TS1) | TE2执行,同步器清理T2 解挂步骤1挂起的null事务资源 |
执行T1.commit成功 con1归还连接池 |
4 | commit(TS2) | 当前事务资源为null导致同步器 事件处理出现异常,导致con2 不能正常归还到连接池,造成 连接泄露 |
执行 T2.commit失败 con2泄露 |
在开启事务1的时候挂起的事务资源为空,在commit
事务1的之后,会解挂当前线程的事务资源为:null
,提交事务2时候,如果当前线程的事务资源为null
,会抛空指针异常,最后在解绑资源unbindResource()
的时候抛出以下代码块中的IllegalStateException
异常(遗憾的是,该异常被spring框架捕获后没有打印出来)。最终导致事务2持有的连接不能正常释放。TransactionEvent
会在事务结束的时候执行当前TransactionSynchronizationManager
线程本地变量中的synchronizations
事件。
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
以上3中情况,在自动事务@Transactional的处理逻辑中都不会出现。首先spring-tx都进行了统一封装充分考虑了非正常的可以。其次,在嵌套事务双开的时候,都是先开的事务后关。所以,手动事务一定要遵循先开的事务后关这个原则
。
监控解决未关闭事务的几个思路
-
思路1:采用spring的
ApplicationEventPublisher
的事件发布监听机制。
订阅@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
事务完成阶段的监听,对“一定时间内”未关闭的事件进行预警,发现后整改。 -
思路2:在finally中对事务进行统一关闭。
调整catch
的范围,从Exception
修改为Throwable
捕捉到所有Exception
或者Error
的情况,把commit移动到finally中。commit的前置条件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted()
,这样会对所有 新建的且未完成的 事务进行commit
。如果小伙伴觉得思路2改动方式比较激进,想暂时先观察一下那些服务存在 事务未正常结束 的情况,可以参考思路3。
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代码手动开启事务
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..业务逻辑
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch内容可提炼成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()
&& !transactionStatus.isCompleted()) {
//TODO: arms日志输出 堆栈相关信息
transactionManager.commit(transactionStatus);
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分库分表的特殊处理
}
}
}
-
思路3: 在finally中检查未完成的事物并进行预警。
预警的前提条件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted()
,这样会对所有 新建的且未完成的 事务进行预警日志信息输出。该思路在finally中增加try..catch块进行检查,对应用程序改动影响较小。
需要注意的是:这种方式仍然监控不到上文中场景3连接泄露的问题,如果想解决场景3的问题,需要从TransactionStatus
中获取事务对象,抽取ConnectionHolder
中的数据库Connection
,用conn.isClosed()
来判断连接是否已经关闭。另外还需要修改DataSourceTransactionManager
源码,把内部类DataSourceTransactionObject
的访问修饰符从private
修改为public
。
参考如下代码:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代码手动开启事务
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..业务逻辑
transactionManager.commit(transactionStatus);
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch内容可提炼成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()) {
if(!transactionStatus.isCompleted()) {
// arms日志输出 堆栈相关信息
System.out.println("事务未结束原因[事务-未完成]");
printStackTrace(Thread.currentThread().getStackTrace());
}else {
Connection conn = null;
DefaultTransactionStatus defaultTransactionStatus = (DefaultTransactionStatus)transactionStatus;
if(defaultTransactionStatus.getTransaction().getClass().getClassLoader() == DataSourceTransactionObject.class.getClassLoader()) {
conn = ((DataSourceTransactionObject)defaultTransactionStatus.getTransaction()).getConnectionHolder().getConnection();
if(conn != null && conn.isClosed()==false) {
System.out.println("事务未结束原因[连接-未关闭]");
printStackTrace(Thread.currentThread().getStackTrace());
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分库分表的特殊处理
}
}
}
接下来说下上面三种思路的可行性
[X ] 思路1,不可行
[ok] 思路2,可行
[ok] 思路3为过渡监控性的解决方案,可行
[ok] 思路2+思路3为最终解决方案,可行
思路1中,基于spring事件的发布订阅模式会存在什么问题?
使用spring的ApplicationEventPublisher
的事件发布监听机制。
订阅@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
事务完成阶段**的监听,对“一定时间内”未关闭的事件进行预警,发现后整改。
1、改造TransactionTool
在执行getTransSatus
方法时调用publishTransactionEvent(transactionStatus , propagate)
发布包含transactionId 的 "新事务事件" ,然后把需要监控的事务事件存放在aliveTransactionMap
中 。
@Service
public class TransactionTool {
private AtomicLong transactionId = new AtomicLong(0);
// transcatioId,BizTransactionEvent 存储存活的事务事件
public static ConcurrentHashMap<String, BizTransactionEvent> aliveTransactionMap =
new ConcurrentHashMap<String, BizTransactionEvent>();
//spring注入事务管理对象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
@Autowired
private ApplicationEventPublisher publisher;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事务定义
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 传播范围
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
// 增加事务监听
publishTransactionEvent(transactionStatus , propagate);
return transactionStatus;
}
public void publishEvent(long tid,int propagate) {
long temp = tid;
StackTraceElement[] stackTraceElementArray = Thread.currentThread().getStackTrace();
if(stackTraceElementArray.length>2) {
if(transactionId.longValue() == Long.MAX_VALUE) {
transactionId.compareAndSet(Long.MAX_VALUE, 0);
}
BizTransactionEvent bizTransactionEvent = new BizTransactionEvent();
bizTransactionEvent.setTransactionId(""+temp);
bizTransactionEvent.setTransactionName(stackTraceElementArray[3].getClassName()+":"
+stackTraceElementArray[3].getMethodName()+":"+stackTraceElementArray[3].getLineNumber());
bizTransactionEvent.setCurrentTimeMillis(System.currentTimeMillis());
bizTransactionEvent.setStackTraceElement(stackTraceElementArray);
bizTransactionEvent.setPropagate(propagate);
System.out.println("[NEWTX"+bizTransactionEvent.getTransactionId()+"]"+bizTransactionEvent.toString());
publisher.publishEvent(bizTransactionEvent);
//在这里处理新建的事务操作,可以放入一个map中
TransactionTool.aliveTransactionMap.put(bizTransactionEvent.getTransactionId(), bizTransactionEvent);
}
}
}
2、增加事物事件类1BizTransactionEvent
,事务监听类BizTransactionEventListener
,通过事务commit时候,同步调用标有注解@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
的afterCompletion
方法把aliveTransactionMap
中transactionId对应的事务事件删掉。
事务事件监听类
@Component
public class BizTransactionEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<BizTransactionEvent> event) {
System.out.println("[NEWTX" + event.getPayload().getTransactionId() + "-REMOVE] " + event.toString()
+ "Duration:" + (System.currentTimeMillis() - event.getPayload().getCurrentTimeMillis()) + "ms");
TransactionTool.aliveTransactionMap.remove(event.getPayload().getTransactionId());
}
}
事务事件类
public class BizTransactionEvent {
private static final int STACK_TRACE_ELEMENT_DEEP = 4;
private String transactionId;
private String transactionName;
private StackTraceElement[] stackTraceElement;
private long currentTimeMillis;
private int propagate;
public String getTransactionName() {
return transactionName;
}
public void setTransactionName(String transactionName) {
this.transactionName = transactionName;
}
public int getPropagate() {
return propagate;
}
public void setPropagate(int propagate) {
this.propagate = propagate;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public long getCurrentTimeMillis() {
return currentTimeMillis;
}
public void setCurrentTimeMillis(long currentTimeMillis) {
this.currentTimeMillis = currentTimeMillis;
}
public StackTraceElement[] getStackTraceElement() {
return stackTraceElement;
}
public void setStackTraceElement(StackTraceElement[] stackTraceElement) {
this.stackTraceElement = stackTraceElement;
}
@Override
public String toString() {
return "BizTransactionEvent [transactionId=" + transactionId + ", transactionName=" + transactionName
+ ", stackTraceElement=" + Arrays.toString(Arrays.copyOf(stackTraceElement, STACK_TRACE_ELEMENT_DEEP))
+ ", currentTimeMillis=" + currentTimeMillis + ", propagate=" + propagate + "]";
}
}
3、我们可以通过监控aliveTransactionMap
中的事务事件存活时间来寻找发现事务未关闭的业务代码。
代码略...
4、我们看下以下逻辑中问题出在哪:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
@Transactional
public void bizMethod(){
//以下代码手动开启事务
TransactionStatus transactionStatus1 = null;
TransactionStatus transactionStatus2 = null;
try{
transactionStatus1 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..业务逻辑
//transactionManager.commit(transactionStatus1);
}catch(Exception){
transactionManager.rollback(transactionStatus1);
}finally{
//略掉一些分库分表的特殊处理
}
try{
transactionStatus2 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..业务逻辑
transactionManager.commit(transactionStatus2);
}catch(Exception){
transactionManager.rollback(transactionStatus2);
}finally{
//略掉一些分库分表的特殊处理
}
}
}
事务上下文状态切换如下:
TS=TransactionStatus TE=TransactionEvent T=Transaction
步骤 | 事务操作 | TransactionSynchronizationManager | 挂起\执行 |
---|---|---|---|
1 | @Transactional TS0=getTransaction(REQUIRESD) | T0(con0) | 挂起 NULL |
2 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)、TE1 | 挂起T0 |
3 | commit(TS1)被注掉了,不执行 | . | con1连接泄露 |
4 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con1)、TE2 | 挂起T1、TE1 |
5 | commit(TS2) | TE2执行,同步器清理T2 解挂步骤4的T1、TE1 |
执行T2.commit成功 con2归还连接池 |
6 | commit(TS0) | TE1执行,同步器清理T1 解挂步骤2的T0 |
执行 T0.commit成功 con0归还连接池 |
这种方式的最大问题在于,程序执行完成后,当前线程在事务同步器中仍存在解挂的事务资源(T0),并且事务commit(TS1)没有执行,TE1却被正常执行了,同时aliveTransactionMap中的TE1被移除了,失去了后续的监控基础。
所以对于手动事务来说,思路1比较失败
文末彩蛋:简述手动Spring事务处理逻辑
spring-tx、spring-jdbc中比较重要的四个关键处理类:
-
AbstractPlatformTransactionManager
:事务核心处理类,开启事务,挂起/恢复,释放资源等功能 -
DataSourceTransactionManager
:数据库操作都有这个类来完成,例如:setAutoCommit,commit,rollback -
TransactionSynchronizationManager
:这里的TransactionSynchronizationManager都是以线程为单位来记录相关的资源息。resources中记录了,key为datasource,value为ConnectionHolder的map结构信息。上文中publisher.publishEvent(bizTransactionEvent)会把事务事件到synchronizations中,后续事务在提交的时候会执行synchronizations中的事件。 -
DefaultTransactionStatus
:存放当前事务,挂起的事务资源,事务定义等内容。
自动事务cglib代理可参考TransactionAspectSupport
类
在事务处理的过程中参考如下步骤,偷个懒不画时序图了,大家按照序号,脑补一下
[package:spring-tx]AbstractPlatformTransactionManager
1、首先调用getTransaction()方法,获取连接,获取当前事务状态
4、调用handleExistingTransaction()处理已存在的事务
- 如果是
REQUIRES_NEW
就要挂起当前存在事务、创建新事务把挂起的事务资源放入新事务中,并且切换TransactionSynchronizationManager的本地线程变量为新事务相关内容,解绑当前事务资源。 - 如果是
NESTED
则需要创建保存点 - 如果是
REQUIRED
,创建新把newTransaction设定为false。
5、挂起资源SuspendedResourcesHolder结构与TransactionSynchronizationManager相同,用于解挂时恢复TransactionSynchronizationManager
中的本地线程变量。
7、调用prepareSynchronization方法,初始化当前线程的事务同步管理器,设置Threadlocal相关内容,并反回新的TransactionStatus对象。
以下为事务提交后的操作
8、调用commit方法提交事务。这里会调用processCommit方法,在这个方法中会调用事务事件监听逻辑
。通过ApplicationListenerMethodTransactionalAdapter处理各个不同阶段的transactionEvent,需要注意的是待处理的transactionEvent是从TransactionSynchronizationManager.getSynchronizations()
当前的本地线程变量中获取的。
9、cleanupAfterCompletion设置事务状态为完成,清理当前线程TransactionSynchronizationManager资源,解绑connection资源,设置autocommit=true。还原connection属性,回并且把连接归还给连接池。
10、调用resume()方法还原挂起的资源,继续执行。
[package:spring-jdbc]DataSourceTransactionManager
2、调用doGetTransaction() 获取事务对象DataSourceTransactionObject
3、检索绑定到当前线程(TransactionSynchronizationManager)的资源(ConnectionHolder),把ConnectionHolder放入DataSourceTransactionObject中
6、调用dobegin开启事务con.setAutoCommit(false);并且修改transactionActive为true。如果连接资源为空则获取新的连接,并且在TransactionSynchronizationManager进行资源绑定。
8.1、调用doCommit提交事务