How Are Transactions Executed When MyBatis Is Integrated with Spring?

2020-01-09  AlanSun2

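The first thing to understand: for Spring to manage MyBatis transactions, the two must run on the same Connection and in the same transaction at runtime. Once that is clear, the rest follows: the integration only has to make sure both sides end up on the same connection, inside the same transaction. If you are familiar with Spring, you can probably guess that it does this with dynamic proxies.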

The most important classes in MyBatis:

The MyBatis-Spring auto-configuration class:

A brief introduction to the important classes

1. SqlSessionTemplate

SqlSessionTemplate is an implementation of SqlSession that replaces MyBatis's original DefaultSqlSession. MapperProxyFactory wraps it into the mapper proxy class.

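SqlSessionTemplate has one important field: sqlSessionProxy. It is itself a SqlSession and is created in the SqlSessionTemplate constructor. Look closely at how it is created, though: it is a JDK dynamic proxy of SqlSession whose InvocationHandler is SqlSessionInterceptor. The source is as follows: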

private class SqlSessionInterceptor implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      // Obtain a SqlSession: first try TransactionSynchronizationManager.getResource; if nothing is bound, ask the SqlSessionFactory directly for a new SqlSession (a DefaultSqlSession by default)
      SqlSession sqlSession = getSqlSession(
          SqlSessionTemplate.this.sqlSessionFactory,
          SqlSessionTemplate.this.executorType,
          SqlSessionTemplate.this.exceptionTranslator);
      try {
        // Invoke the method on the DefaultSqlSession
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
          // force commit even on non-dirty sessions because some databases require
          // a commit/rollback before calling close()
          sqlSession.commit(true);
        }
        return result;
      } catch (Throwable t) {
        Throwable unwrapped = unwrapThrowable(t);
        if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
          // release the connection to avoid a deadlock if the translator is no loaded. See issue #22
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
          sqlSession = null;
          Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
          if (translated != null) {
            unwrapped = translated;
          }
        }
        throw unwrapped;
      } finally {
        if (sqlSession != null) {
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
        }
      }
    }
  }

From then on, every method that MapperMethod calls on the SqlSession goes through this invoke method.
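
The interception pattern above can be reproduced with nothing but the JDK. The following is a minimal sketch, not the mybatis-spring implementation: `Session`, `RecordingSession` and the `BOUND` thread-local are hypothetical stand-ins for `SqlSession`, `DefaultSqlSession` and the resource held by `TransactionSynchronizationManager`. It only illustrates the decision the handler makes: reuse the thread-bound session when one exists, otherwise open a new one, commit it and close it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class SessionProxySketch {

    /** Hypothetical stand-in for SqlSession. */
    interface Session {
        String selectOne(String statement);
        void commit();
        void close();
    }

    /** Hypothetical stand-in for DefaultSqlSession; records every call it receives. */
    static final class RecordingSession implements Session {
        private final List<String> log;
        RecordingSession(List<String> log) { this.log = log; }
        @Override public String selectOne(String statement) { log.add("select:" + statement); return "row"; }
        @Override public void commit() { log.add("commit"); }
        @Override public void close() { log.add("close"); }
    }

    /** Assumption: a session is bound here only while a (simulated) transaction is active. */
    static final ThreadLocal<Session> BOUND = new ThreadLocal<>();

    /** Builds a JDK dynamic proxy that routes every Session call through one handler. */
    static Session newProxy(List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            Session bound = BOUND.get();
            boolean transactional = bound != null;
            Session target = transactional ? bound : new RecordingSession(log);
            try {
                Object result = method.invoke(target, args);
                if (!transactional) {
                    target.commit(); // nobody else will commit, so do it here
                }
                return result;
            } catch (InvocationTargetException e) {
                throw e.getCause(); // unwrap the reflective wrapper
            } finally {
                if (!transactional) {
                    target.close(); // a transactional session is left to the transaction owner
                }
            }
        };
        return (Session) Proxy.newProxyInstance(
                Session.class.getClassLoader(), new Class<?>[] {Session.class}, handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Session session = newProxy(log);
        session.selectOne("findUser");
        System.out.println(log);
    }
}
```

Outside a transaction each call gets its own session that is committed and closed immediately; inside one, the proxy touches neither commit nor close, which is what lets the surrounding transaction decide the outcome.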

2. SpringManagedTransactionFactory and SpringManagedTransaction

SpringManagedTransactionFactory is an implementation of TransactionFactory. It is created while building the Environment of the MyBatis Configuration; see SqlSessionFactoryBean#buildSqlSessionFactory for details. The source is as follows:

  protected SqlSessionFactory buildSqlSessionFactory() throws IOException {

    Configuration configuration;
    ...
    ... (partially omitted)
    if (this.transactionFactory == null) {
      // Create the SpringManagedTransactionFactory
      this.transactionFactory = new SpringManagedTransactionFactory();
    }
    // Put the SpringManagedTransactionFactory and the DataSource into the Environment
    configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource));
    ...
    ... (partially omitted)
    return this.sqlSessionFactoryBuilder.build(configuration);
  }

Source of SpringManagedTransactionFactory:

public class SpringManagedTransactionFactory implements TransactionFactory {

  /**
   * {@inheritDoc}
   */
  @Override
  public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
    return new SpringManagedTransaction(dataSource);
  }
  ...
  ...

It is very simple: it just returns a SpringManagedTransaction. So when is SpringManagedTransactionFactory used? When the SqlSessionFactory creates a SqlSession. The source is as follows:

  private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
    Transaction tx = null;
    try {
      final Environment environment = configuration.getEnvironment();
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
      final Executor executor = configuration.newExecutor(tx, execType);
      return new DefaultSqlSession(configuration, executor, autoCommit);
    } catch (Exception e) {
      closeTransaction(tx); // may have fetched a connection so lets call close()
      throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

As you can see, the Transaction ends up inside the Executor.
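
Why it matters that the Executor holds the Transaction rather than a Connection can be shown with a small sketch. All types below (`FakeDataSource`, `FakeConnection`, `LazyTransaction`, `ExecutorSketch`) are hypothetical stand-ins, not MyBatis classes. The assumption being illustrated is that the executor asks its transaction for a connection only when a statement actually runs, so the transaction object decides where the connection comes from.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyTransactionSketch {

    /** Hypothetical stand-in for a JDBC Connection. */
    static final class FakeConnection {
        final int id;
        FakeConnection(int id) { this.id = id; }
    }

    /** Hypothetical stand-in for a DataSource; counts the connections it hands out. */
    static final class FakeDataSource {
        final AtomicInteger opened = new AtomicInteger();
        FakeConnection getConnection() { return new FakeConnection(opened.incrementAndGet()); }
    }

    /** Mirrors the shape of a Transaction: the connection is opened on first use and then cached. */
    static final class LazyTransaction {
        private final FakeDataSource dataSource;
        private FakeConnection connection;
        LazyTransaction(FakeDataSource dataSource) { this.dataSource = dataSource; }
        FakeConnection getConnection() {
            if (connection == null) {
                connection = dataSource.getConnection();
            }
            return connection;
        }
    }

    /** Hypothetical stand-in for an Executor: it keeps the transaction, not a connection. */
    static final class ExecutorSketch {
        private final LazyTransaction tx;
        ExecutorSketch(LazyTransaction tx) { this.tx = tx; }
        String query(String sql) {
            return sql + " on connection #" + tx.getConnection().id;
        }
    }

    public static void main(String[] args) {
        FakeDataSource dataSource = new FakeDataSource();
        ExecutorSketch executor = new ExecutorSketch(new LazyTransaction(dataSource));
        System.out.println("opened before first query: " + dataSource.opened.get());
        System.out.println(executor.query("SELECT 1"));
        System.out.println(executor.query("SELECT 2"));
        System.out.println("opened after two queries: " + dataSource.opened.get());
    }
}
```

Because the lookup is deferred, swapping the transaction implementation is enough to change which connection every statement uses, without touching the executor.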

As an aside, the rough MyBatis execution flow is: MapperProxy -> MapperMethod -> SqlSession -> Executor -> Statement

Next, look at the source of SpringManagedTransaction:

  // Obtain the connection
  @Override
  public Connection getConnection() throws SQLException {
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }
  
  private void openConnection() throws SQLException {
    // Obtain the connection through DataSourceUtils
    this.connection = DataSourceUtils.getConnection(this.dataSource);
    this.autoCommit = this.connection.getAutoCommit();
    this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);

    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          "JDBC Connection ["
              + this.connection
              + "] will"
              + (this.isConnectionTransactional ? " " : " not ")
              + "be managed by Spring");
    }
  }

Source of DataSourceUtils.getConnection(this.dataSource):

    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
        try {
            return doGetConnection(dataSource);
        }
        catch (SQLException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
        }
        catch (IllegalStateException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
        }
    }

    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");
        // TransactionSynchronizationManager looks up the connection by dataSource
        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(fetchConnection(dataSource));
            }
            return conHolder.getConnection();
        }
        // Else we either got no holder or an empty thread-bound holder here.
        // No connection is bound to the ThreadLocal, so fetch one from the connection pool
        logger.debug("Fetching JDBC Connection from DataSource");
        Connection con = fetchConnection(dataSource);
        // If transaction synchronization is active, bind it to the ThreadLocal
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            try {
                // Use same Connection for further JDBC actions within the transaction.
                // Thread-bound object will get removed by synchronization at transaction completion.
                ConnectionHolder holderToUse = conHolder;
                if (holderToUse == null) {
                    holderToUse = new ConnectionHolder(con);
                }
                else {
                    holderToUse.setConnection(con);
                }
                holderToUse.requested();
                TransactionSynchronizationManager.registerSynchronization(
                        new ConnectionSynchronization(holderToUse, dataSource));
                holderToUse.setSynchronizedWithTransaction(true);
                if (holderToUse != conHolder) {
                    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
                }
            }
            catch (RuntimeException ex) {
                // Unexpected exception from external delegation call -> close Connection and rethrow.
                releaseConnection(con, dataSource);
                throw ex;
            }
        }

        return con;
    }

If a transaction is in use here, TransactionSynchronizationManager.getResource(dataSource) will not return null. So let's look at when Spring puts the ConnectionHolder into TransactionSynchronizationManager. Spring uses a TransactionInterceptor, which is an implementation of MethodInterceptor.
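
The lookup that decides whether a bound connection is reused can be sketched with a per-thread map keyed by the data source. This is a simplified illustration, not Spring's code: the method names echo `bindResource`, `getResource` and `unbindResource`, but the class itself and the use of plain `Object` values in place of `DataSource` and `ConnectionHolder` are assumptions made to keep the example free of dependencies.

```java
import java.util.HashMap;
import java.util.Map;

final class ThreadBoundResourceSketch {

    /** One resource map per thread, keyed by the resource factory (here: the data source). */
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound on this thread: " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    /** Simplified version of the lookup: reuse the bound connection, else fetch a fresh one. */
    static Object getConnection(Object dataSource) {
        Object bound = getResource(dataSource);
        return bound != null ? bound : new Object(); // new Object() stands in for a pooled connection
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSource = new Object();
        Object connection = new Object();

        bindResource(dataSource, connection);
        System.out.println("same thread reuses it: " + (getConnection(dataSource) == connection));

        // A different thread has its own map, so it does not see the binding.
        Thread other = new Thread(() ->
                System.out.println("other thread sees binding: " + (getResource(dataSource) != null)));
        other.start();
        other.join();

        unbindResource(dataSource);
        System.out.println("after unbind: " + (getResource(dataSource) != null));
    }
}
```

Keying the map by the data source is what allows two components that share nothing but a reference to the same data source to end up on the same connection.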

The intermediate call chain:
ReflectiveMethodInvocation#proceed ->
TransactionInterceptor#invoke ->
TransactionAspectSupport#invokeWithinTransaction ->
TransactionAspectSupport#createTransactionIfNecessary ->
AbstractPlatformTransactionManager#getTransaction ->
DataSourceTransactionManager#doBegin

Eventually it reaches DataSourceTransactionManager#doBegin. The source is as follows:

@Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // Bind the ConnectionHolder to the ThreadLocal through TransactionSynchronizationManager
            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

Conclusion

To wrap up, let's walk through the logic:

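  1. When we call the proxied method and a TransactionInterceptor is present, the interceptor's method is invoked first.
  2. AbstractPlatformTransactionManager#getTransaction calls doBegin, which each concrete transaction manager implements. We usually use the DataSource transaction manager (DataSourceTransactionManager), whose doBegin binds the Connection (wrapped in a ConnectionHolder) to a ThreadLocal.
  3. When the MyBatis Executor runs, it uses SpringManagedTransaction to obtain the Connection bound in step 2.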
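
The three steps can be tied together in one dependency-free sketch. Everything here is hypothetical: `FakeConnection` stands in for a JDBC connection, `inTransaction` plays the role of the interceptor plus transaction manager, and `currentConnection` plays the role of the lookup done on the MyBatis side. Rollback is triggered only by `RuntimeException` to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

final class TransactionFlowSketch {

    /** Hypothetical stand-in for a JDBC Connection; records what was run on it. */
    static final class FakeConnection {
        final List<String> log = new ArrayList<>();
        boolean autoCommit = true;
        void execute(String sql) { log.add(sql); }
        void commit() { log.add("COMMIT"); }
        void rollback() { log.add("ROLLBACK"); }
    }

    /** Step 2: the connection bound to the current thread for the duration of the transaction. */
    static final ThreadLocal<FakeConnection> BOUND = new ThreadLocal<>();

    /** Step 3: the data-access side reuses the bound connection when there is one. */
    static FakeConnection currentConnection() {
        FakeConnection bound = BOUND.get();
        return bound != null ? bound : new FakeConnection();
    }

    /** Steps 1 and 2: wrap the business method, bind a connection, then commit or roll back. */
    static FakeConnection inTransaction(Runnable businessMethod) {
        FakeConnection connection = new FakeConnection();
        connection.autoCommit = false; // switch to manual commit, as doBegin does
        BOUND.set(connection);
        try {
            businessMethod.run();
            connection.commit();
        } catch (RuntimeException e) {
            connection.rollback();
            throw e;
        } finally {
            BOUND.remove(); // never leak the binding to the next user of this thread
        }
        return connection;
    }

    public static void main(String[] args) {
        FakeConnection used = inTransaction(() -> {
            currentConnection().execute("INSERT INTO orders ...");
            currentConnection().execute("UPDATE stock ...");
        });
        System.out.println(used.log);
    }
}
```

Both statements land on the one connection that the wrapper opened, so a single commit or rollback covers them; without the binding, each call to `currentConnection` would get an unrelated connection.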