Spring Transaction Internals
Spring's transaction interceptor is
org.springframework.transaction.interceptor.TransactionInterceptor
When Spring intercepts a call to a transactional method, it invokes TransactionInterceptor's invoke method.
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    // Run the target method inside a transaction so that it gets transactional behavior
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
        @Override
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }
    });
}
The invokeWithinTransaction method
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    // Preparation: resolve the transaction attributes, the transaction manager and the join point name
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 1. Create a transaction if the attributes require one
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // 2. Run the business code
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // Exception handling: roll back or commit according to the rollback rules, then rethrow
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Restore the previous TransactionInfo on the current thread
            cleanupTransactionInfo(txInfo);
        }
        // 3. Commit the transaction
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // ... remaining code omitted
}
The whole transaction handling flow mainly involves four types:
- TransactionAttribute
- PlatformTransactionManager
- TransactionStatus
- TransactionInfo
TransactionAttribute
public interface TransactionAttribute extends TransactionDefinition {
    String getQualifier();
    boolean rollbackOn(Throwable ex);
}
Its parent interface, TransactionDefinition, is defined as:
public interface TransactionDefinition {
    int getIsolationLevel();
    String getName();
    int getPropagationBehavior();
    boolean isReadOnly();
    int getTimeout();
}
TransactionAttribute holds the attribute values declared on the @Transactional annotation.
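To make the role of rollbackOn more concrete, here is a minimal sketch, assuming the default rule (roll back on unchecked exceptions and errors, but not on checked exceptions). The class RollbackRuleSketch is a hypothetical name used only for illustration; it is not part of Spring.

```java
import java.io.IOException;

// Hypothetical illustration class; not part of Spring.
public class RollbackRuleSketch {

    // Assumed default rule: roll back on RuntimeException and Error only.
    public static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    public static void main(String[] args) {
        System.out.println(rollbackOn(new IllegalStateException("unchecked"))); // true
        System.out.println(rollbackOn(new IOException("checked")));             // false
        System.out.println(rollbackOn(new OutOfMemoryError("error")));          // true
    }
}
```

Under this rule a checked exception such as IOException leaves the transaction to be committed, which is why @Transactional offers attributes like rollbackFor to widen the rule.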
PlatformTransactionManager
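package org.springframework.transaction;
public interface PlatformTransactionManager {
    // Return the currently active transaction or start a new one, depending on the propagation behavior
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    // Commit the transaction
    void commit(TransactionStatus status) throws TransactionException;
    // Roll back the transaction
    void rollback(TransactionStatus status) throws TransactionException;
}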
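PlatformTransactionManager is the component that actually drives a transaction, much like transaction handling in plain JDBC. Using it directly looks roughly like this (runInTransaction and businessLogic are illustrative names):
void runInTransaction(PlatformTransactionManager pm, TransactionDefinition definition, Runnable businessLogic) {
    // Start (or join) a transaction
    TransactionStatus status = pm.getTransaction(definition);
    try {
        // Run the business logic
        businessLogic.run();
    } catch (RuntimeException | Error e) {
        // Roll back the transaction, then let the caller see the failure
        pm.rollback(status);
        throw e;
    }
    // Commit the transaction
    pm.commit(status);
}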
TransactionStatus
public interface TransactionStatus{
    boolean isNewTransaction();
    boolean hasSavepoint();
    void setRollbackOnly();
    boolean isRollbackOnly();
    void flush();
    boolean isCompleted();
}
TransactionStatus represents the state of a single transaction.
TransactionInfo
protected final class TransactionInfo {
    private final PlatformTransactionManager transactionManager;
    private final TransactionAttribute transactionAttribute;
    private final String joinpointIdentification;
    private TransactionStatus transactionStatus;
    private TransactionInfo oldTransactionInfo;
}
TransactionInfo bundles the three types above into one object that describes the current transaction.
oldTransactionInfo holds the corresponding information for the parent (enclosing) transaction.
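The link between a TransactionInfo and its oldTransactionInfo can be pictured as a stack kept in a ThreadLocal. The following is a minimal sketch, assuming a bind-on-entry and restore-on-exit scheme; TxInfoStackSketch is a hypothetical stand-in, not Spring's actual class.

```java
// Hypothetical, simplified stand-in for TransactionInfo; not Spring code.
public class TxInfoStackSketch {

    // Info of the innermost transactional method running on this thread.
    private static final ThreadLocal<TxInfoStackSketch> CURRENT = new ThreadLocal<>();

    private final String joinpointIdentification;
    private TxInfoStackSketch oldInfo; // info of the enclosing method, if any

    public TxInfoStackSketch(String joinpointIdentification) {
        this.joinpointIdentification = joinpointIdentification;
    }

    // On method entry: remember the previous info and make this one current.
    public void bindToThread() {
        this.oldInfo = CURRENT.get();
        CURRENT.set(this);
    }

    // On method exit: put the previous info back.
    public void restoreThreadLocalStatus() {
        CURRENT.set(this.oldInfo);
    }

    public static String currentJoinpoint() {
        TxInfoStackSketch info = CURRENT.get();
        return (info != null ? info.joinpointIdentification : null);
    }

    public static void main(String[] args) {
        TxInfoStackSketch outer = new TxInfoStackSketch("OrderService.create");
        outer.bindToThread();
        TxInfoStackSketch inner = new TxInfoStackSketch("AuditService.log");
        inner.bindToThread();
        System.out.println(currentJoinpoint()); // AuditService.log
        inner.restoreThreadLocalStatus();
        System.out.println(currentJoinpoint()); // OrderService.create
        outer.restoreThreadLocalStatus();
        System.out.println(currentJoinpoint()); // null
    }
}
```

The restore step corresponds to the cleanupTransactionInfo call in the finally block of invokeWithinTransaction.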
The actual transaction work in Spring is done by PlatformTransactionManager. The following sections walk through its three methods.
1. getTransaction
TransactionStatus org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(TransactionDefinition definition)
The logic of getTransaction can be simplified to:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 1. Get the current transaction object, e.g. one wrapping the thread-bound database connection
    Object transaction = doGetTransaction();
    // 2. If a transaction already exists, build the TransactionStatus according to the propagation behavior
    if (isExistingTransaction(transaction)) {
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    // 3. If none exists, build the TransactionStatus according to the propagation behavior
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // (the full method also calls doBegin and prepareSynchronization in this branch)
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        return status;
    }
    // ... other propagation behaviors omitted
}
The doGetTransaction method
Taking DataSourceTransactionManager, one of the PlatformTransactionManager implementations, as an example:
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
TransactionSynchronizationManager.getResource looks up the database connection holder bound to the current thread. It returns the holder if one exists and null otherwise.
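The idea of a per-thread resource lookup can be sketched with a ThreadLocal map. This is a minimal illustration, assuming a registry keyed by an arbitrary object such as a DataSource; ThreadBoundResources and its methods are hypothetical and only mimic the shape of the real calls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a thread-bound resource registry; not Spring's implementation.
public class ThreadBoundResources {

    // Each thread gets its own map from resource key (e.g. a DataSource) to value (e.g. a connection holder).
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    public static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key " + key);
        }
    }

    // Returns the value bound on the calling thread, or null if nothing is bound.
    public static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    public static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSourceKey = new Object();
        bindResource(dataSourceKey, "connection-1");
        System.out.println(getResource(dataSourceKey)); // connection-1

        // Another thread does not see the binding.
        Thread other = new Thread(() -> System.out.println(getResource(dataSourceKey))); // null
        other.start();
        other.join();

        unbindResource(dataSourceKey);
        System.out.println(getResource(dataSourceKey)); // null
    }
}
```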
isExistingTransaction
This checks whether a transaction already exists. In DataSourceTransactionManager it only needs to check that a ConnectionHolder is present and that its transaction is active.
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}
handleExistingTransaction
This is the key method for transaction propagation. Depending on the propagation behavior, handleExistingTransaction builds a different TransactionStatus.
1.PROPAGATION_NEVER
The current method must not run inside a transaction. If a transaction already exists, an exception is thrown.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
}
So the code simply throws an exception.
2.PROPAGATION_NOT_SUPPORTED
The current method must not run inside a transaction. If a transaction already exists, it is suspended until the method finishes.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    if (debugEnabled) {
        logger.debug("Suspending current transaction");
    }
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(
            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
As described earlier, transaction is obtained from the thread context. The main job of suspend is to remove the transaction's ConnectionHolder and the related transaction information from the thread context, so that business code asking for a database connection can no longer pick up the parent transaction's connection from the thread context.
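DefaultTransactionStatus prepareTransactionStatus(
        TransactionDefinition definition, Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, Object suspendedResources)
prepareTransactionStatus does two things:
- It assembles the DefaultTransactionStatus.
- When the status carries a new synchronization (the newSynchronization flag), it updates the current thread's context with information about the current transaction, such as the isolation level, the read-only flag and the transaction name. The newTransaction flag serves a different purpose: it indicates whether this status owns the transaction, that is, whether commit and rollback actually need to be performed for it.
With PROPAGATION_NOT_SUPPORTED, prepareTransactionStatus is called with a null transaction and newTransaction set to false, so nothing is committed or rolled back for this status. Because suspend has already unbound the parent's ConnectionHolder, business code gets a brand-new connection from the data source.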
3.PROPAGATION_REQUIRES_NEW
The current method must run in a newly started transaction. If a transaction already exists, it is suspended until the new transaction completes.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
    catch (RuntimeException beginEx) {
        resumeAfterBeginException(transaction, suspendedResources, beginEx);
        throw beginEx;
    }
    catch (Error beginErr) {
        resumeAfterBeginException(transaction, suspendedResources, beginErr);
        throw beginErr;
    }
}
The work is done in three steps:
- Suspend the current transaction.
- Get a new connection from the data source, set its AutoCommit to false, and bind it to the current thread (the doBegin method) so that business code can obtain it.
- Update the thread context with the information about the new transaction.
The other propagation behaviors are omitted here.
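The three REQUIRES_NEW steps described above can be simulated in plain Java. This is only a sketch under simplifying assumptions: connections are plain strings rather than JDBC objects, and RequiresNewSketch with all of its methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of REQUIRES_NEW; strings stand in for JDBC connections.
public class RequiresNewSketch {

    // The "connection" currently bound to this thread, or null if none.
    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();
    private static int counter = 0;

    public static String current() {
        return BOUND.get();
    }

    // Step 1: unbind the outer connection and hand it back so it can be restored later.
    public static String suspend() {
        String suspended = BOUND.get();
        BOUND.remove();
        return suspended;
    }

    // Step 2: obtain a fresh connection and bind it to the thread.
    public static void begin() {
        BOUND.set("conn-" + (++counter));
    }

    // After the inner transaction completes: drop its connection and rebind the outer one.
    public static void resume(String suspended) {
        BOUND.remove();
        if (suspended != null) {
            BOUND.set(suspended);
        }
    }

    public static void runInNewTransaction(Runnable work) {
        String suspended = suspend();
        try {
            begin();
            work.run();
        } finally {
            resume(suspended);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        begin(); // outer transaction
        log.add("outer uses " + current());
        runInNewTransaction(() -> log.add("inner uses " + current()));
        log.add("outer again uses " + current());
        log.forEach(System.out::println);
    }
}
```

In this simulation the inner work sees a different connection from the outer one, and the outer connection is visible again once the inner work finishes.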
2.commit
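Commit is fairly simple. It mainly does three things:
- If the current status is a new transaction (newTransaction), commit it
- Restore the connection's settings and release the connection
- Resume the suspended transaction, if there is one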
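The first point, that only a status flagged as a new transaction performs the physical commit, can be illustrated with a small model. This is a sketch assuming REQUIRED-like joining on a single thread; CommitOwnershipSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "only the owner commits"; not Spring code.
public class CommitOwnershipSketch {

    public static final List<String> events = new ArrayList<>();
    private static boolean active = false; // is a transaction running on this (single) thread?

    public static final class Status {
        final boolean newTransaction;
        Status(boolean newTransaction) {
            this.newTransaction = newTransaction;
        }
    }

    // REQUIRED-like behavior: start a transaction only if none is active, otherwise join it.
    public static Status getTransaction() {
        boolean isNew = !active;
        if (isNew) {
            active = true;
            events.add("begin");
        }
        return new Status(isNew);
    }

    // Only the status that started the transaction performs the physical commit.
    public static void commit(Status status) {
        if (status.newTransaction) {
            events.add("commit");
            active = false;
        }
    }

    public static void main(String[] args) {
        Status outer = getTransaction(); // starts the transaction
        Status inner = getTransaction(); // joins it
        commit(inner);                   // no physical commit
        commit(outer);                   // physical commit
        System.out.println(events);      // [begin, commit]
    }
}
```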
Committing the connection
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        // Commit on the underlying database connection
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}
Restoring the connection's settings and releasing the connection
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // Remove the connection holder from the thread, if exposed.
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }
    // Reset connection.
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        if (txObject.isMustRestoreAutoCommit()) {
            con.setAutoCommit(true);
        }
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }
    if (txObject.isNewConnectionHolder()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
        }
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    txObject.getConnectionHolder().clear();
}
Resuming the suspended transaction
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {
    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}
3.rollback
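Rollback closely mirrors commit and is not explained here.
Miscellaneous
How does Spring's @Transactional annotation take effect in MyBatis?
Spring does it by controlling which connection MyBatis obtains.
MyBatis gets its database connection from a MyBatis transaction object: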
package org.apache.ibatis.executor;
public abstract class BaseExecutor{
    protected Connection getConnection(Log statementLog) throws SQLException {
        Connection connection = transaction.getConnection();
        if (statementLog.isDebugEnabled()) {
            return ConnectionLogger.newInstance(connection, statementLog, queryStack);
        } else {
            return connection;
        }
    }
}
The MyBatis transaction type is:
org.apache.ibatis.transaction.Transaction
A Transaction is produced by a MyBatis transaction factory:
org.apache.ibatis.transaction.TransactionFactory
When MyBatis is used through mybatis-spring, the default transaction factory is
org.mybatis.spring.transaction.SpringManagedTransactionFactory
as the following mybatis-spring code shows:
package org.mybatis.spring;
public class SqlSessionFactoryBean{
    protected SqlSessionFactory buildSqlSessionFactory(){
        // ... some code omitted
        if (this.transactionFactory == null) {
            this.transactionFactory = new SpringManagedTransactionFactory();
            // ... some code omitted
        }
    }
}
The transaction class produced by SpringManagedTransactionFactory is
org.mybatis.spring.transaction.SpringManagedTransaction
SpringManagedTransaction's getConnection ends up calling the openConnection method.
private void openConnection() throws SQLException {
    this.connection = DataSourceUtils.getConnection(this.dataSource);
    this.autoCommit = this.connection.getAutoCommit();
    this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
    if (logger.isDebugEnabled()) {
        logger.debug(
                "JDBC Connection ["
                        + this.connection
                        + "] will"
                        + (this.isConnectionTransactional ? " " : " not ")
                        + "be managed by Spring");
    }
}
DataSourceUtils.getConnection eventually calls DataSourceUtils.doGetConnection:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(dataSource.getConnection());
        }
        return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.
    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = dataSource.getConnection();
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        logger.debug("Registering transaction synchronization for JDBC Connection");
        // Use same Connection for further JDBC actions within the transaction.
        // Thread-bound object will get removed by synchronization at transaction completion.
        ConnectionHolder holderToUse = conHolder;
        if (holderToUse == null) {
            holderToUse = new ConnectionHolder(con);
        }
        else {
            holderToUse.setConnection(con);
        }
        holderToUse.requested();
        TransactionSynchronizationManager.registerSynchronization(
                new ConnectionSynchronization(holderToUse, dataSource));
        holderToUse.setSynchronizedWithTransaction(true);
        if (holderToUse != conHolder) {
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
        }
    }
    return con;
}
The code above first tries to get the database connection from the thread context. If none is bound, it fetches one directly from the data source. Because the transaction manager binds the transaction's connection to the thread in doBegin, MyBatis statements executed inside a transactional method run on that same connection.
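The "thread-bound first, data source second" lookup can be reduced to a few lines. The sketch below is a deliberate simplification: it ignores the synchronization branch of doGetConnection, uses strings in place of connections, and ConnectionLookupSketch with its methods is hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical sketch of "thread-bound first, data source second"; strings stand in for connections.
public class ConnectionLookupSketch {

    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();

    // Called on the transaction manager side when a transaction begins or ends.
    public static void bind(String connection) {
        BOUND.set(connection);
    }

    public static void unbind() {
        BOUND.remove();
    }

    // Called on the data-access side: prefer the thread-bound connection, else open a new one.
    public static String getConnection(Supplier<String> dataSource) {
        String bound = BOUND.get();
        return (bound != null ? bound : dataSource.get());
    }

    public static void main(String[] args) {
        int[] opened = {0};
        Supplier<String> dataSource = () -> "conn-" + (++opened[0]);

        // No transaction: every call opens a new connection.
        System.out.println(getConnection(dataSource)); // conn-1
        System.out.println(getConnection(dataSource)); // conn-2

        // Inside a transaction: every call sees the transaction's connection.
        bind("tx-conn");
        System.out.println(getConnection(dataSource)); // tx-conn
        System.out.println(getConnection(dataSource)); // tx-conn
        unbind();
    }
}
```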