多线程对spring数据库事务影响

2020-06-06  本文已影响0人  定金喜

1.spring事务的配置

以mysql为例:

package com.renlijia.config;

import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Resource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
  * @author well
  * @date 2019-05-07
  */
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
@EnableTransactionManagement
@MapperScan(basePackages = {"com.renlijia.mybatis.dao"})
public class DruidConfig implements TransactionManagementConfigurer {

    private Logger logger = LoggerFactory.getLogger(DruidConfig.class);

    private String url;

    private String username;

    private String password;

    private String driverClassName = "com.mysql.jdbc.Driver";

    private int initialSize = 5;

    private int minIdle = 5;

    private int maxActive = 200;

    private int maxWait = 30000;

    private int timeBetweenEvictionRunsMillis = 60000;

    private int minEvictableIdleTimeMillis = 300000;

    private String validationQuery = "SELECT 1 FROM DUAL";

    private boolean testWhileIdle = true;

    private boolean testOnBorrow = false;

    private boolean testOnReturn = false;

    private boolean poolPreparedStatements = true;

    private int maxPoolPreparedStatementPerConnectionSize = 20;

    private String filters = "stat,wall";

    private String connectionProperties = "druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000";

    @Resource
    private WallFilter wallFilter;

    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactoryBean createSqlSessionFactoryBean(DataSourceTransactionManager transactionManager) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(transactionManager.getDataSource());
        sqlSessionFactoryBean.setConfigLocation(new ClassPathResource("sqlmap-config.xml"));
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath*:/mybatis/**/*.xml"));
        sqlSessionFactoryBean.setVfs(SpringBootVFS.class);
        sqlSessionFactoryBean.setTypeAliasesPackage("com.renlijia.mybatis.model");
        return sqlSessionFactoryBean;
    }

    @Bean
    @Primary
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    @Bean
    @Primary
    @Override
    public DataSourceTransactionManager annotationDrivenTransactionManager() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);
        dataSource.setInitialSize(initialSize);
        dataSource.setMinIdle(minIdle);
        dataSource.setMaxActive(maxActive);
        dataSource.setMaxWait(maxWait);
        dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setTestOnBorrow(testOnBorrow);
        dataSource.setTestOnReturn(testOnReturn);
        dataSource.setPoolPreparedStatements(poolPreparedStatements);
        dataSource.setConnectionProperties(connectionProperties);
        dataSource.setConnectionInitSqls(Arrays.asList("SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci;"));
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);

        //配置数据库批量更新,需要WallConfig设置multiStatementAllow=true
        List<Filter> proxyFilters = new ArrayList<>();
        proxyFilters.add(wallFilter);
        dataSource.setProxyFilters(proxyFilters);

        try {
            dataSource.setFilters(filters);
        } catch (SQLException e) {
            logger.error("druid configuration initialization filter", e);
        }
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "wallConfig")
    WallConfig wallFilterConfig() {
        WallConfig wc = new WallConfig();
        wc.setMultiStatementAllow(true);
        return wc;
    }

    @Bean(name = "wallFilter")
    @DependsOn("wallConfig")
    WallFilter wallFilter(WallConfig wallConfig) {
        WallFilter wfilter = new WallFilter();
        wfilter.setConfig(wallConfig);
        return wfilter;
    }

    @Bean
    public TransactionTemplate transactionTemplate(DataSourceTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    public void setMaxWait(int maxWait) {
        this.maxWait = maxWait;
    }

    public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }

    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }

    public void setMaxPoolPreparedStatementPerConnectionSize(int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }

    public void setFilters(String filters) {
        this.filters = filters;
    }

    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
}

2.编写测试例子

package com.renlijia.concurrent;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.renlijia.mybatis.dao.PersonDAO;
import com.renlijia.mybatis.model.PersonDO;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: ding
 * @Date: 2020-06-06 14:11
 */
@RestController
public class Controller {

    @Resource
    private PersonDAO personDAO;

    @Resource
    private TransactionTemplate transactionTemplate;

    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;


    static {
        // 初始化线程池
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("track-log-%d").build();
        THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    @RequestMapping("/concurrent/multithread")
    public String multiThreadTest(){

        transactionTemplate.execute((status)->{
            PersonDO personDO = new PersonDO();
            personDO.setName("黄晓明");
            personDO.setAge(35);
            personDAO.insert(personDO);

            THREAD_POOL_EXECUTOR.execute(()->{
                PersonDO personDO2 = new PersonDO();
                personDO2.setName("周迅");
                personDO2.setAge(48);
                personDAO.insert(personDO2);
            });

            PersonDO personDO3 = new PersonDO();
            personDO3.setName("刘德华");
            personDO3.setAge(55);
            personDAO.insert(personDO3);
            throw new RuntimeException();
        });

        return null;
    }
}

此例子中直接抛出RuntimeException会导致该线程所对应的事务回滚,所以黄晓明和刘德华这两条数据不会插入到数据库,但是因为建立一个线程池THREAD_POOL_EXECUTOR执行了一个新的线程插入了一条周迅的数据,该数据成功入库没有回滚,所以得出结论,事务只对当前线程有效,我们从源码进行分析。

3.源码分析

获取数据连接
org.mybatis.spring.transaction.SpringManagedTransaction#openConnection

private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
        }
    }

跟踪代码到org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource

@Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = (Map)resources.get();
        if (map == null) {
            return null;
        } else {
            Object value = map.get(actualKey);
            if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
                map.remove(actualKey);
                if (map.isEmpty()) {
                    resources.remove();
                }

                value = null;
            }

            return value;
        }
    }

resources定义为:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
resources为ThreadLocal类型,ThreadLocal类型只针对当前线程有效,所以当在事务中新建一个线程后,获取的数据库连接不同,而事务不能跨连接,所以上述场景只有主线程的事务会回滚。如果需要新的线程回滚,则在新线程里也加事务,代码改为:

package com.renlijia.concurrent;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.renlijia.mybatis.dao.PersonDAO;
import com.renlijia.mybatis.model.PersonDO;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author: ding
 * @Date: 2020-06-06 14:11
 */
@RestController
public class Controller {

    @Resource
    private PersonDAO personDAO;

    @Resource
    private TransactionTemplate transactionTemplate;

    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;


    static {
        // 初始化线程池
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("track-log-%d").build();
        THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    @RequestMapping("/concurrent/multithread")
    public String multiThreadTest(){

        transactionTemplate.execute((status)->{
            PersonDO personDO = new PersonDO();
            personDO.setName("黄晓明");
            personDO.setAge(35);
            personDAO.insert(personDO);


            THREAD_POOL_EXECUTOR.execute(()->{
               transactionTemplate.execute((transactionStatus)->{
                    PersonDO personDO2 = new PersonDO();
                    personDO2.setName("周迅");
                    personDO2.setAge(48);
                    personDAO.insert(personDO2);
                    return true;
                });
            });

            PersonDO personDO3 = new PersonDO();
            personDO3.setName("刘德华");
            personDO3.setAge(55);
            personDAO.insert(personDO3);
            throw new RuntimeException();
        });

        return null;
    }
}

参考文章:
https://www.cnblogs.com/dongying/p/4142476.html

上一篇下一篇

猜你喜欢

热点阅读