ShardingJdbc 2.0读写分离源码浅析
2018-01-17 本文已影响0人
特仑苏纯牛乳
“Sharding-JDBC是一个开源的适用于微服务的分布式数据访问基础类库,它始终以云原生的基础开发套件为目标。”这些概述类的大家自己到官网看吧,这个项目是用TDD方式开发的项目,可以先从单元测试入手了解读写分离的实现
读写分离单元测试 MasterSlaveDataSourceFactoryTest :
@Test
public void assertCreateDataSourceForMultipleSlaves() throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(3, 1);
dataSourceMap.put("master_ds", new TestDataSource("master_ds"));
dataSourceMap.put("slave_ds_0", new TestDataSource("slave_ds_0"));
dataSourceMap.put("slave_ds_1", new TestDataSource("slave_ds_1"));
MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration();
masterSlaveRuleConfig.setName("logic_ds");
masterSlaveRuleConfig.setMasterDataSourceName("master_ds");
masterSlaveRuleConfig.setSlaveDataSourceNames(Arrays.asList("slave_ds_0", "slave_ds_1"));
masterSlaveRuleConfig.setLoadBalanceAlgorithmClassName(MasterSlaveLoadBalanceAlgorithmType.ROUND_ROBIN.name());
Map<String, Object> configMap = new ConcurrentHashMap<>();
configMap.put("key1", "value1");
assertThat(MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, configMap), instanceOf(MasterSlaveDataSource.class));
MatcherAssert.assertThat(ConfigMapContext.getInstance().getMasterSlaveConfig(), is(configMap));
}
根据提供的标准UT,接下来我们需要搞定两个问题,1.shardingJdbc怎样实现的主从路由 2.slave节点之间的路由算法
先来看第一个问题,shardingJdbc怎样实现的主从路由:
首先找到入口,同样是参照TDD的思想:AbstractMasterSlaveOnlyTest,以这个单元测试为入口
这个看起来还是非常痛苦的,过程自己享受吧。
PreparedStatement里存储了这个节点类型。
SQLJudgeEngine(sql).judge()---->SQLStatement.SQLType
核心代码是一个词法解析器:Lexer
while (true) {
TokenType tokenType = lexer.getCurrentToken().getType();
if (tokenType instanceof Keyword) {
if (DefaultKeyword.SELECT == tokenType) {
return new SelectStatement();
}
if (DefaultKeyword.INSERT == tokenType || DefaultKeyword.UPDATE == tokenType || DefaultKeyword.DELETE == tokenType) {
return new DMLStatement();
}
if (DefaultKeyword.CREATE == tokenType || DefaultKeyword.ALTER == tokenType || DefaultKeyword.DROP == tokenType || DefaultKeyword.TRUNCATE == tokenType) {
return new DDLStatement();
}
}
if (tokenType instanceof Assist && Assist.END == tokenType) {
throw new SQLParsingException("Unsupported SQL statement: [%s]", sql);
}
lexer.nextToken();
}
第二个问题 2.slave节点之间的路由算法
masterSlaveRuleConfig.setLoadBalanceAlgorithmClassName(MasterSlaveLoadBalanceAlgorithmType.ROUND_ROBIN.name());
就是这个设置 他本来的示例代码里没有我特意加了一个。
路由轮询核心代码
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
AtomicInteger count = COUNT_MAP.containsKey(name) ? COUNT_MAP.get(name) : new AtomicInteger(0);
COUNT_MAP.putIfAbsent(name, count);
count.compareAndSet(slaveDataSourceNames.size(), 0);
return slaveDataSourceNames.get(count.getAndIncrement() % slaveDataSourceNames.size());
}
}
以上的四行核心代码很是牛逼
MasterSlaveConnection.getConnections(…).masterSlaveDataSource.getDataSource