springboot-多数据源原理

2020-12-21  本文已影响0人  麦大大吃不胖

by shihang.mai

1. 为什么需要动态数据源

我们来查看源码

  1. mapper.getXXXX
  2. MapperProxy的invoke()
/**
 * Entry point of the mapper proxy.
 *
 * Methods declared on {@link Object} are invoked reflectively on this handler itself;
 * every other method is resolved to a cached MapperMethod and executed against the
 * SqlSession held by this proxy.
 *
 * @param proxy  the proxy instance the call was made on (not used here)
 * @param method the interface method being invoked
 * @param args   the call arguments
 * @return whatever the reflective call or the MapperMethod returns
 * @throws Throwable the unwrapped cause when the reflective call on this handler fails,
 *                   or anything thrown by the MapperMethod execution
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    boolean declaredOnObject = Object.class.equals(method.getDeclaringClass());
    if (!declaredOnObject) {
        // ----------- follow this call: the mapper method is executed here -------------
        MapperMethod resolved = this.cachedMapperMethod(method);
        return resolved.execute(this.sqlSession, args);
    }

    try {
        return method.invoke(this, args);
    } catch (Throwable reflectionFailure) {
        // Rethrow whatever ExceptionUtil unwraps from the reflective failure.
        throw ExceptionUtil.unwrapThrowable(reflectionFailure);
    }
}
  3. MapperMethod的execute()
/**
 * Dispatches a mapper call to the matching SqlSession operation based on the
 * command type (INSERT / UPDATE / DELETE / SELECT / FLUSH).
 *
 * @param sqlSession the session the statement is run on
 * @param args       the raw arguments of the mapper method call
 * @return the operation result (may be null for non-primitive return types)
 * @throws BindingException if the command type is not one of the handled types, or if
 *                          the result is null while the mapper method declares a
 *                          primitive, non-void return type
 */
public Object execute(SqlSession sqlSession, Object[] args) {
    // NOTE(review): getType() is read once here instead of per branch; assumes it is a plain getter.
    final SqlCommandType type = this.command.getType();
    Object result;

    if (type == SqlCommandType.INSERT) {
        Object param = this.method.convertArgsToSqlCommandParam(args);
        result = this.rowCountResult(sqlSession.insert(this.command.getName(), param));
    } else if (type == SqlCommandType.UPDATE) {
        Object param = this.method.convertArgsToSqlCommandParam(args);
        result = this.rowCountResult(sqlSession.update(this.command.getName(), param));
    } else if (type == SqlCommandType.DELETE) {
        Object param = this.method.convertArgsToSqlCommandParam(args);
        result = this.rowCountResult(sqlSession.delete(this.command.getName(), param));
    } else if (type == SqlCommandType.SELECT) {
        if (this.method.returnsVoid() && this.method.hasResultHandler()) {
            this.executeWithResultHandler(sqlSession, args);
            result = null;
        } else if (this.method.returnsMany()) {
            result = this.executeForMany(sqlSession, args);
        } else if (this.method.returnsMap()) {
            result = this.executeForMap(sqlSession, args);
        } else {
            Object param = this.method.convertArgsToSqlCommandParam(args);
            // ----------- follow this call: single-row select -------------
            result = sqlSession.selectOne(this.command.getName(), param);
        }
    } else if (type == SqlCommandType.FLUSH) {
        result = sqlSession.flushStatements();
    } else {
        throw new BindingException("Unknown execution method for: " + this.command.getName());
    }

    // A null result cannot be handed back through a primitive (non-void) return type.
    boolean nullForPrimitive =
            result == null && this.method.getReturnType().isPrimitive() && !this.method.returnsVoid();
    if (nullForPrimitive) {
        throw new BindingException("Mapper method '" + this.command.getName() + " attempted to return null from a method with a primitive return type (" + this.method.getReturnType() + ").");
    }
    return result;
}
  4. SqlSessionTemplate的selectOne()
/**
 * Runs a single-row select by delegating to the internal {@code sqlSessionProxy}.
 *
 * @param statement the statement identifier
 * @param parameter the statement parameter object
 * @return whatever the proxy's {@code selectOne} returns
 */
public <T> T selectOne(String statement, Object parameter) {
    T row = this.sqlSessionProxy.<T>selectOne(statement, parameter);
    return row;
}
  5. SqlSessionTemplate内部类SqlSessionInterceptor.invoke()
/**
 * Invocation handler declared inside SqlSessionTemplate. Each intercepted call obtains a
 * SqlSession through SqlSessionUtils, runs the target method on it, and releases the
 * session afterwards.
 * NOTE(review): presumably this is the handler installed on {@code sqlSessionProxy};
 * the wiring is not shown here.
 */
private class SqlSessionInterceptor implements InvocationHandler {
        private SqlSessionInterceptor() {
        }

        /**
         * Runs {@code method} on a SqlSession obtained from SqlSessionUtils.
         *
         * @param proxy  the proxy instance (not used here)
         * @param method the SqlSession method to run
         * @param args   the call arguments
         * @return the result of the reflective call
         * @throws Throwable the unwrapped failure, replaced by the translated exception
         *                   when a translator is configured and yields a non-null result
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // -------------- 2. look here: how the SqlSession is obtained ---------------
            SqlSession sqlSession = SqlSessionUtils.getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);

            Object unwrapped;
            try {
                // see step 7 (DefaultSqlSession)
                Object result = method.invoke(sqlSession, args);
                // Commit right away when the session is not reported as transactional.
                if (!SqlSessionUtils.isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
                    sqlSession.commit(true);
                }

                unwrapped = result;
            } catch (Throwable var11) {
                unwrapped = ExceptionUtil.unwrapThrowable(var11);
                if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    // Close the session before translating, then null the local so the
                    // finally block below does not close it a second time.
                    SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
                    sqlSession = null;
                    Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException)unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }

                throw (Throwable)unwrapped;
            } finally {
                if (sqlSession != null) {
                    // ----------- 1. look here: how the SqlSession is released ---------
                    SqlSessionUtils.closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
                }

            }

            return unwrapped;
        }
    }
  6. SqlSessionUtils.closeSqlSession
//1
/**
 * Releases or closes a SqlSession.
 *
 * If the holder bound to {@code sessionFactory} in TransactionSynchronizationManager
 * wraps this very session, the holder is only marked as released and the session stays
 * open. Otherwise the session is closed.
 *
 * @param session        the session to release; must not be null
 * @param sessionFactory the factory used as the resource key; must not be null
 */
public static void closeSqlSession(SqlSession session, SqlSessionFactory sessionFactory) {
    Assert.notNull(session, "No SqlSession specified");
    Assert.notNull(sessionFactory, "No SqlSessionFactory specified");

    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
    boolean heldByTransaction = holder != null && holder.getSqlSession() == session;

    if (!heldByTransaction) {
        // No matching holder is bound (no transaction in play): close the session.
        if (logger.isDebugEnabled()) {
            logger.debug("Closing non transactional SqlSession [" + session + "]");
        }
        session.close();
        return;
    }

    // Session is bound to the current transaction: release the holder, do not close.
    if (logger.isDebugEnabled()) {
        logger.debug("Releasing transactional SqlSession [" + session + "]");
    }
    holder.released();
}
//2 发现 sqlSession 会先从当前线程绑定的事务资源(TransactionSynchronizationManager)中获取,取不到才会新建。
/**
 * Returns the SqlSession to use for the current call.
 *
 * A session already bound to {@code sessionFactory} and synchronized with the current
 * transaction is reused. Otherwise a new session is opened and, when transaction
 * synchronization is active and the environment uses a SpringManagedTransactionFactory,
 * it is bound and registered so later calls can reuse it.
 *
 * @param sessionFactory      factory used to open sessions and as the resource key; must not be null
 * @param executorType        executor type for the session; must not be null
 * @param exceptionTranslator translator stored in a newly created holder
 * @return the reused or newly opened session
 * @throws TransientDataAccessResourceException if the bound session has a different
 *         executor type, or if the environment's DataSource is bound as a resource
 *         while the transaction factory is not a SpringManagedTransactionFactory
 */
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
    Assert.notNull(sessionFactory, "No SqlSessionFactory specified");
    Assert.notNull(executorType, "No ExecutorType specified");

    SqlSessionHolder bound = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
    if (bound != null && bound.isSynchronizedWithTransaction()) {
        // Reuse path: a session is already tied to the running transaction.
        if (bound.getExecutorType() != executorType) {
            throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");
        }
        bound.requested();
        if (logger.isDebugEnabled()) {
            logger.debug("Fetched SqlSession [" + bound.getSqlSession() + "] from current transaction");
        }
        return bound.getSqlSession();
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Creating a new SqlSession");
    }
    SqlSession session = sessionFactory.openSession(executorType);

    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        if (logger.isDebugEnabled()) {
            logger.debug("SqlSession [" + session + "] was not registered for synchronization because synchronization is not active");
        }
        return session;
    }

    Environment environment = sessionFactory.getConfiguration().getEnvironment();
    if (!(environment.getTransactionFactory() instanceof SpringManagedTransactionFactory)) {
        if (TransactionSynchronizationManager.getResource(environment.getDataSource()) != null) {
            throw new TransientDataAccessResourceException("SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional");
        }
        return session;
    }

    // Bind the new session so subsequent calls made under the same synchronization reuse it.
    if (logger.isDebugEnabled()) {
        logger.debug("Registering transaction synchronization for SqlSession [" + session + "]");
    }
    SqlSessionHolder created = new SqlSessionHolder(session, executorType, exceptionTranslator);
    TransactionSynchronizationManager.bindResource(sessionFactory, created);
    TransactionSynchronizationManager.registerSynchronization(new SqlSessionUtils.SqlSessionSynchronization(created, sessionFactory));
    created.setSynchronizedWithTransaction(true);
    created.requested();
    return session;
}

总结一下:如果开启了事务,那么同一个事务内(例如同一个 service 方法中)的所有数据库操作共用同一个 SqlSession;如果没有开启事务,则每次数据库操作都会新建一个 SqlSession,并在操作结束后关闭。

  7. DefaultSqlSession的selectOne
/**
 * Runs the statement through {@code selectList} and narrows the result to one row.
 *
 * @param statement the statement identifier
 * @param parameter the statement parameter object
 * @return the single row, or null when the list is empty
 * @throws TooManyResultsException if more than one row is returned
 */
public <T> T selectOne(String statement, Object parameter) {
    List<T> rows = this.selectList(statement, parameter);
    int count = rows.size();
    if (count == 1) {
        return rows.get(0);
    }
    if (count > 1) {
        throw new TooManyResultsException("Expected one result (or null) to be returned by selectOne(), but found: " + count);
    }
    return null;
}
/**
 * Convenience overload: runs the statement with {@code RowBounds.DEFAULT}.
 *
 * @param statement the statement identifier
 * @param parameter the statement parameter object
 * @return the rows returned by the three-argument overload
 */
public <E> List<E> selectList(String statement, Object parameter) {
    List<E> rows = this.selectList(statement, parameter, RowBounds.DEFAULT);
    return rows;
}
/**
 * Looks up the mapped statement and hands the query to the executor.
 *
 * @param statement the statement identifier
 * @param parameter the statement parameter object (passed through {@code wrapCollection})
 * @param rowBounds the row bounds forwarded to the executor
 * @return the rows produced by the executor
 */
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    try {
        MappedStatement mapped = this.configuration.getMappedStatement(statement);
        // ------- follow this call: the executor runs the query --------
        return this.executor.query(mapped, this.wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
    } catch (Exception failure) {
        throw ExceptionFactory.wrapException("Error querying database.  Cause: " + failure, failure);
    } finally {
        // Always reset the ErrorContext, whether the query succeeded or failed.
        ErrorContext.instance().reset();
    }
}
  8. BaseExecutor的query
/**
 * Builds the bound SQL and the cache key for the statement, then delegates to the
 * six-argument overload.
 *
 * @param ms            the mapped statement to run
 * @param parameter     the statement parameter object
 * @param rowBounds     the row bounds
 * @param resultHandler optional result handler
 * @return the rows returned by the six-argument overload
 * @throws SQLException propagated from the delegated query
 */
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql sql = ms.getBoundSql(parameter);
    CacheKey cacheKey = this.createCacheKey(ms, parameter, rowBounds, sql);
    // --------- follow this call ----------
    return this.query(ms, parameter, rowBounds, resultHandler, cacheKey, sql);
}
/**
 * Runs a query, consulting the executor's local cache first.
 *
 * The local cache is only read when no result handler is supplied; on a miss the
 * query goes to the database. Once the outermost query finishes (queryStack back at
 * 0), deferred loads are run and cleared, and the local cache is cleared when the
 * configured scope is STATEMENT.
 *
 * @param ms            the mapped statement to run
 * @param parameter     the statement parameter object
 * @param rowBounds     the row bounds
 * @param resultHandler optional result handler; when non-null the local cache is not read
 * @param key           cache key for this query
 * @param boundSql      the bound SQL for this query
 * @return the cached or freshly loaded rows
 * @throws SQLException      propagated from the database query
 * @throws ExecutorException if the executor is already closed
 */
@SuppressWarnings("unchecked")
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (this.closed) {
        throw new ExecutorException("Executor was closed.");
    }

    if (this.queryStack == 0 && ms.isFlushCacheRequired()) {
        this.clearLocalCache();
    }

    List<E> rows;
    try {
        ++this.queryStack;
        rows = null;
        if (resultHandler == null) {
            rows = (List<E>) this.localCache.getObject(key);
        }
        if (rows == null) {
            // --------- follow this call: cache miss, go to the database ------
            rows = this.queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
        } else {
            this.handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
        }
    } finally {
        --this.queryStack;
    }

    if (this.queryStack != 0) {
        // Still inside a nested query: leave the post-processing to the outermost call.
        return rows;
    }

    Iterator<?> pending = this.deferredLoads.iterator();
    while (pending.hasNext()) {
        ((BaseExecutor.DeferredLoad) pending.next()).load();
    }
    this.deferredLoads.clear();

    if (this.configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        this.clearLocalCache();
    }
    return rows;
}
/**
 * Executes the query via {@code doQuery} and stores the result in the local cache.
 *
 * A placeholder is put under {@code key} while the query runs and is removed again in
 * a finally block, so it does not remain in the cache when {@code doQuery} throws.
 * For CALLABLE statements the parameter object is also stored in the output-parameter
 * cache.
 *
 * @return the rows returned by {@code doQuery}
 * @throws SQLException propagated from {@code doQuery}
 */
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    this.localCache.putObject(key, ExecutionPlaceholder.EXECUTION_PLACEHOLDER);

    List<E> rows;
    try {
        rows = this.doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
        this.localCache.removeObject(key);
    }

    this.localCache.putObject(key, rows);
    boolean callable = ms.getStatementType() == StatementType.CALLABLE;
    if (callable) {
        this.localOutputParameterCache.putObject(key, parameter);
    }
    return rows;
}
  9. SimpleExecutor的doQuery
/**
 * Creates a statement handler, prepares the JDBC statement and runs the query.
 * The statement is passed to {@code closeStatement} in a finally block (with null if
 * preparation never got that far).
 *
 * @return the rows produced by the statement handler
 * @throws SQLException propagated from statement preparation or execution
 */
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement statement = null;
    try {
        Configuration config = ms.getConfiguration();
        StatementHandler statementHandler = config.newStatementHandler(this.wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
        // --------- follow this call: the connection is obtained while preparing ---------
        statement = this.prepareStatement(statementHandler, ms.getStatementLog());
        return statementHandler.query(statement, resultHandler);
    } finally {
        this.closeStatement(statement);
    }
}
/**
 * Obtains a connection, lets the handler prepare a statement on it and binds the
 * parameters.
 *
 * @param handler      the statement handler that prepares and parameterizes the statement
 * @param statementLog the log passed on to {@code getConnection}
 * @return the prepared and parameterized statement
 * @throws SQLException propagated from the connection or the handler
 */
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    // ------- follow this call: where the JDBC connection comes from -----------
    Connection conn = this.getConnection(statementLog);
    Statement prepared = handler.prepare(conn);
    handler.parameterize(prepared);
    return prepared;
}
/**
 * Fetches the connection from the executor's transaction, wrapping it with
 * {@code ConnectionLogger} when debug logging is enabled on {@code statementLog}.
 *
 * @param statementLog the statement log that decides whether the connection is wrapped
 * @return the raw or logging-wrapped connection
 * @throws SQLException propagated from the transaction
 */
protected Connection getConnection(Log statementLog) throws SQLException {
    // ------- follow this call: the transaction supplies the connection ----------
    Connection raw = this.transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
        return ConnectionLogger.newInstance(raw, statementLog, this.queryStack);
    }
    return raw;
}
  10. SpringManagedTransaction的getConnection()
//SpringManagedTransaction 中如果已经持有连接,直接返回;否则通过 DataSourceUtils 从数据源 DataSource 获取
/**
 * Transaction implementation that takes its JDBC connection from a single DataSource
 * via DataSourceUtils.
 * NOTE(review): this is an excerpt; the constructor and the remaining Transaction
 * methods are not shown here.
 */
public class SpringManagedTransaction implements Transaction {
    private final DataSource dataSource;
    private Connection connection;
    private boolean isConnectionTransactional;
    private boolean autoCommit;

    /**
     * Returns the connection held by this transaction, opening it on first use.
     *
     * @return the connection obtained for {@code dataSource}
     * @throws SQLException propagated from opening the connection
     */
    public Connection getConnection() throws SQLException {
        boolean notOpenedYet = this.connection == null;
        if (notOpenedYet) {
            this.openConnection();
        }
        return this.connection;
    }

    /**
     * Obtains the connection through DataSourceUtils and records its auto-commit flag
     * and whether DataSourceUtils reports it as transactional.
     */
    private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);

        if (!logger.isDebugEnabled()) {
            return;
        }
        String managedWord = this.isConnectionTransactional ? " " : " not ";
        logger.debug("JDBC Connection [" + this.connection + "] will" + managedWord + "be managed by Spring");
    }
}


一个数据源(DataSource)只对应一个数据库。当我们有多个数据库时,就需要多个数据源;可是从上面的源码可以看到,SpringManagedTransaction(以及事务管理器)中只持有一个 DataSource 的引用,所以我们需要一个能在运行时路由到不同数据库的动态数据源。

2. 如何做(只写核心部分)

  1. 事务管理器中只有一个数据源的引用,所以我们可以自定义一个类 DynamicDataSource 来实现 DataSource
  2. DynamicDataSource 中存储我们配置的多数据源,然后将 DynamicDataSource 的实例配置给事务管理器
  3. 当从事务管理器获取 Connection 对象的时候,会从 DynamicDataSource 实例获取,然后再由 DynamicDataSource 根据 routeKey 路由到某个具体的数据源,从中获取 Connection。
多数据源

Spring 也考虑到了这一点,提供了一个抽象类AbstractRoutingDataSource

/**
 * Routing DataSource whose lookup key is supplied by DatabaseContextHolder.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * Supplies the key used to pick the target DataSource.
     * NOTE(review): DatabaseContextHolder is not shown here; presumably it keeps the
     * database type per thread. Confirm against its implementation.
     *
     * @return the value returned by {@code DatabaseContextHolder.getDatabaseType()}
     */
    @Override
    protected Object determineCurrentLookupKey() {
        final Object routeKey = DatabaseContextHolder.getDatabaseType();
        return routeKey;
    }
}

3. 动态数据源原理图

多数据源原理
上一篇 下一篇

猜你喜欢

热点阅读