Spring MyBatis 事务分析 -- MyBatis 整
上一篇我们分析了 Spring 的 Transaction 处理过程,这里我们一起看下,MyBatis 是如何跟 Spring 整合,提供完整的事务处理方案。
想要把 MyBatis 跟 Spring 整合,都需要这样一个 Jar 包:mybatis-spring-x.x.x.jar,这个 Jar 包可以说是 MyBatis 与 Spring 的通信桥梁,将两个不相关的框架可以整合到一起,提供完整的 ORM 功能。
在 Spring 配置文件中需要配置如下两个 Bean:
<!-- mybatis配置 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dynamicDataSource" />
<property name="configLocation" value="classpath:mybatis.xml"></property>
<!-- mybatis配置文件 -->
<property name="mapperLocations" value="classpath:com/blackbread/dao/mapper/*.xml" />
</bean>
<!--mapper scanning -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.blackbread.dao" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
SqlSessionFactoryBean
首先让我们来看 SqlSessionFactoryBean 类,在这个类初始化的时候,需要注入 DataSource,而且是需要跟初始化 TransactionManager 时候注入的 DataSource 为同一个。
SqlSessionFactoryBean 类实现了 InitializingBean 接口,所以初始化后会执行 afterPropertiesSet() 方法,在 afterPropertiesSet() 方法中会执行 buildSqlSessionFactory() 方法生成一个 SqlSessionFactory 对象,让我们看下 buildSqlSessionFactory() 方法。
SqlSessionFactoryBean#buildSqlSessionFactory()
protected SqlSessionFactory buildSqlSessionFactory() throws IOException {
Configuration configuration;
XMLConfigBuilder xmlConfigBuilder = null;
// 初始化一个configuration
if (this.configLocation != null) {
xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties);
configuration = xmlConfigBuilder.getConfiguration();
} else {
configuration = new Configuration();
configuration.setVariables(this.configurationProperties);
}
if (this.objectFactory != null) {
configuration.setObjectFactory(this.objectFactory);
}
if (this.objectWrapperFactory != null) {
configuration.setObjectWrapperFactory(this.objectWrapperFactory);
}
if (hasLength(this.typeAliasesPackage)) {
String[] typeAliasPackageArray = tokenizeToStringArray(this.typeAliasesPackage,
ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
for (String packageToScan : typeAliasPackageArray) {
configuration.getTypeAliasRegistry().registerAliases(packageToScan,
typeAliasesSuperType == null ? Object.class : typeAliasesSuperType);
}
}
// 设置别名
if (!isEmpty(this.typeAliases)) {
for (Class<?> typeAlias : this.typeAliases) {
configuration.getTypeAliasRegistry().registerAlias(typeAlias);
}
}
// 装入插件,mybatis的插件都是以拦截器的形式
// 比如比如分页插件,这里是载入 spring 中注入的
if (!isEmpty(this.plugins)) {
for (Interceptor plugin : this.plugins) {
configuration.addInterceptor(plugin);
}
}
if (hasLength(this.typeHandlersPackage)) {
String[] typeHandlersPackageArray = tokenizeToStringArray(this.typeHandlersPackage,
ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
for (String packageToScan : typeHandlersPackageArray) {
configuration.getTypeHandlerRegistry().register(packageToScan);
}
}
if (!isEmpty(this.typeHandlers)) {
for (TypeHandler<?> typeHandler : this.typeHandlers) {
configuration.getTypeHandlerRegistry().register(typeHandler);
}
}
// 这里将解析mybatis.xml文件,载入所有配置,插件、setting等
if (xmlConfigBuilder != null) {
try {
xmlConfigBuilder.parse();
} catch (Exception ex) {
throw new NestedIOException("Failed to parse config resource: " + this.configLocation, ex);
} finally {
ErrorContext.instance().reset();
}
}
// 这个很重要,这里定义了用的transactionFactory为SpringManagedTransactionFactory
// 这个在获取 connection 等地方都有用到,是mybatis跟spring的主要链接类
if (this.transactionFactory == null) {
this.transactionFactory = new SpringManagedTransactionFactory();
}
// 新建一个Environment对象,并将新建的transactionFactory放入其中
Environment environment = new Environment(this.environment, this.transactionFactory, this.dataSource);
configuration.setEnvironment(environment);
if (this.databaseIdProvider != null) {
try {
configuration.setDatabaseId(this.databaseIdProvider.getDatabaseId(this.dataSource));
} catch (SQLException e) {
throw new NestedIOException("Failed getting a databaseId", e);
}
}
if (!isEmpty(this.mapperLocations)) {
for (Resource mapperLocation : this.mapperLocations) {
if (mapperLocation == null) {
continue;
}
try {
// 这里主要是解析配置的sql mapper配置文件
XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(),
configuration, mapperLocation.toString(), configuration.getSqlFragments());
xmlMapperBuilder.parse();
} catch (Exception e) {
throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e);
} finally {
ErrorContext.instance().reset();
}
}
} else {
}
}
return this.sqlSessionFactoryBuilder.build(configuration);
}
比较长的一段代码,其实这里做的工作就是解析配置文件生成 Configuration 对象而已。在 XmlMapperBuilder#parse() 方法中,这里将解析 Sql Mapper 文件中的映射关系生成 MappedStatement 对象,并执行 Configuration#addMappedStatement() 方法,将其放入到 Configuration 对象中,有兴趣的同学可以仔细看下。
这里最需要注意的一块就是 this.transactionFactory = new SpringManagedTransactionFactory()。SpringManagedTransactionFactory 就是 MyBatis 跟 Spring 的链接。
MapperScannerConfigurer
接着我们看一下 MapperScannerConfigurer 对象的初始化过程,这个对象实现了BeanDefinitionRegistryPostProcessor 接口,所以看 postProcessBeanDefinitionRegistry() 方法,在这个方法中初始化一个对象 ClassPathMapperScanner,并执行 scan() --> doScan() 方法。
ClassPathMapperScanner#doScan()
public Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
for (BeanDefinitionHolder holder : beanDefinitions) {
GenericBeanDefinition definition = (GenericBeanDefinition) holder.getBeanDefinition();
definition.getPropertyValues().add("mapperInterface", definition.getBeanClassName());
// 实际就是将扫描到的接口包装成MapperFactoryBean的实现类
definition.setBeanClass(MapperFactoryBean.class);
definition.getPropertyValues().add("addToConfig", this.addToConfig);
boolean explicitFactoryUsed = false;
// 注入sqlSessionFactory对象,这个也很重要
if (StringUtils.hasText(this.sqlSessionFactoryBeanName)) {
definition.getPropertyValues().add("sqlSessionFactory", new RuntimeBeanReference(this.sqlSessionFactoryBeanName));
explicitFactoryUsed = true;
} else if (this.sqlSessionFactory != null) {
definition.getPropertyValues().add("sqlSessionFactory", this.sqlSessionFactory);
explicitFactoryUsed = true;
}
if (StringUtils.hasText(this.sqlSessionTemplateBeanName)) {
if (explicitFactoryUsed) {
logger.warn("Cannot use both: sqlSessionTemplate and sqlSessionFactory together. sqlSessionFactory is ignored.");
}
definition.getPropertyValues().add("sqlSessionTemplate", new RuntimeBeanReference(this.sqlSessionTemplateBeanName));
explicitFactoryUsed = true;
} else if (this.sqlSessionTemplate != null) {
if (explicitFactoryUsed) {
logger.warn("Cannot use both: sqlSessionTemplate and sqlSessionFactory together. sqlSessionFactory is ignored.");
}
definition.getPropertyValues().add("sqlSessionTemplate", this.sqlSessionTemplate);
explicitFactoryUsed = true;
}
if (!explicitFactoryUsed) {
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
}
}
return beanDefinitions;
}
这段代码其实主要就是根据 basePackage 属性的配置,扫描相应的接口类,并且注册到 Spring 中,并且定义此对象的 FactoryBean 为:MapperFactoryBean ,而 MapperFactoryBean 将返回如下对象。
public T getObject() throws Exception {
return getSqlSession().getMapper(this.mapperInterface);
}
public <T> T getMapper(Class<T> type) {
return getConfiguration().getMapper(type, this);
}
最终其实就是生成 Handler 为 MapperProxy,接口为 mapperInterface 属性指定(业务的 Mapper 接口)的代理类。同时添加属性:sqlSessionFactory,这个操作很重要,在后面有核心应用。
MapperFactoryBean
这里让我们看下 MapperFactoryBean 类,这个类继承自 SqlSessionDaoSupport 在 SqlSessionDaoSupport 中有如下方法:
public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
if (!this.externalSqlSession) {
this.sqlSession = new SqlSessionTemplate(sqlSessionFactory);
}
}
也就是上面调用的添加 sqlSessionFactory 属性的 set 操作,在这个方法中初始化了 SqlSession 对象,用的是 SqlSessionTemplate 实现类。接下来让我们看下 SqlSessionTemplate 的初始化过程:
SqlSessionTemplate
public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");
notNull(executorType, "Property 'executorType' is required");
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
this.sqlSessionProxy = (SqlSession) newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class },
new SqlSessionInterceptor());
}
SqlSessionTemplate 其实实现了 SqlSession 接口的,在初始化的时候将生成一个接口为 SqlSession,名为 sqlSessionProxy 代理对象,可以看到 SqlSessionTemplate 里面的所有与数据库相关的操作都是通过sqlSessionProxy 这个代理对象实现的。
接着看下 sqlSessionProxy 代理对象的实际 handler:
SqlSessionInterceptor
private class SqlSessionInterceptor implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 获取 SqlSession 对象
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
// 实际 SQL 的执行过程
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
invoke() 方法中首先需要获取一个 SqlSession 对象:
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
if (holder != null && holder.isSynchronizedWithTransaction()) {
if (holder.getExecutorType() != executorType) {
throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");
}
holder.requested();
return holder.getSqlSession();
}
SqlSession session = sessionFactory.openSession(executorType);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
}
return session;
}
这里将首先去判断下 SqlSessionHolder 是否已经存在,如果不存在将会初始化一个新的,我们这里只分析第一次调用过程,也就是将会执行到 SessionFactory#openSession() 方法,这个方法里面接着会调用 openSessionFromDataSource() 方法。
SessionFactory#openSession()
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType, autoCommit);
return new DefaultSqlSession(configuration, executor);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
这里与 Spring 事务管理关联起来的核心代码,让我们仔细分析下,首先这里将通过 getTransactionFactoryFromEnvironment() 方法获取 TransactionFactory。这个操作将得到我们之前初始化时候注入的 SpringManagedTransactionFactory 对象。然后将执行 TransactionFactory#newTransaction() 方法,初始化 MyBatis 的 Transaction。
再下面将通过 Configuration.newExecutor() 生成一个 Executor。由于在之前指定了 execType 为Simple,所以在这里将生成一个 SimpleExecutor: executor = new SimpleExecutor(this, transaction),并将刚初始化的 Transaction 加入属性。
实际语句执行
到这里SqlSession的初始化也就完成了,接下来就是看下实际方法的执行了,也就是 Object result = method.invoke(sqlSession, args)。
以一个 update() 方法执行来举例:
public int update(String statement, Object parameter) {
try {
dirty = true;
MappedStatement ms = configuration.getMappedStatement(statement);
return executor.update(ms, wrapCollection(parameter));
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error updating database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
首先将从 Configuration 中根据操作的 statement 获取映射内容 MappedStatement。getMappedStatement() 方法中其实就是从 Map 对象中根据 key 取出之前缓存的数据。
Executor#update()
接着将执行 Executor#update() 方法,也就是实际的数据库操作了,记得之前初始化的 Executor 么,这里就是那个 SimpleExecutor。
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) throw new ExecutorException("Executor was closed.");
clearLocalCache();
return doUpdate(ms, parameter);
}
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
主要是看 prepareStatement() 方法,看到 Connection 的获取了吧?
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
handler.parameterize(stmt);
return stmt;
}
然后接着看 getConnection() 方法:
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog);
} else {
return connection;
}
}
到这里终于看到了真正的 Connection 获取过程:Transaction#getConnection(),也就是通过之前注入的Transaction 来获取 Connection,而这个 Transaction 也就是 SpringManagedTransaction,他其实调用了 openConnection() 方法。
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
openConnection() 方法中主要通过调用 DataSourceUtils#getConnection() 方法来获取一个Connection。继续看 DataSourceUtils#getConnection() 方法,实际调用的又是 doGetConnection() 方法。
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
//从TransactionSynchronizationManager中获取ConnectionHolder
// 这个对象也就是之前我们上一次分析 Spring Transaction 的时候
// 持有ConnectionHolder的对象了
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
// 由于在前面的切面中已经开启事务
// 并且初始化了ConnectionHolder所以这里将直接返回ConnectionHolder中的connection
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
conHolder.setConnection(dataSource.getConnection());
}
return conHolder.getConnection();
}
Connection con = dataSource.getConnection();
if (TransactionSynchronizationManager.isSynchronizationActive()) {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
return con;
}
是不是感觉这段代码很眼熟?对,因为这里有我们上一篇里面非常熟悉的TransactionSynchronizationManager,在 Spring Transaction 中也是通过这个类中的 resources 属性(ThreadLocal对象)对 ConnectionHolder 进行持有的。
在这里将获取到 Spring 开启事务时候持有的 ConnectionHolder 对象,自然获取到的 Connection 对象也是 Srping 开启事务时候创建的对象,这样就保证了Spring Transaction 中控制的 Connection 跟在 MyBatis 中执行 SQL 语句用的 Connection 为同一个 Connection,也就可以通过之前 Spring 事务管理机制进行事务管理了。
后续的对数据的操作有兴趣的可以自己读一下,感觉 MyBatis 的源码没有 Spring 的那么清晰,还是需要仔细分析下才能看的明白。
其他
注意事项
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
//从TransactionSynchronizationManager中获取ConnectionHolder
// 这个对象也就是之前我们上一次分析 Spring Transaction 的时候
// 持有ConnectionHolder的对象了
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
// 由于在前面的切面中已经开启事务
// 并且初始化了ConnectionHolder所以这里将直接返回ConnectionHolder中的connection
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
conHolder.setConnection(dataSource.getConnection());
}
return conHolder.getConnection();
}
Connection con = dataSource.getConnection();
if (TransactionSynchronizationManager.isSynchronizationActive()) {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
return con;
}
我们再看下 DataSourceUtils#doGetConnection() 方法,如果 conHolder 为空,也就是没有开启事务情况下,将执行 Connection con = dataSource.getConnection(),也就是说没有开启事务情况下,每次的数据库操作都将从连接池拿一个新的 Connection,而不是第一次获取后就与线程绑定。
这样做可能是从系统并行度方面考虑的,因为连接是比较稀缺的资源,在不开启事务情况下,应遵循随用随拿、用完即还的原则。如果连接跟线程绑定,当线程执行完数据库操作,又执行了其他耗时操作,那么与其绑定的连接将无法得到复用,大大降低了系统的并行度(受连接池中连接数量限制)。