Spring 事务原理

2017-01-12  本文已影响0人  LordZhou

Spring事务拦截器为

org.springframework.transaction.interceptor.TransactionInterceptor

当Spring对函数进行事务拦截时,会调用到TransactionInterceptor的invoke函数。

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // 在事务中调用函数,使函数支持事务
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
        @Override
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }
    });
}

invokeWithTransaction函数

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {

    // 数据准备
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 1.新建一个事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            //2.执行业务代码
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // 异常处理
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {//
            cleanupTransactionInfo(txInfo);
        }
        //3.提交事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
//......其他代码
}

整个事务处理过程中主要涉及到四个类

  1. TransactionAttribute
  2. PlatformTransactionManager
  3. TransactionStatus
  4. TransactionInfo

TransactionAttribute

public interface TransactionAttribute extends TransactionDefinition {
String getQualifier();
boolean rollbackOn(Throwable ex);
}

父类TransactionDefinition的定义

public interface TransactionDefinition {
  int getIsolationLevel();
  String getName();
  int getPropagationBehavior();
  boolean isReadOnly();
  int getTimeout();
}

TransactionAttribute用于保存@Transactional注解上的属性内容。

PlatformTransactionManager

package org.springframework.transaction;
public interface PlatformTransactionManager {
  //启动一个新事务
  TransactionStatus getTransaction(TransactionDefinition definition)
  //提交事务
  void commit(TransactionStatus status)
  //回滚事务
  void rollback(TransactionStatus status)
  }

PlatformTransactionManager是事务的具体操作类,类似于JDBC的事务操作,使用PlatformTransactionManager进行事务处理的伪代码如下:

PlatformTransactionManager pm;
TransactionDefinition definition
//启动事务
TransactionStatus  status=pm.getTransaction(definition);
  try{
  //处理业务逻辑
  .....
  //提交事务
  pm.commit(status);
  }catch(Throwable e){
    //回滚事务
    pm.rollback(status);
  }
}

**TransactionStatus **

public interface TransactionStatus{
  boolean isNewTransaction();
  boolean hasSavepoint();
  void setRollbackOnly();
  boolean isRollbackOnly();
  void flush();
  boolean isCompleted();
}

TransactionStatus表征一次事务操作。

TransactionInfo

protected final class TransactionInfo {

    private final PlatformTransactionManager transactionManager;

    private final TransactionAttribute transactionAttribute;

    private final String joinpointIdentification;

    private TransactionStatus transactionStatus;

    private TransactionInfo oldTransactionInfo;
}

TransactionInfo将上面的三个类的内容融合在一起,即当前事务的相关信息,
oldTransactionInfo为父事务的相关信息。

Spring的事务的具体操作是PlatformTransactionManager,下面来具体说明PlatformTransactionManager得三个函数。
1. getTransaction

    TransactionStatus org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(TransactionDefinition definition)

getTransaction函数的业务逻辑可简化为

    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        //1.获取当前的事务对象,如数据库的connection对象
        Object transaction = doGetTransaction();
        //2.如果当前的事务对象可用,根据当前的传递方式,构建TransactionStatus
        if (isExistingTransaction(transaction)) {
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }//3.如果不存在,根据当前的传递方式,构建TransactionStatus
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            return status;
            
        }
    }

doGetTransaction函数
以PlatformTransactionManager的实现之一DataSourceTransactionManager为例,

    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

TransactionSynchronizationManager.getResource是从线程上下文获取数据库连接信息,如果存在,直接返回,否则返回null.

isExistingTransaction
判断当前事务对象是否可用,在DataSourceTransactionManager中,只需要简单判断ConnectionHolder是否可用即可。

    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
    }

handleExistingTransaction
是处理事务传递的关键类,在handleExistingTransaction中会根据当前不同的事务传递方式创建不同的TransactionStatus

1.PROPAGATION_NEVER
当前方法不应在Transaction中运行,如果存在已经定义的Transaction则抛出异常。

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

所以代码中直接抛出异常

2.PROPAGATION_NOT_SUPPORTED
当前方法不应在Transaction中运行,如果存在已经定义的Transaction,则该Transaction暂停(挂起)直至该方法运行完毕。

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

前面介绍过transaction是从上下文中获取的,suspend函数挂起transaction的主要作用是将transaction的ConnectionHolder和事务的相关信息从线程上下文移除,这样在业务代码中获取数据库连接时,就不能从线程上下文中获取到父事务的连接。

DefaultTransactionStatus prepareTransactionStatus(
        TransactionDefinition definition, Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, Object suspendedResources)

prepareTransactionStatus的作用有两个

  1. 组装DefaultTransactionStatus
  2. 如果newTransaction==true,更新当前线程上下文,保存当前事务的信息,如事务隔离级别,传递方式等。newTransaction还有另外一个作用:表示当前事务需不需要进行提交,回滚操作。

当传递方式为PROPAGATION_NOT_SUPPORTED时,prepareTransactionStatus函数newTransaction参数的值为false,不会更新线程上下文信息。业务代码中获取数据库连接时会从数据源中获取一个全新的connection使用。

3.Propagation.REQUIRES_NEW
当前方法必须在新开的Transaction中运行。如果存在已经定义的Transaction,则该已定义的Transaction暂停直至新开的Transaction执行完毕。

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }

操作分为3步:

  1. 挂起当前事务
  2. 从数据源中获取一个新的连接,将connection的AutoCommit设置false.并将该connection绑定到当前线程(doBegin函数),以便业务代码能获取到该connection。
  3. 更新线程上下文关于当前事务的相关信息。

........
其他传递方式略

2.commit
commit 就比较简单,主要做两件事

  1. 如果当前事务是newTransaction,提交事务
  2. 恢复当前connection的属性,释放connection
  3. 恢复被挂起的事务

提交connection

    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            //数据库connection提交
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

恢复connection属性,释放connection

    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }

        // Reset connection.
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }

        txObject.getConnectionHolder().clear();
    }

恢复被挂起的事务

    protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
            throws TransactionException {

        if (resourcesHolder != null) {
            Object suspendedResources = resourcesHolder.suspendedResources;
            if (suspendedResources != null) {
                doResume(transaction, suspendedResources);
            }
            List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
            if (suspendedSynchronizations != null) {
                TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                doResumeSynchronization(suspendedSynchronizations);
            }
        }
    }

3.rollback
与commit雷同,不做解释。

其他

Spring的Transactional注解是如何在mybatis中生效的?
Spring通过控制mybatis获取的connection来起作用。

mybatis的数据库连接是从Mybatis的事务中获取的

    package org.apache.ibatis.executor;
    public abstract class BaseExecutor{
        protected Connection getConnection(Log statementLog) throws SQLException {
            Connection connection = transaction.getConnection();
            if (statementLog.isDebugEnabled()) {
              return ConnectionLogger.newInstance(connection, statementLog, queryStack);
            } else {
              return connection;
            }
        }
    }

mybatis的事务对象为:

org.apache.ibatis.transaction.Transaction

Transaction是有mybatis的事务工厂产生的

org.apache.ibatis.transaction.TransactionFactory

而mybatis默认的事务工厂为

org.mybatis.spring.transaction.SpringManagedTransactionFactory

见下面mybatis代码

    package org.mybatis.spring;
    public class SqlSessionFactoryBean{
        protected SqlSessionFactory buildSqlSessionFactory(){
            //部分代码略..........
            if (this.transactionFactory == null) {
                this.transactionFactory = new SpringManagedTransactionFactory();
            //部分代码略.............
        }
        }
    }

SpringManagedTransactionFactory生产的事务的类为

org.mybatis.spring.transaction.SpringManagedTransaction

SpringManagedTransaction的getConnection会调用到openConnection函数。

    private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);

        if (logger.isDebugEnabled()) {
          logger.debug(
              "JDBC Connection ["
                  + this.connection
                  + "] will"
                  + (this.isConnectionTransactional ? " " : " not ")
                  + "be managed by Spring");
        }
    }

DataSourceUtils.getConnection最终会调用到DataSourceUtils.doGetConnection函数

    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");

        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(dataSource.getConnection());
            }
            return conHolder.getConnection();
        }
        // Else we either got no holder or an empty thread-bound holder here.

        logger.debug("Fetching JDBC Connection from DataSource");
        Connection con = dataSource.getConnection();

        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            logger.debug("Registering transaction synchronization for JDBC Connection");
            // Use same Connection for further JDBC actions within the transaction.
            // Thread-bound object will get removed by synchronization at transaction completion.
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {
                holderToUse = new ConnectionHolder(con);
            }
            else {
                holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(
                    new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }

        return con;
    }

上面的代码首先会从线程上下文中获取数据库连接,如果不能获取到,就直接从数据源中获取。

上一篇下一篇

猜你喜欢

热点阅读