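ShardingSphere Design from the Source Code: The Routing Engine
Whether for database/table sharding or read-write splitting, a SQL statement must pass through rule-based computation to determine its target databases and tables before it executes on the DB. The routing engine's job is to compute which database and which table the SQL should run on. The database result is passed on to the execution engine, which uses the data source identifier to obtain the matching database connection. The table result is passed to the rewrite engine, which rewrites table names before execution, replacing logic table names with the correct physical table names.
Which database is selected is computed by the user-configured database routing rule, and which table by the user-configured table routing rule. Two ShardingSphere feature modules currently need routing: sharding (database/table sharding) and master-slave (read-write splitting).
Code call analysis
Back in the BasePrepareEngine class, the route decorators are registered before any routing takes place: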
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#registerRouteDecorator
private void registerRouteDecorator() {
for (Class<? extends RouteDecorator> each : OrderedRegistry.getRegisteredClasses(RouteDecorator.class)) {
RouteDecorator routeDecorator = createRouteDecorator(each);
Class<?> ruleClass = (Class<?>) routeDecorator.getType();
// FIXME rule.getClass().getSuperclass() == ruleClass for orchestration, should decouple extend between orchestration rule and sharding rule
rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
.forEach(rule -> router.registerDecorator(rule, routeDecorator));
}
}
The actual routing flow starts after that. The entry class for routing is DataNodeRouter:
org.apache.shardingsphere.underlying.route.DataNodeRouter
/**
* Data node router.
*/
@RequiredArgsConstructor
public final class DataNodeRouter {
private final ShardingSphereMetaData metaData;
private final ConfigurationProperties properties;
private final SQLParserEngine parserEngine;
private final Map<BaseRule, RouteDecorator> decorators = new LinkedHashMap<>();
private SPIRoutingHook routingHook = new SPIRoutingHook();
/**
* Register route decorator.
*
* @param rule rule
* @param decorator route decorator
*/
public void registerDecorator(final BaseRule rule, final RouteDecorator decorator) {
decorators.put(rule, decorator);
}
/**
* Route SQL.
*
* @param sql SQL
* @param parameters SQL parameters
* @param useCache whether cache SQL parse result
* @return route context
*/
public RouteContext route(final String sql, final List<Object> parameters, final boolean useCache) {
routingHook.start(sql);
try {
RouteContext result = executeRoute(sql, parameters, useCache);// compute the route and build the routing result
routingHook.finishSuccess(result, metaData.getSchema());
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
routingHook.finishFailure(ex);
throw ex;
}
}
@SuppressWarnings("unchecked")
private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
RouteContext result = createRouteContext(sql, parameters, useCache);
for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
}
return result;
}
private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
SQLStatement sqlStatement = parserEngine.parse(sql, useCache);// parse the SQL into its AST
try {
SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);// build the SQL statement context, which amounts to part of semantic analysis
return new RouteContext(sqlStatementContext, parameters, new RouteResult());
// TODO should pass parameters for master-slave
} catch (final IndexOutOfBoundsException ex) {
return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
}
}
}
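As shown above, once the parsing engine returns the SQLStatement instance, the SQLStatementContextFactory class creates the SQLStatementContext object. That logic was covered in the SQL parsing chapter and is not repeated here. RouteDecorator currently has only two implementations: ShardingRouteDecorator for data sharding and MasterSlaveRouteDecorator for master-slave.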
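The decorator chain in executeRoute can be pictured with a much smaller model. The sketch below is only an illustration: Context, Decorator, and the string "rules" are hypothetical stand-ins invented here, not ShardingSphere types. It shows the one property the real code relies on, namely that a LinkedHashMap applies decorators in registration order and each decorator receives the previous decorator's result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for RouteContext and RouteDecorator; not the ShardingSphere API.
final class DecoratorChainSketch {

    /** Simplified route context: just the list of target data sources. */
    static final class Context {
        final List<String> dataSources;

        Context(final List<String> dataSources) {
            this.dataSources = dataSources;
        }
    }

    /** A decorator takes the previous context plus its rule and returns a new context. */
    interface Decorator {
        Context decorate(Context previous, String rule);
    }

    // LinkedHashMap keeps insertion order, so decorators run in the order they were registered.
    private final Map<String, Decorator> decorators = new LinkedHashMap<>();

    void register(final String rule, final Decorator decorator) {
        decorators.put(rule, decorator);
    }

    Context route() {
        Context result = new Context(new ArrayList<>());
        for (Map.Entry<String, Decorator> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, entry.getKey());
        }
        return result;
    }

    public static void main(final String[] args) {
        DecoratorChainSketch router = new DecoratorChainSketch();
        // First decorator: "sharding" picks a logical data source.
        router.register("sharding", (previous, rule) -> {
            List<String> targets = new ArrayList<>(previous.dataSources);
            targets.add("ds_0");
            return new Context(targets);
        });
        // Second decorator: "master-slave" rewrites each logical name to a physical one.
        router.register("master-slave", (previous, rule) -> {
            List<String> targets = new ArrayList<>();
            for (String each : previous.dataSources) {
                targets.add(each + "_slave_0");
            }
            return new Context(targets);
        });
        System.out.println(router.route().dataSources); // prints [ds_0_slave_0]
    }
}
```

Registering the two decorators in the opposite order would leave the second one with nothing to rewrite, which is why the registration order matters.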
Data sharding route decorator
Let's first look at the most common one, the data sharding route decorator ShardingRouteDecorator:
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator
/**
* Sharding route decorator.
*/
public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {
@SuppressWarnings("unchecked")
@Override
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
List<Object> parameters = routeContext.getParameters();
// Validate the SQL, mainly to reject unsupported statements: the sharding feature does not support INSERT INTO ... ON DUPLICATE KEY or updating the sharding key
ShardingStatementValidatorFactory.newInstance(
sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
// Get the sharding conditions from the SQL
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
// Check whether all sharding values (table, column, value) are identical; throw an exception if they are not
checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
// Remove duplicate sharding conditions
mergeShardingConditions(shardingConditions);
}
// Create the sharding route engine
ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
// Route and produce the routing result
RouteResult routeResult = shardingRouteEngine.route(shardingRule);
if (needMergeShardingValues) {
Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
}
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
private ShardingConditions getShardingConditions(final List<Object> parameters,
final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData, final ShardingRule shardingRule) {
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
if (sqlStatementContext instanceof InsertStatementContext) {
// Create the sharding conditions from the INSERT VALUES clause
return new ShardingConditions(new InsertClauseShardingConditionEngine(shardingRule).createShardingConditions((InsertStatementContext) sqlStatementContext, parameters));
}
// Create the sharding conditions from the WHERE clause
return new ShardingConditions(new WhereClauseShardingConditionEngine(shardingRule, schemaMetaData).createShardingConditions(sqlStatementContext, parameters));
}
return new ShardingConditions(Collections.emptyList());
}
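From the code, the internal logic is:
- First, the SQLStatement is validated, mainly to check that the SQL is within the supported range.
- Next, the sharding conditions object ShardingConditions is created by the getShardingConditions method shown above. Let's expand on the most common case, how sharding conditions are created from the WHERE clause.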
org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine
/**
* Sharding condition engine for where clause.
*/
@RequiredArgsConstructor
public final class WhereClauseShardingConditionEngine {
private final ShardingRule shardingRule;
private final SchemaMetaData schemaMetaData;
/**
* Create sharding conditions.
*
* @param sqlStatementContext SQL statement context
* @param parameters SQL parameters
* @return sharding conditions
*/
public List<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final List<Object> parameters) {
if (!(sqlStatementContext instanceof WhereAvailable)) {
return Collections.emptyList();
}
List<ShardingCondition> result = new ArrayList<>();
Optional<WhereSegment> whereSegment = ((WhereAvailable) sqlStatementContext).getWhere();
if (whereSegment.isPresent()) {
result.addAll(createShardingConditions(sqlStatementContext, whereSegment.get().getAndPredicates(), parameters));
}
return result;
}
private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {
Collection<ShardingCondition> result = new LinkedList<>();
for (AndPredicate each : andPredicates) {
Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);// get the route values for each column in the WHERE clause; only columns configured as sharding columns are added
if (routeValueMap.isEmpty()) {
return Collections.emptyList();
}
result.add(createShardingCondition(routeValueMap));// create a sharding condition from the column-to-route-values map, merging duplicate route values along the way
}
return result;
}
private Map<Column, Collection<RouteValue>> createRouteValueMap(final SQLStatementContext sqlStatementContext, final AndPredicate andPredicate, final List<Object> parameters) {
Map<Column, Collection<RouteValue>> result = new HashMap<>();
for (PredicateSegment each : andPredicate.getPredicates()) {
Optional<String> tableName = sqlStatementContext.getTablesContext().findTableName(each.getColumn(), schemaMetaData);
if (!tableName.isPresent() || !shardingRule.isShardingColumn(each.getColumn().getIdentifier().getValue(), tableName.get())) {
continue;
}
Column column = new Column(each.getColumn().getIdentifier().getValue(), tableName.get());
// Create the route value according to the operator: = and IN yield ListRouteValue; range operators such as >, < and BETWEEN yield RangeRouteValue
Optional<RouteValue> routeValue = ConditionValueGeneratorFactory.generate(each.getRightValue(), column, parameters);
if (!routeValue.isPresent()) {
continue;
}
if (!result.containsKey(column)) {
result.put(column, new LinkedList<>());
}
result.get(column).add(routeValue.get());
}
return result;
}
private ShardingCondition createShardingCondition(final Map<Column, Collection<RouteValue>> routeValueMap) {
ShardingCondition result = new ShardingCondition();
for (Entry<Column, Collection<RouteValue>> entry : routeValueMap.entrySet()) {
try {
RouteValue routeValue = mergeRouteValues(entry.getKey(), entry.getValue());
if (routeValue instanceof AlwaysFalseRouteValue) {
return new AlwaysFalseShardingCondition();
}
result.getRouteValues().add(routeValue);
} catch (final ClassCastException ex) {
throw new ShardingSphereException("Found different types for sharding value `%s`.", entry.getKey());
}
}
return result;
}
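The body of mergeRouteValues is not shown in the listing above, so the following is only a guess at its spirit for the list-value case: when one sharding column appears in several predicates, keep the values common to all of them, and treat an empty intersection as a condition that can never match (the role AlwaysFalseRouteValue plays above). The class and method names are hypothetical, and range values are deliberately left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Illustrative sketch only; not the ShardingSphere implementation.
final class RouteValueMergeSketch {

    /**
     * Intersects all value lists collected for one sharding column.
     * Returns Optional.empty() when nothing survives the intersection
     * (or when no value lists were supplied at all).
     */
    static Optional<Set<Integer>> merge(final List<? extends Collection<Integer>> valuesPerPredicate) {
        Set<Integer> result = null;
        for (Collection<Integer> each : valuesPerPredicate) {
            if (null == result) {
                result = new LinkedHashSet<>(each);
            } else {
                result.retainAll(each);
            }
        }
        return null == result || result.isEmpty() ? Optional.empty() : Optional.of(result);
    }

    public static void main(final String[] args) {
        // WHERE order_id IN (1, 2, 3) AND order_id IN (2, 3, 4)
        System.out.println(merge(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4)))); // prints Optional[[2, 3]]
        // WHERE order_id = 1 AND order_id = 2
        System.out.println(merge(Arrays.asList(Arrays.asList(1), Arrays.asList(2)))); // prints Optional.empty
    }
}
```

An always-false condition is useful downstream: the factory can then route the statement to a single arbitrary node instead of broadcasting a query that cannot return rows.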
- Then the sharding route engine factory creates the matching ShardingRouteEngine instance. Here is the factory class:
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngineFactory
/**
* Sharding routing engine factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingRouteEngineFactory {
/**
* Create new instance of routing engine.
*
* @param shardingRule sharding rule
* @param metaData meta data of ShardingSphere
* @param sqlStatementContext SQL statement context
* @param shardingConditions shardingConditions
* @param properties sharding sphere properties
* @return new instance of routing engine
*/
public static ShardingRouteEngine newInstance(final ShardingRule shardingRule,
final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext,
final ShardingConditions shardingConditions, final ConfigurationProperties properties) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
if (sqlStatement instanceof TCLStatement) {// transaction control SQL (commit, rollback, savepoint, set transaction): database broadcast routing
return new ShardingDatabaseBroadcastRoutingEngine();
}
if (sqlStatement instanceof DDLStatement) {// DDL SQL (create, alter, drop, truncate...): table broadcast routing
return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
}
if (sqlStatement instanceof DALStatement) {// DAL SQL (show databases, show tables...): database broadcast, table routing, or default database routing depending on the statement
return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
}
if (sqlStatement instanceof DCLStatement) {// DCL: table broadcast routing or master instance broadcast routing
return getDCLRoutingEngine(sqlStatementContext, metaData);
}
if (shardingRule.isAllInDefaultDataSource(tableNames)) {// all tables belong to the default data source: default database routing
return new ShardingDefaultDatabaseRoutingEngine(tableNames);
}
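if (shardingRule.isAllBroadcastTables(tableNames)) {// all tables are configured broadcast tables: SELECT uses unicast routing to one randomly chosen data source, other statements use database broadcast routing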
return sqlStatement instanceof SelectStatement ? new ShardingUnicastRoutingEngine(tableNames) : new ShardingDatabaseBroadcastRoutingEngine();
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) {// DML with no table name and a default data source configured: default database routing
return new ShardingDefaultDatabaseRoutingEngine(tableNames);
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) {// DML whose sharding condition is always false, or no table name, or no sharding rule for the tables: unicast routing to one randomly chosen data source
return new ShardingUnicastRoutingEngine(tableNames);
}
// Everything else uses standard routing or complex routing
return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames, properties);
}
private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardingRule, final SQLStatement sqlStatement, final Collection<String> tableNames) {
if (sqlStatement instanceof UseStatement) {// USE statements: ignore routing
return new ShardingIgnoreRoutingEngine();
}
if (sqlStatement instanceof SetStatement || sqlStatement instanceof ResetParameterStatement || sqlStatement instanceof ShowDatabasesStatement) {// SET, RESET and SHOW DATABASES: database broadcast routing
return new ShardingDatabaseBroadcastRoutingEngine();
}
if (!tableNames.isEmpty() && !shardingRule.tableRuleExists(tableNames) && shardingRule.hasDefaultDataSourceName()) {// tables not configured in the sharding rule use default database routing
return new ShardingDefaultDatabaseRoutingEngine(tableNames);
}
if (!tableNames.isEmpty()) {// table names are present: unicast routing
return new ShardingUnicastRoutingEngine(tableNames);
}
return new ShardingDataSourceGroupBroadcastRoutingEngine();// data source group broadcast routing
}
private static ShardingRouteEngine getDCLRoutingEngine(final SQLStatementContext sqlStatementContext, final ShardingSphereMetaData metaData) {
return isDCLForSingleTable(sqlStatementContext)
? new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext) : new ShardingMasterInstanceBroadcastRoutingEngine(metaData.getDataSources());
}
private static boolean isDCLForSingleTable(final SQLStatementContext sqlStatementContext) {
if (sqlStatementContext instanceof TableAvailable) {
TableAvailable tableSegmentsAvailable = (TableAvailable) sqlStatementContext;
return 1 == tableSegmentsAvailable.getAllTables().size() && !"*".equals(tableSegmentsAvailable.getAllTables().iterator().next().getTableName().getIdentifier().getValue());
}
return false;
}
private static ShardingRouteEngine getShardingRoutingEngine(final ShardingRule shardingRule, final SQLStatementContext sqlStatementContext,
final ShardingConditions shardingConditions, final Collection<String> tableNames, final ConfigurationProperties properties) {
Collection<String> shardingTableNames = shardingRule.getShardingLogicTableNames(tableNames);
if (1 == shardingTableNames.size() || shardingRule.isAllBindingTables(shardingTableNames)) {// a single logic table, or all tables are binding tables: standard routing
return new ShardingStandardRoutingEngine(shardingTableNames.iterator().next(), sqlStatementContext, shardingConditions, properties);
}
// TODO config for cartesian set
return new ShardingComplexRoutingEngine(tableNames, sqlStatementContext, shardingConditions, properties);
}
}
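The order of checks in the factory is what decides the engine, so it can help to see it condensed. The sketch below keeps only a handful of the branches and replaces the rule lookups with plain sets. StatementType, Engine, and the select method are hypothetical names made up for this illustration, and it assumes every referenced table is a sharded table, which the real factory does not.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Condensed illustration of the decision order; not the ShardingSphere factory.
final class EngineSelectionSketch {

    enum StatementType { TCL, DDL, DML_SELECT, DML_MODIFY }

    enum Engine { DATABASE_BROADCAST, TABLE_BROADCAST, UNICAST, STANDARD, COMPLEX }

    /**
     * Picks an engine in the same order as the checks above, reduced to a few cases.
     *
     * @param type kind of SQL statement
     * @param tables logic table names referenced by the statement
     * @param broadcastTables tables configured as broadcast tables
     * @param bindingTables tables configured as one binding group
     * @return the selected engine kind
     */
    static Engine select(final StatementType type, final Collection<String> tables,
                         final Set<String> broadcastTables, final Set<String> bindingTables) {
        if (StatementType.TCL == type) {
            return Engine.DATABASE_BROADCAST;
        }
        if (StatementType.DDL == type) {
            return Engine.TABLE_BROADCAST;
        }
        if (!tables.isEmpty() && broadcastTables.containsAll(tables)) {
            // Reads can hit any single copy; writes must reach every copy.
            return StatementType.DML_SELECT == type ? Engine.UNICAST : Engine.DATABASE_BROADCAST;
        }
        if (tables.isEmpty()) {
            return Engine.UNICAST;
        }
        return 1 == tables.size() || bindingTables.containsAll(tables) ? Engine.STANDARD : Engine.COMPLEX;
    }

    public static void main(final String[] args) {
        Set<String> broadcast = Collections.singleton("t_config");
        Set<String> binding = new HashSet<>(Arrays.asList("t_order", "t_order_item"));
        System.out.println(select(StatementType.DML_SELECT, Arrays.asList("t_order"), broadcast, binding)); // prints STANDARD
        System.out.println(select(StatementType.DML_SELECT, Arrays.asList("t_order", "t_user"), broadcast, binding)); // prints COMPLEX
        System.out.println(select(StatementType.DML_MODIFY, Arrays.asList("t_config"), broadcast, binding)); // prints DATABASE_BROADCAST
    }
}
```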
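As you can see, different SQL types map to different routing strategies. Let's expand on the most common one, the standard sharding routing engine: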
org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine
/**
* Sharding standard routing engine.
*/
@RequiredArgsConstructor
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
…
@Override
public RouteResult route(final ShardingRule shardingRule) {
if (isDMLForModify(sqlStatementContext) && 1 != ((TableAvailable) sqlStatementContext).getAllTables().size()) {// check the tables involved: INSERT, UPDATE and DELETE do not support multiple tables
throw new ShardingSphereException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
}
return generateRouteResult(getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName)));
}
private boolean isDMLForModify(final SQLStatementContext sqlStatementContext) {
return sqlStatementContext instanceof InsertStatementContext || sqlStatementContext instanceof UpdateStatementContext || sqlStatementContext instanceof DeleteStatementContext;
}
// Build the routing result from the data nodes
private RouteResult generateRouteResult(final Collection<DataNode> routedDataNodes) {
RouteResult result = new RouteResult();
result.getOriginalDataNodes().addAll(originalDataNodes);
for (DataNode each : routedDataNodes) {
result.getRouteUnits().add(
new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
}
return result;
}
// Compute the data nodes; this is the core routing method
private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
if (isRoutingByHint(shardingRule, tableRule)) {// both database and table routing use hints: route by hint
return routeByHint(shardingRule, tableRule);
}
if (isRoutingByShardingConditions(shardingRule, tableRule)) {// neither database nor table routing uses hints: route by sharding conditions
return routeByShardingConditions(shardingRule, tableRule);
}
return routeByMixedConditions(shardingRule, tableRule);// database and table routing mix hints and sharding conditions: mixed routing
}
…
private List<RouteValue> getDatabaseShardingValues(final ShardingRule shardingRule, final TableRule tableRule, final ShardingCondition shardingCondition) {
ShardingStrategy dataBaseShardingStrategy = shardingRule.getDatabaseShardingStrategy(tableRule);
return isGettingShardingValuesFromHint(dataBaseShardingStrategy)
? getDatabaseShardingValuesFromHint() : getShardingValuesFromShardingConditions(shardingRule, dataBaseShardingStrategy.getShardingColumns(), shardingCondition);
}
private List<RouteValue> getTableShardingValues(final ShardingRule shardingRule, final TableRule tableRule, final ShardingCondition shardingCondition) {
ShardingStrategy tableShardingStrategy = shardingRule.getTableShardingStrategy(tableRule);
return isGettingShardingValuesFromHint(tableShardingStrategy)
? getTableShardingValuesFromHint() : getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), shardingCondition);
}
// Get the database sharding values set via hint (through HintManager.addDatabaseShardingValue or setDatabaseShardingValue)
private List<RouteValue> getDatabaseShardingValuesFromHint() {
return getRouteValues(HintManager.isDatabaseShardingOnly() ? HintManager.getDatabaseShardingValues() : HintManager.getDatabaseShardingValues(logicTableName));
}
// Get the table sharding values set via hint (through HintManager.addTableShardingValue)
private List<RouteValue> getTableShardingValuesFromHint() {
return getRouteValues(HintManager.getTableShardingValues(logicTableName));
}
…
private Collection<DataNode> route0(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
Collection<String> routedDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues);// compute which databases to route to
Collection<DataNode> result = new LinkedList<>();
for (String each : routedDataSources) {
result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues));// compute which tables to route to
}
return result;
}
// Run the configured database sharding strategy to get the target data source names
private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
if (databaseShardingValues.isEmpty()) {
return tableRule.getActualDatasourceNames();
}
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
Preconditions.checkState(!result.isEmpty(), "no database route info");
Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result),
"Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
return result;
}
// Run the configured table sharding strategy to get the actual table names
private Collection<DataNode> routeTables(final ShardingRule shardingRule, final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties));
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
}
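As shown, the standard sharding routing engine takes the database sharding strategy (databaseShardingStrategy) and the table sharding strategy (tableShardingStrategy) configured in the ShardingRule and calls their doSharding methods to compute the target databases (data source names) and tables (actual physical table names).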
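The two-step shape of route0 (databases first, then tables inside each routed database) is easy to reproduce in miniature. The sketch below assumes a made-up layout, user_id modulo the data source count and order_id modulo the tables per data source; these rules and all method names are assumptions for illustration, since in ShardingSphere they come from user configuration. As in routeDataSources and routeTables, a missing sharding value means every candidate is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Miniature of "route databases, then tables"; the modulo rules are invented for this example.
final class StandardRouteSketch {

    /** Hypothetical database rule: user_id modulo the number of data sources. */
    static String routeDataSource(final List<String> dataSources, final long userId) {
        return dataSources.get((int) Math.floorMod(userId, (long) dataSources.size()));
    }

    /** Hypothetical table rule: order_id modulo the number of tables per data source. */
    static String routeTable(final String logicTable, final int tablesPerDataSource, final long orderId) {
        return logicTable + "_" + Math.floorMod(orderId, (long) tablesPerDataSource);
    }

    /** Routes to data nodes written as "dataSource.table"; a null sharding value keeps all candidates. */
    static List<String> route(final List<String> dataSources, final String logicTable, final int tablesPerDataSource,
                              final Long userId, final Long orderId) {
        List<String> routedDataSources = null == userId
                ? dataSources : Collections.singletonList(routeDataSource(dataSources, userId));
        List<String> result = new ArrayList<>();
        for (String dataSource : routedDataSources) {
            if (null == orderId) {
                // No table sharding value: every physical table in this data source is a target.
                for (int i = 0; i < tablesPerDataSource; i++) {
                    result.add(dataSource + "." + logicTable + "_" + i);
                }
            } else {
                result.add(dataSource + "." + routeTable(logicTable, tablesPerDataSource, orderId));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> dataSources = Arrays.asList("ds_0", "ds_1");
        System.out.println(route(dataSources, "t_order", 2, 5L, 10L)); // prints [ds_1.t_order_0]
        System.out.println(route(dataSources, "t_order", 2, null, 10L)); // prints [ds_0.t_order_0, ds_1.t_order_0]
    }
}
```

The second call shows why queries without the database sharding column are more expensive: the statement fans out to every data source.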
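Next, look at the standard sharding strategy class StandardShardingStrategy. It is constructed from a StandardShardingStrategyConfiguration instance, the sharding strategy configuration that the application is responsible for supplying. With Spring XML or YAML configuration, ShardingSphere creates this object automatically; with the Java API, the application must create it itself.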
org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy
/**
* Standard sharding strategy.
*/
public final class StandardShardingStrategy implements ShardingStrategy {
private final String shardingColumn;
private final PreciseShardingAlgorithm preciseShardingAlgorithm;
private final RangeShardingAlgorithm rangeShardingAlgorithm;
public StandardShardingStrategy(final StandardShardingStrategyConfiguration standardShardingStrategyConfig) {
Preconditions.checkNotNull(standardShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
Preconditions.checkNotNull(standardShardingStrategyConfig.getPreciseShardingAlgorithm(), "precise sharding algorithm cannot be null.");
shardingColumn = standardShardingStrategyConfig.getShardingColumn();
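preciseShardingAlgorithm = standardShardingStrategyConfig.getPreciseShardingAlgorithm();// required: the application must supply a class implementing PreciseShardingAlgorithm
rangeShardingAlgorithm = standardShardingStrategyConfig.getRangeShardingAlgorithm();// the application supplies a class implementing RangeShardingAlgorithm; it may be null, in which case range conditions are rejected below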
}
@Override
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
RouteValue shardingValue = shardingValues.iterator().next();
Collection<String> shardingResult = shardingValue instanceof ListRouteValue
? doSharding(availableTargetNames, (ListRouteValue) shardingValue) : doSharding(availableTargetNames, (RangeRouteValue) shardingValue);
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(shardingResult);
return result;
}
@SuppressWarnings("unchecked")
private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeRouteValue<?> shardingValue) {
if (null == rangeShardingAlgorithm) {
throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
}
return rangeShardingAlgorithm.doSharding(availableTargetNames,
new RangeShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), shardingValue.getValueRange()));
}
@SuppressWarnings("unchecked")
private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListRouteValue<?> shardingValue) {
Collection<String> result = new LinkedList<>();
for (Comparable<?> each : shardingValue.getValues()) {
String target = preciseShardingAlgorithm.doSharding(availableTargetNames, new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
if (null != target) {
result.add(target);
}
}
return result;
}
…
}
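To make the precise/range split concrete, here is a minimal sketch of what an application-side pair of algorithms might compute. It does not implement the PreciseShardingAlgorithm or RangeShardingAlgorithm interfaces; the static methods, the modulo rule, and the "_N" suffix convention are all assumptions chosen for illustration. The precise method returns one target per value, as the per-value loop in the ListRouteValue branch above expects, while the range method returns a collection.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Plain static methods standing in for user-supplied sharding algorithms; not the ShardingSphere interfaces.
final class ShardingAlgorithmSketch {

    /** Precise case (= or IN): pick the target whose "_N" suffix equals value modulo the target count. */
    static String preciseSharding(final Collection<String> availableTargetNames, final long value) {
        String suffix = "_" + Math.floorMod(value, (long) availableTargetNames.size());
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        return null;
    }

    /**
     * Range case (BETWEEN, <, >): collect the targets hit by the closed range [lower, upper].
     * Stops early once every target is selected; intended for small ranges only.
     */
    static Collection<String> rangeSharding(final Collection<String> availableTargetNames, final long lower, final long upper) {
        Collection<String> result = new LinkedHashSet<>();
        for (long value = lower; value <= upper && result.size() < availableTargetNames.size(); value++) {
            String target = preciseSharding(availableTargetNames, value);
            if (null != target) {
                result.add(target);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> tables = Arrays.asList("t_order_0", "t_order_1", "t_order_2", "t_order_3");
        System.out.println(preciseSharding(tables, 6L)); // prints t_order_2
        System.out.println(rangeSharding(tables, 5L, 6L)); // prints [t_order_1, t_order_2]
    }
}
```

With a modulo rule, any range spanning at least as many consecutive values as there are targets selects all of them, which is why range conditions on a hashed key rarely narrow the route.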
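To summarize the overall flow of the routing engine:
- DataNodeRouter first calls the parsing engine to parse the SQL into a SQLStatement (this couples routing to the parsing module; parsing should be pulled out and invoked by the surrounding orchestration, or placed uniformly in the prepare flow, and this has been improved in 5.x);
- The SQLStatementContextFactory creates a SQLStatementContext instance from the SQLStatement;
- An initial RouteContext is created and passed, together with the rule (ShardingRule, for example), to the RouteDecorator implementations;
- After the RouteDecorator computes the route, the final RouteContext is created and returned.
Master-slave route decorator
With the data sharding route decorator covered, finally look at the master-slave route decorator MasterSlaveRouteDecorator: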
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator
/**
* Route decorator for master-slave.
*/
public final class MasterSlaveRouteDecorator implements RouteDecorator<MasterSlaveRule> {
@Override
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
// Get the name of the data source to route to
String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
RouteResult routeResult = new RouteResult();
routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
}
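// In sharding + read-write splitting mode, the data source produced by sharding database routing is actually the master-slave rule name,
// so it must be replaced with the real data source per the master-slave configuration: remove the route unit first, then add one with the real data source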
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
toBeRemoved.add(each);
String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
}
}
routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
return routeContext;
}
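As shown, MasterSlaveDataSourceRouter is what computes the concrete database to route to. In the combined sharding + read-write splitting mode, the logical data source must additionally be replaced with the real one.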
org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter
/**
* Data source router for master-slave.
*/
@RequiredArgsConstructor
public final class MasterSlaveDataSourceRouter {
private final MasterSlaveRule masterSlaveRule;
/**
* Route.
*
* @param sqlStatement SQL statement
* @return data source name
*/
public String route(final SQLStatement sqlStatement) {
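if (isMasterRoute(sqlStatement)) {// must route to the master (the SQL holds a lock such as SELECT ... FOR UPDATE, is not a SELECT, the master was already visited, or a hint forces master routing)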
MasterVisitedManager.setMasterVisited();
return masterSlaveRule.getMasterDataSourceName();
}
return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(// pick the data source to access with the load-balance algorithm
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
}
private boolean isMasterRoute(final SQLStatement sqlStatement) {
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}
private boolean containsLockSegment(final SQLStatement sqlStatement) {
return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
}
}
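One consequence of isMasterRoute is worth spelling out: once a statement has gone to the master, the MasterVisitedManager flag keeps later reads on the master as well. The sketch below models that with a ThreadLocal flag. Whether the real flag is scoped per thread is an assumption here, and the class, its parameters, and the "first slave" choice are all simplifications invented for the example.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of master/slave selection with a sticky "master visited" flag; not the ShardingSphere classes.
final class MasterRouteSketch {

    // Assumption: the visited flag is tracked per thread.
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);

    /** Returns the master for writes, locking reads, hinted reads, or any read after the master was visited. */
    static String route(final boolean isSelect, final boolean hasLock, final boolean hintMasterOnly,
                        final String master, final List<String> slaves) {
        if (!isSelect || hasLock || hintMasterOnly || MASTER_VISITED.get()) {
            MASTER_VISITED.set(true);
            return master;
        }
        // A real router would delegate to a load-balance algorithm; the first slave keeps the sketch short.
        return slaves.get(0);
    }

    /** Resets the flag, for example when the connection or unit of work ends. */
    static void clear() {
        MASTER_VISITED.remove();
    }

    public static void main(final String[] args) {
        List<String> slaves = Arrays.asList("slave_0", "slave_1");
        System.out.println(route(true, false, false, "master", slaves));  // prints slave_0
        System.out.println(route(false, false, false, "master", slaves)); // prints master (a write)
        System.out.println(route(true, false, false, "master", slaves));  // prints master (read after write)
        clear();
        System.out.println(route(true, false, false, "master", slaves));  // prints slave_0
    }
}
```

Keeping reads on the master after a write avoids reading stale data from a slave that has not yet replicated the change.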
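As shown, the data source is actually selected by masterSlaveRule.getLoadBalanceAlgorithm(), that is, by the getDataSource method of the MasterSlaveLoadBalanceAlgorithm interface. ShardingSphere ships two built-in implementations of this interface, and developers can add their own as needed.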
org.apache.shardingsphere.core.strategy.masterslave.RoundRobinMasterSlaveLoadBalanceAlgorithm
/**
* Round-robin slave database load-balance algorithm.
*/
@Getter
@Setter
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();
private Properties properties = new Properties();
@Override
public String getType() {
return "ROUND_ROBIN";
}
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
COUNTS.putIfAbsent(name, count);
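count.compareAndSet(slaveDataSourceNames.size(), 0);// the counter tracks accesses for this rule and is reset to 0 once it reaches the number of slaves
return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size());// access count modulo the number of slaves, so the slaves are selected in turn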
}
}
org.apache.shardingsphere.core.strategy.masterslave.RandomMasterSlaveLoadBalanceAlgorithm
/**
* Random slave database load-balance algorithm.
*/
@Getter
@Setter
public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
private Properties properties = new Properties();
@Override
public String getType() {
return "RANDOM";
}
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
return slaveDataSourceNames.get(ThreadLocalRandom.current().nextInt(slaveDataSourceNames.size()));// pick one of the slaves at random
}
}
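Since the interface is an extension point, a custom algorithm mostly comes down to the selection function. The following is a standalone variant of the round-robin idea, not a drop-in MasterSlaveLoadBalanceAlgorithm implementation. It differs from the listing above in two ways that are design choices of this sketch: computeIfAbsent creates the per-rule counter atomically, and Math.floorMod keeps the index non-negative even if the counter overflows, so no reset step is needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone round-robin selector; names are illustrative, not the ShardingSphere SPI.
final class RoundRobinSketch {

    // One counter per master-slave rule name.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

    /** Returns the next slave for the given rule, cycling through the list in order. */
    static String next(final String ruleName, final List<String> slaves) {
        AtomicInteger count = COUNTS.computeIfAbsent(ruleName, key -> new AtomicInteger(0));
        // floorMod never returns a negative index, even after the int counter wraps around.
        return slaves.get(Math.floorMod(count.getAndIncrement(), slaves.size()));
    }

    public static void main(final String[] args) {
        List<String> slaves = Arrays.asList("slave_0", "slave_1");
        System.out.println(next("ms_demo", slaves)); // prints slave_0
        System.out.println(next("ms_demo", slaves)); // prints slave_1
        System.out.println(next("ms_demo", slaves)); // prints slave_0
    }
}
```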
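Finally, here is an overall flow and architecture diagram of the routing engine:
[Figure: routing engine flow and architecture diagram]
For more on the routing engine's features, see the official documentation: https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/route/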