人人快速开发平台 renren-fast 源码分析(四)多数据源
项目中可以根据注解声明的数据库,在特定的方法中切换数据源。下面就看看它是怎么做到的(尽管我不知道这么做有什么用)
首先看 Springboot 的 config 类有没有数据源相关的,从RenrenApplication
找一下子就找到了
/**
* 配置多数据源
* @author chenshun
* @email sunlightcs@gmail.com
* @date 2017/8/19 0:41
*/
@Configuration
public class DynamicDataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.druid.first")
public DataSource firstDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.second")
public DataSource secondDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@Primary
public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
return new DynamicDataSource(firstDataSource, targetDataSources);
}
}
根据数据源的配置,找到datasources
包有个DynamicDataSource
类
/**
* 动态数据源
* @author chenshun
* @email sunlightcs@gmail.com
* @date 2017/8/19 1:03
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
super.setDefaultTargetDataSource(defaultTargetDataSource);
super.setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
@Override
protected Object determineCurrentLookupKey() {
return getDataSource();
}
public static void setDataSource(String dataSource) {
contextHolder.set(dataSource);
}
public static String getDataSource() {
return contextHolder.get();
}
public static void clearDataSource() {
contextHolder.remove();
}
}
光是看这个类其实看不出什么东西,只知道有个ThreadLocal
常量,并且可以用静态方法配置它。那么我们看看父类AbstractRoutingDataSource
/**
* Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
* calls to one of various target DataSources based on a lookup key. The latter is usually
* (but not necessarily) determined through some thread-bound transaction context.
*
* @author Juergen Hoeller
* @since 2.0.1
* @see #setTargetDataSources
* @see #setDefaultTargetDataSource
* @see #determineCurrentLookupKey()
*/
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
@Nullable
private Map<Object, Object> targetDataSources;
@Nullable
private Object defaultTargetDataSource;
private boolean lenientFallback = true;
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
@Nullable
private Map<Object, DataSource> resolvedDataSources;
@Nullable
private DataSource resolvedDefaultDataSource;
/**
* Specify the map of target DataSources, with the lookup key as key.
* The mapped value can either be a corresponding {@link javax.sql.DataSource}
* instance or a data source name String (to be resolved via a
* {@link #setDataSourceLookup DataSourceLookup}).
* <p>The key can be of arbitrary type; this class implements the
* generic lookup process only. The concrete key representation will
* be handled by {@link #resolveSpecifiedLookupKey(Object)} and
* {@link #determineCurrentLookupKey()}.
*/
public void setTargetDataSources(Map<Object, Object> targetDataSources) {
this.targetDataSources = targetDataSources;
}
/**
* Specify the default target DataSource, if any.
* <p>The mapped value can either be a corresponding {@link javax.sql.DataSource}
* instance or a data source name String (to be resolved via a
* {@link #setDataSourceLookup DataSourceLookup}).
* <p>This DataSource will be used as target if none of the keyed
* {@link #setTargetDataSources targetDataSources} match the
* {@link #determineCurrentLookupKey()} current lookup key.
*/
public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
this.defaultTargetDataSource = defaultTargetDataSource;
}
/**
* Specify whether to apply a lenient fallback to the default DataSource
* if no specific DataSource could be found for the current lookup key.
* <p>Default is "true", accepting lookup keys without a corresponding entry
* in the target DataSource map - simply falling back to the default DataSource
* in that case.
* <p>Switch this flag to "false" if you would prefer the fallback to only apply
* if the lookup key was {@code null}. Lookup keys without a DataSource
* entry will then lead to an IllegalStateException.
* @see #setTargetDataSources
* @see #setDefaultTargetDataSource
* @see #determineCurrentLookupKey()
*/
public void setLenientFallback(boolean lenientFallback) {
this.lenientFallback = lenientFallback;
}
/**
* Set the DataSourceLookup implementation to use for resolving data source
* name Strings in the {@link #setTargetDataSources targetDataSources} map.
* <p>Default is a {@link JndiDataSourceLookup}, allowing the JNDI names
* of application server DataSources to be specified directly.
*/
public void setDataSourceLookup(@Nullable DataSourceLookup dataSourceLookup) {
this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
}
@Override
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property 'targetDataSources' is required");
}
this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
this.targetDataSources.forEach((key, value) -> {
Object lookupKey = resolveSpecifiedLookupKey(key);
DataSource dataSource = resolveSpecifiedDataSource(value);
this.resolvedDataSources.put(lookupKey, dataSource);
});
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
/**
* Resolve the given lookup key object, as specified in the
* {@link #setTargetDataSources targetDataSources} map, into
* the actual lookup key to be used for matching with the
* {@link #determineCurrentLookupKey() current lookup key}.
* <p>The default implementation simply returns the given key as-is.
* @param lookupKey the lookup key object as specified by the user
* @return the lookup key as needed for matching
*/
protected Object resolveSpecifiedLookupKey(Object lookupKey) {
return lookupKey;
}
/**
* Resolve the specified data source object into a DataSource instance.
* <p>The default implementation handles DataSource instances and data source
* names (to be resolved via a {@link #setDataSourceLookup DataSourceLookup}).
* @param dataSource the data source value object as specified in the
* {@link #setTargetDataSources targetDataSources} map
* @return the resolved DataSource (never {@code null})
* @throws IllegalArgumentException in case of an unsupported value type
*/
protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
if (dataSource instanceof DataSource) {
return (DataSource) dataSource;
}
else if (dataSource instanceof String) {
return this.dataSourceLookup.getDataSource((String) dataSource);
}
else {
throw new IllegalArgumentException(
"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
}
}
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return determineTargetDataSource().getConnection(username, password);
}
@Override
@SuppressWarnings("unchecked")
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isInstance(this)) {
return (T) this;
}
return determineTargetDataSource().unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
}
/**
* Retrieve the current target DataSource. Determines the
* {@link #determineCurrentLookupKey() current lookup key}, performs
* a lookup in the {@link #setTargetDataSources targetDataSources} map,
* falls back to the specified
* {@link #setDefaultTargetDataSource default target DataSource} if necessary.
* @see #determineCurrentLookupKey()
*/
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
/**
* Determine the current lookup key. This will typically be
* implemented to check a thread-bound transaction context.
* <p>Allows for arbitrary keys. The returned key needs
* to match the stored lookup key type, as resolved by the
* {@link #resolveSpecifiedLookupKey} method.
*/
@Nullable
protected abstract Object determineCurrentLookupKey();
}
根据注释可以看出,这个类重写了getConnection()
方法。而getConnection()
方法实现的逻辑就是从determineCurrentLookupKey()
中获取数据源,然后打开连接。结合自己实现的DynamicDataSource
我们不难得出一个结论:项目是根据ThreadLocal
常量来获取数据库连接的。这个我们后面再证实,先来看看项目中是什么时候切换数据库的。
datasources
包下还有个aspect
包,一看就让人觉得是通过 aop 来切换数据源的了。直接来看代码
DataSourceAspect
/**
* 多数据源,切面处理类
* @author chenshun
* @email sunlightcs@gmail.com
* @date 2017/9/16 22:20
*/
@Aspect
@Component
public class DataSourceAspect implements Ordered {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Pointcut("@annotation(io.renren.datasources.annotation.DataSource)")
public void dataSourcePointCut() {
}
@Around("dataSourcePointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
DataSource ds = method.getAnnotation(DataSource.class);
if(ds == null){
DynamicDataSource.setDataSource(DataSourceNames.FIRST);
logger.debug("set datasource is " + DataSourceNames.FIRST);
}else {
DynamicDataSource.setDataSource(ds.name());
logger.debug("set datasource is " + ds.name());
}
try {
return point.proceed();
} finally {
DynamicDataSource.clearDataSource();
logger.debug("clean datasource");
}
}
@Override
public int getOrder() {
return 1;
}
}
果不其然,是根据 aop 来取出调用的方法的@DataSource
中的数据源字符串值,决定用什么数据源的。结合上面的代码,可以得出结论:是在调用有@DataSource
修饰的方法时,修改的当前线程数据源。印象中 Servlet 默认是一个请求一条线程处理,所以是这个请求内的数据源都改变了。
全局搜索一下@DataSource
,发现只有一个测试的 service 有用到
/**
* 测试多数据源
*
* @author Mark sunlightcs@gmail.com
* @since 3.1.0 2018-01-28
*/
@Service
public class DataSourceTestService {
@Autowired
private SysUserService sysUserService;
public SysUserEntity queryUser(Long userId){
return sysUserService.selectById(userId);
}
@DataSource(name = DataSourceNames.SECOND)
public SysUserEntity queryUser2(Long userId){
return sysUserService.selectById(userId);
}
}
那么切换数据源这部分,在项目源码中就到此为止了。
我们探究一下在 Mybatis 中是什么时候根据数据源来创建连接的。
首先我们要知道 Mybatis 的工作流程。这个读一读官网就知道,Mybatis-Spring 先通过SqlSessionFactoryBean
创建SqlSessionFactory
实例,然后在适当的时候创建SqlSession
,创建SqlSession
的时候应该已经打开数据库连接了,因为要管理事务的。所以我们其实可以直接找到SqlSessionFactory
的实现类:DefaultSqlSessionFactory
public class DefaultSqlSessionFactory implements SqlSessionFactory {
private final Configuration configuration;
public DefaultSqlSessionFactory(Configuration configuration) {
this.configuration = configuration;
}
@Override
public SqlSession openSession() {
return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
}
@Override
public SqlSession openSession(boolean autoCommit) {
return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, autoCommit);
}
@Override
public SqlSession openSession(ExecutorType execType) {
return openSessionFromDataSource(execType, null, false);
}
@Override
public SqlSession openSession(TransactionIsolationLevel level) {
return openSessionFromDataSource(configuration.getDefaultExecutorType(), level, false);
}
@Override
public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
return openSessionFromDataSource(execType, level, false);
}
@Override
public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
return openSessionFromDataSource(execType, null, autoCommit);
}
@Override
public SqlSession openSession(Connection connection) {
return openSessionFromConnection(configuration.getDefaultExecutorType(), connection);
}
@Override
public SqlSession openSession(ExecutorType execType, Connection connection) {
return openSessionFromConnection(execType, connection);
}
@Override
public Configuration getConfiguration() {
return configuration;
}
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
try {
boolean autoCommit;
try {
autoCommit = connection.getAutoCommit();
} catch (SQLException e) {
// Failover to true, as most poor drivers
// or databases won't support transactions
autoCommit = true;
}
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
final Transaction tx = transactionFactory.newTransaction(connection);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
if (environment == null || environment.getTransactionFactory() == null) {
return new ManagedTransactionFactory();
}
return environment.getTransactionFactory();
}
private void closeTransaction(Transaction tx) {
if (tx != null) {
try {
tx.close();
} catch (SQLException ignore) {
// Intentionally ignore. Prefer previous error.
}
}
}
}
从这个 factory 的源码可以读出,创建SqlSession
调用的是openSessionFromDataSource()
方法。在这个方法中:
- 先是从
configuration
中拿到enviroment
,熟悉 Mybatis 的朋友都知道我们配置数据源就是在enviroment
中 - 然后调用
getTransactionFactoryFromEnvironment()
创建了TransactionFactory
实例 - 根据
TransactionFactory
开启了事务tx
- 根据这个事务
tx
创建了执行类executor
- 最后创建了一个
DefaultSqlSession
,也就是我们用来做对数据库查询操作的类。
一步步来看是什么时候打开的连接吧
创建 TransactionFactory 实例
直接看getTransactionFactoryFromEnvironment()
方法源码。
private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
if (environment == null || environment.getTransactionFactory() == null) {
return new ManagedTransactionFactory();
}
return environment.getTransactionFactory();
}
如果没有配置 transactionFactory,那么返回ManagedTransactionFactory
ManagedTransactionFactory.java
public class ManagedTransactionFactory implements TransactionFactory {
private boolean closeConnection = true;
@Override
public void setProperties(Properties props) {
if (props != null) {
String closeConnectionProperty = props.getProperty("closeConnection");
if (closeConnectionProperty != null) {
closeConnection = Boolean.valueOf(closeConnectionProperty);
}
}
}
@Override
public Transaction newTransaction(Connection conn) {
return new ManagedTransaction(conn, closeConnection);
}
@Override
public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
// Silently ignores autocommit and isolation level, as managed transactions are entirely
// controlled by an external manager. It's silently ignored so that
// code remains portable between managed and unmanaged configurations.
return new ManagedTransaction(ds, level, closeConnection);
}
}
根据 TransactionFactory 开启了事务
ManagedTransaction.java
/**
* {@link Transaction} that lets the container manage the full lifecycle of the transaction.
* Delays connection retrieval until getConnection() is called.
* Ignores all commit or rollback requests.
* By default, it closes the connection but can be configured not to do it.
*
* @author Clinton Begin
*
* @see ManagedTransactionFactory
*/
public class ManagedTransaction implements Transaction {
private static final Log log = LogFactory.getLog(ManagedTransaction.class);
private DataSource dataSource;
private TransactionIsolationLevel level;
private Connection connection;
private final boolean closeConnection;
public ManagedTransaction(Connection connection, boolean closeConnection) {
this.connection = connection;
this.closeConnection = closeConnection;
}
public ManagedTransaction(DataSource ds, TransactionIsolationLevel level, boolean closeConnection) {
this.dataSource = ds;
this.level = level;
this.closeConnection = closeConnection;
}
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
@Override
public void commit() throws SQLException {
// Does nothing
}
@Override
public void rollback() throws SQLException {
// Does nothing
}
@Override
public void close() throws SQLException {
if (this.closeConnection && this.connection != null) {
if (log.isDebugEnabled()) {
log.debug("Closing JDBC Connection [" + this.connection + "]");
}
this.connection.close();
}
}
protected void openConnection() throws SQLException {
if (log.isDebugEnabled()) {
log.debug("Opening JDBC Connection");
}
this.connection = this.dataSource.getConnection();
if (this.level != null) {
this.connection.setTransactionIsolation(this.level.getLevel());
}
}
@Override
public Integer getTimeout() throws SQLException {
return null;
}
}
这里有个openConnection()
方法根据dataSource
打开了数据库连接。这跟我们最初自己配的动态数据源连接上了,接下来就看何时调用了。
根据这个事务创建了执行类 executor
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
可以看出这个executor
还用interceptorChain
添加了 plugins,这跟常用的分页插件也有关,这里就不多说了。
创建 DefaultSqlSession
这个类代码太多了,我就仅贴出关键部分
@Override
public Connection getConnection() {
try {
return executor.getTransaction().getConnection();
} catch (SQLException e) {
throw ExceptionFactory.wrapException("Error getting a new connection. Cause: " + e, e);
}
}
这里调用了我们之前创建的事务类的getConnection()
方法,在那个方法中打开了我们自定义的数据源的连接。而在我们自定义的DynamicDataSource
中,getConnection()
又是根据ThreadLocal
常量来获取数据源的,所以只要当前请求的线程中被自定义切面类修改了数据源,那么等到这个线程要用 Mybatis打开数据源的连接的时候,就会打开切换过后的数据源的连接了。
本篇文章内容也不多,不过根据上述的 Mybatis 的工作流程来读它的源码,很容易就可以读出其他功能是怎么实现的,比如说 Mybatis-Spring 中的注入 Mapper,实际上是通过 Mybatis 的动态代理,解析调用的 Mapper 接口的方法对象,获取注解、方法名、参数等等信息,再用SqlSession
来调用。