多线程对spring数据库事务影响
2020-06-06 本文已影响0人
定金喜
1.spring事务的配置
以mysql为例:
package com.renlijia.config;
import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Spring configuration that wires a Druid connection pool, MyBatis and Spring transaction
 * management together for a MySQL database.
 *
 * <p>The pool settings below are bound from properties with the {@code spring.datasource}
 * prefix through the setters at the bottom of the class; each field initializer is the value
 * used when the corresponding property is not set.
 *
 * @author well
 * @date 2019-05-07
 */
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
@EnableTransactionManagement
@MapperScan(basePackages = {"com.renlijia.mybatis.dao"})
public class DruidConfig implements TransactionManagementConfigurer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DruidConfig.class);

    private String url;
    private String username;
    private String password;
    private String driverClassName = "com.mysql.jdbc.Driver";
    private int initialSize = 5;
    private int minIdle = 5;
    private int maxActive = 200;
    private int maxWait = 30000;
    private int timeBetweenEvictionRunsMillis = 60000;
    private int minEvictableIdleTimeMillis = 300000;
    private String validationQuery = "SELECT 1 FROM DUAL";
    private boolean testWhileIdle = true;
    private boolean testOnBorrow = false;
    private boolean testOnReturn = false;
    private boolean poolPreparedStatements = true;
    private int maxPoolPreparedStatementPerConnectionSize = 20;
    private String filters = "stat,wall";
    private String connectionProperties = "druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000";

    /**
     * Creates the MyBatis session factory on top of the data source held by the transaction
     * manager, so MyBatis and Spring transactions use the same pool.
     *
     * @param transactionManager transaction manager whose data source is handed to MyBatis
     * @return the factory bean configured with {@code sqlmap-config.xml}, the mapper XML files
     *         under {@code classpath*:/mybatis/} and the model alias package
     * @throws Exception if the mapper resources cannot be resolved
     */
    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactoryBean createSqlSessionFactoryBean(DataSourceTransactionManager transactionManager) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(transactionManager.getDataSource());
        sqlSessionFactoryBean.setConfigLocation(new ClassPathResource("sqlmap-config.xml"));
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath*:/mybatis/**/*.xml"));
        sqlSessionFactoryBean.setVfs(SpringBootVFS.class);
        sqlSessionFactoryBean.setTypeAliasesPackage("com.renlijia.mybatis.model");
        return sqlSessionFactoryBean;
    }

    /**
     * Exposes a {@link SqlSessionTemplate} backed by the given session factory.
     *
     * @param sqlSessionFactory the MyBatis session factory
     * @return a new template wrapping {@code sqlSessionFactory}
     */
    @Bean
    @Primary
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    /**
     * Builds the Druid data source from the bound settings and wraps it in the transaction
     * manager returned for annotation-driven transactions.
     *
     * @return a transaction manager that owns the newly configured Druid data source
     */
    @Bean
    @Primary
    @Override
    public DataSourceTransactionManager annotationDrivenTransactionManager() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);
        dataSource.setInitialSize(initialSize);
        dataSource.setMinIdle(minIdle);
        dataSource.setMaxActive(maxActive);
        dataSource.setMaxWait(maxWait);
        dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setTestOnBorrow(testOnBorrow);
        dataSource.setTestOnReturn(testOnReturn);
        dataSource.setPoolPreparedStatements(poolPreparedStatements);
        dataSource.setConnectionProperties(connectionProperties);
        dataSource.setConnectionInitSqls(Arrays.asList("SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci;"));
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);

        // Configure database batch updates; this requires WallConfig to have multiStatementAllow=true.
        // The filter is obtained through the @Bean methods instead of being injected into this
        // class: injecting it would make this configuration depend on a bean it produces itself
        // (a circular reference). On a @Configuration class these calls are routed through the
        // container, so the shared "wallFilter" bean is returned rather than a second instance.
        List<Filter> proxyFilters = new ArrayList<>();
        proxyFilters.add(wallFilter(wallFilterConfig()));
        dataSource.setProxyFilters(proxyFilters);

        try {
            dataSource.setFilters(filters);
        } catch (SQLException e) {
            // Best effort: the failure is logged and the data source is still returned; the
            // filters named in the "filters" setting may then not be installed.
            LOGGER.error("druid configuration initialization filter", e);
        }
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Wall-filter settings with multi-statement SQL allowed.
     *
     * @return a {@link WallConfig} with {@code multiStatementAllow} set to {@code true}
     */
    @Bean(name = "wallConfig")
    WallConfig wallFilterConfig() {
        WallConfig wc = new WallConfig();
        wc.setMultiStatementAllow(true);
        return wc;
    }

    /**
     * Wall filter that uses the {@code wallConfig} bean.
     *
     * @param wallConfig configuration applied to the filter
     * @return the configured {@link WallFilter}
     */
    @Bean(name = "wallFilter")
    @DependsOn("wallConfig")
    WallFilter wallFilter(WallConfig wallConfig) {
        WallFilter wfilter = new WallFilter();
        wfilter.setConfig(wallConfig);
        return wfilter;
    }

    /**
     * Exposes a {@link TransactionTemplate} for programmatic transactions.
     *
     * @param transactionManager transaction manager the template delegates to
     * @return a new template bound to {@code transactionManager}
     */
    @Bean
    public TransactionTemplate transactionTemplate(DataSourceTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }

    // ---- Setters used to bind the spring.datasource.* properties ----

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    public void setMaxWait(int maxWait) {
        this.maxWait = maxWait;
    }

    public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }

    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }

    public void setMaxPoolPreparedStatementPerConnectionSize(int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }

    public void setFilters(String filters) {
        this.filters = filters;
    }

    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
}
2.编写测试例子
package com.renlijia.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.renlijia.mybatis.dao.PersonDAO;
import com.renlijia.mybatis.model.PersonDO;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demo controller: inside a single {@code TransactionTemplate} callback it inserts two rows on
 * the calling thread, hands a third insert to a thread pool, and then ends the callback by
 * throwing a {@code RuntimeException}.
 *
 * @author ding
 * @date 2020-06-06 14:11
 */
@RestController
public class Controller {

    /** Pool that runs the insert issued from a second thread. */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = createExecutor();

    @Resource
    private PersonDAO personDAO;

    @Resource
    private TransactionTemplate transactionTemplate;

    /** Initializes the thread pool: 5 core / 10 max threads, a 200-slot queue, abort policy. */
    private static ThreadPoolExecutor createExecutor() {
        ThreadFactory namedThreads = new ThreadFactoryBuilder().setNameFormat("track-log-%d").build();
        return new ThreadPoolExecutor(
                5,
                10,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(200),
                namedThreads,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Runs the demo scenario described on the class.
     *
     * @return {@code null} if the transactional callback returns normally
     */
    @RequestMapping("/concurrent/multithread")
    public String multiThreadTest() {
        transactionTemplate.execute(txStatus -> {
            savePerson("黄晓明", 35);
            // This row is written by a pool thread, not by the thread running the callback.
            THREAD_POOL_EXECUTOR.execute(() -> savePerson("周迅", 48));
            savePerson("刘德华", 55);
            // The callback always fails after the inserts have been issued.
            throw new RuntimeException();
        });
        return null;
    }

    /** Creates a person with the given name and age and inserts it through the DAO. */
    private void savePerson(String name, int age) {
        PersonDO row = new PersonDO();
        row.setName(name);
        row.setAge(age);
        personDAO.insert(row);
    }
}
此例子中直接抛出RuntimeException,会导致当前线程所对应的事务回滚,所以黄晓明和刘德华这两条数据不会插入到数据库。但是周迅这条数据是提交给线程池THREAD_POOL_EXECUTOR、由另一个线程插入的,该数据成功入库,没有回滚。由此得出结论:事务只对当前线程有效。下面我们从源码进行分析。
3.源码分析
获取数据连接
org.mybatis.spring.transaction.SpringManagedTransaction#openConnection
/**
 * Opens the JDBC connection for this transaction object and records its state.
 *
 * <p>The connection is obtained through {@code DataSourceUtils.getConnection}; the method then
 * stores the connection's auto-commit flag and whether {@code DataSourceUtils} reports the
 * connection as transactional for this data source.
 *
 * @throws SQLException declared for the JDBC calls made here
 */
private void openConnection() throws SQLException {
// Go through DataSourceUtils rather than asking the DataSource directly.
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
// Remember whether this connection is the transactional one for the data source.
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}
}
跟踪代码到org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource
/**
 * Looks up the resource stored under {@code actualKey} in the current thread's resource map.
 *
 * @param actualKey key to look up in the map held by the {@code resources} thread-local
 * @return the stored value, or {@code null} when the thread has no map, the key is absent, or
 *         the stored value is a void {@code ResourceHolder}
 */
@Nullable
private static Object doGetResource(Object actualKey) {
// resources.get() returns the map belonging to the calling thread only.
Map<Object, Object> map = (Map)resources.get();
if (map == null) {
return null;
} else {
Object value = map.get(actualKey);
// A holder that reports itself as void is dropped and treated as missing.
if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
map.remove(actualKey);
// Clear the thread-local entirely once its map has no entries left.
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
resources定义为:
// Thread-local map of resources; each thread that accesses it sees only its own map.
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
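resources为ThreadLocal类型,ThreadLocal中的数据只对当前线程可见,所以在事务中把任务交给另一个线程执行时,该线程从resources中取不到主线程绑定的连接,只能获取一个新的数据库连接,而事务不能跨连接,所以上述场景只有主线程的事务会回滚。如果希望新线程中的操作也具备事务性,则需要在新线程里也开启事务。注意:新线程的事务与主线程的事务相互独立,只有新线程自己的事务回调中抛出异常时它才会回滚,主线程的回滚不会影响它。代码改为: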
package com.renlijia.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.renlijia.mybatis.dao.PersonDAO;
import com.renlijia.mybatis.model.PersonDO;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demo controller, second variant: same scenario as before, except that the insert handed to the
 * thread pool is itself wrapped in its own {@code TransactionTemplate} callback.
 *
 * @author ding
 * @date 2020-06-06 14:11
 */
@RestController
public class Controller {

    /** Pool that runs the insert issued from a second thread. */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = createExecutor();

    @Resource
    private PersonDAO personDAO;

    @Resource
    private TransactionTemplate transactionTemplate;

    /** Initializes the thread pool: 5 core / 10 max threads, a 200-slot queue, abort policy. */
    private static ThreadPoolExecutor createExecutor() {
        ThreadFactory namedThreads = new ThreadFactoryBuilder().setNameFormat("track-log-%d").build();
        return new ThreadPoolExecutor(
                5,
                10,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(200),
                namedThreads,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Runs the demo scenario described on the class.
     *
     * @return {@code null} if the outer transactional callback returns normally
     */
    @RequestMapping("/concurrent/multithread")
    public String multiThreadTest() {
        transactionTemplate.execute(outerStatus -> {
            savePerson("黄晓明", 35);
            THREAD_POOL_EXECUTOR.execute(this::savePersonInOwnTransaction);
            savePerson("刘德华", 55);
            // The outer callback always fails after the inserts have been issued.
            throw new RuntimeException();
        });
        return null;
    }

    /** Task run on the pool thread: inserts one row inside a separate template callback. */
    private void savePersonInOwnTransaction() {
        transactionTemplate.execute(innerStatus -> {
            savePerson("周迅", 48);
            return true;
        });
    }

    /** Creates a person with the given name and age and inserts it through the DAO. */
    private void savePerson(String name, int age) {
        PersonDO row = new PersonDO();
        row.setName(name);
        row.setAge(age);
        personDAO.insert(row);
    }
}