Mycat路由
2018-05-06 本文已影响0人
tony_0c73
路由接口
io.mycat.route.RouteService
方法:
RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
int sqlType, String stmt, String charset, ServerConnection sc)
计算流程概述
image.png image.png
其中 conditions 为 <表名, 字段名, 字段值> 三元组。
单表计算流程
image.png
代码分析
AbstractRouteStrategy
/**
 * Routes one SQL statement and returns the resulting {@link RouteResultset}.
 *
 * <p>Order of work: optionally strip the schema name from the SQL, run the pre-route hook,
 * pass the SQL through the configured SQL interceptor, then dispatch to DDL routing,
 * single-node routing, system-info routing or AST-based routing.
 *
 * @param sysConfig system configuration (not read by this method)
 * @param schema    schema the statement is executed against
 * @param sqlType   statement type, compared against {@code ServerParse} constants
 * @param origSQL   SQL text as received
 * @param charset   charset forwarded to {@code routeNormalSqlWithAST}
 * @param sc        originating connection; may be {@code null}
 * @param cachePool cache pool forwarded to {@code routeNormalSqlWithAST}
 * @return the route result, or {@code null} when {@code beforeRouteProcess} returns {@code true}
 * @throws SQLNonTransientException propagated from the routing helpers called here
 */
@Override
public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
        String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
    // Corresponds to the schema tag's checkSQLschema attribute: remove the schema-name text from the SQL.
    if (schema.isCheckSQLSchema()) {
        origSQL = RouterUtil.removeSchema(origSQL, schema.getName());
    }

    // Logic that must run before routing: global sequence numbers, parent/child table inserts.
    boolean handledBeforeRouting = beforeRouteProcess(schema, sqlType, origSQL, sc);
    if (handledBeforeRouting) {
        return null;
    }

    // SQL interception.
    String interceptedSql = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);
    if (!origSQL.equals(interceptedSql) && LOGGER.isDebugEnabled()) {
        LOGGER.debug("sql intercepted to " + interceptedSql + " from " + origSQL);
    }

    RouteResultset rrs = new RouteResultset(interceptedSql, sqlType);

    // Optimization: with debug logging on, logging cached load-data statements hurts performance badly,
    // so such statements are marked as not cacheable.
    if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
        rrs.setCacheAble(false);
    }

    // rrs carries the connection's autocommit state so that SQL parsing can set the
    // canRunInReadDB property of RouteResultsetNode dynamically for "select ... for update".
    if (sc != null) {
        rrs.setAutocommit(sc.isAutocommit());
    }

    // DDL statements have their own routing.
    if (sqlType == ServerParse.DDL) {
        return RouterUtil.routeToDDLNode(rrs, sqlType, interceptedSql, schema);
    }

    // Non-sharded schema (and not a SHOW statement): everything goes to the schema's data node.
    boolean singleNodeSchema = schema.isNoSharding() && ServerParse.SHOW != sqlType;
    if (singleNodeSchema) {
        return RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), interceptedSql);
    }

    // System-info routing is tried first; only when it yields null is the SQL routed via the AST.
    if (routeSystemInfo(schema, sqlType, interceptedSql, rrs) == null) {
        return routeNormalSqlWithAST(schema, interceptedSql, rrs, charset, cachePool, sqlType, sc);
    }
    return rrs;
}
从上面的代码可知,路由计算是在方法 routeNormalSqlWithAST 中完成的。这是一个抽象方法,由子类实现,目前的实现类是 DruidMycatRouteStrategy。
DruidMycatRouteStrategy.routeNormalSqlWithAST 通过下面的 directRoute 方法完成路由:
/**
 * Direct-result routing.
 *
 * <p>Lets the parser rewrite the SQL, then routes every route-calculate unit of the parse
 * context through {@code RouterUtil.tryRouteForTables}, merges the resulting nodes into one
 * sorted, de-duplicated set and stores them on {@code rrs}.
 *
 * @param rrs         route result to fill in
 * @param ctx         sharding parse info of the statement
 * @param schema      schema the statement is executed against
 * @param druidParser parser that produced the parse context
 * @param statement   parsed SQL statement
 * @param cachePool   cache pool forwarded to the parser and to the router
 * @return the route result ({@code rrs}, or whatever {@code routeToSingleNode} /
 *         {@code routeDisTable} returns on those paths)
 * @throws SQLNonTransientException propagated from the parser and routing helpers
 */
private RouteResultset directRoute(RouteResultset rrs, DruidShardingParseInfo ctx, SchemaConfig schema,
        DruidParser druidParser, SQLStatement statement, LayerCachePool cachePool) throws SQLNonTransientException {
    // SQL rewrite, e.g. auto-increment primary key on INSERT, done here for the direct-route case.
    druidParser.changeSql(schema, rrs, statement, cachePool);

    // The DruidParser may already have finished routing while parsing; return that result as is.
    if (rrs.isFinishedRoute()) {
        return rrs;
    }

    // SELECT without FROM (or similar): no tables and no aliases, so any data node will do.
    boolean noTables = ctx.getTables() == null || ctx.getTables().isEmpty();
    if (noTables && (ctx.getTableAliasMap() == null || ctx.getTableAliasMap().isEmpty())) {
        return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql());
    }

    // Make sure there is at least one route-calculate unit to iterate over.
    if (druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
        druidParser.getCtx().addRouteCalculateUnit(new RouteCalculateUnit());
    }

    SortedSet<RouteResultsetNode> mergedNodes = new TreeSet<RouteResultsetNode>();
    boolean allGlobal = RouterUtil.isAllGlobalTable(ctx, schema);

    // Route each unit parsed from the SQL and merge the results; the set drops duplicates.
    for (RouteCalculateUnit unit : druidParser.getCtx().getRouteCalculateUnits()) {
        RouteResultset unitResult = RouterUtil.tryRouteForTables(
                schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool);
        if (unitResult != null && unitResult.getNodes() != null) {
            for (RouteResultsetNode routed : unitResult.getNodes()) {
                mergedNodes.add(routed);
            }
        }
        // When every table is a global table the route only needs to be computed once.
        if (allGlobal) {
            break;
        }
    }

    RouteResultsetNode[] nodes = new RouteResultsetNode[mergedNodes.size()];
    int idx = 0;
    for (RouteResultsetNode current : mergedNodes) {
        nodes[idx++] = current;

        // Only an INSERT into exactly one table that is registered in the schema is a rewrite candidate.
        boolean singleRegisteredInsert = statement instanceof MySqlInsertStatement
                && ctx.getTables().size() == 1
                && schema.getTables().containsKey(ctx.getTables().get(0));
        if (!singleRegisteredInsert) {
            continue;
        }

        RuleConfig rule = schema.getTables().get(ctx.getTables().get(0)).getRule();
        if (rule != null && rule.getRuleAlgorithm() instanceof SlotFunction) {
            // Slot-based rule: rewrite the INSERT so that it carries the node's slot.
            current.setStatement(ParseUtil.changeInsertAddSlot(current.getStatement(), current.getSlot()));
        }
    }
    rrs.setNodes(nodes);

    // Sub-table ("dist table") routing, e.g. subTables="t_order$1-2,t_order3".
    // Per the original note: supported since 1.6, only one dataNode may be configured in this mode,
    // and JOIN is not supported.
    return rrs.isDistTable() ? this.routeDisTable(statement, rrs) : rrs;
}
各计算单元的路由结果由 RouterUtil.tryRouteForTables 计算得到,代码如下:
/**
 * Multi-table routing.
 *
 * <p>Short-circuits to single-node routing for non-sharded schemas/tables and to
 * {@code tryRouteForOneTable} when only one table is involved. Otherwise it collects the
 * candidate data nodes of every table (from sharding conditions first, then from the table
 * configuration) and routes to the intersection of those candidates.
 *
 * @param schema    schema the statement is executed against
 * @param ctx       sharding parse info; supplies the table list, alias map and SQL text
 * @param routeUnit route-calculate unit supplying the per-table sharding conditions
 * @param rrs       route result to fill in
 * @param isSelect  whether the statement is a SELECT
 * @param cachePool cache pool forwarded to the routing helpers
 * @return the route result
 * @throws SQLNonTransientException if a table is not defined in the schema (neither directly nor
 *         through its alias), if a table has no candidate data node, or if the tables' candidate
 *         data nodes have no intersection
 */
public static RouteResultset tryRouteForTables(SchemaConfig schema, DruidShardingParseInfo ctx,
        RouteCalculateUnit routeUnit, RouteResultset rrs, boolean isSelect, LayerCachePool cachePool)
        throws SQLNonTransientException {
    List<String> tables = ctx.getTables();
    if (schema.isNoSharding() || (tables.size() >= 1 && isNoSharding(schema, tables.get(0)))) {
        return routeToSingleNode(rrs, schema.getDataNode(), ctx.getSql());
    }

    // Exactly one table: delegate to the single-table router.
    if (tables.size() == 1) {
        return RouterUtil.tryRouteForOneTable(schema, ctx, routeUnit, tables.get(0), rrs, isSelect, cachePool);
    }

    Set<String> retNodesSet = new HashSet<String>();
    // Candidate data nodes per table.
    Map<String, Set<String>> tablesRouteMap = new HashMap<String, Set<String>>();

    // Sharding conditions are present: find routes for the sharded tables first.
    Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions = routeUnit.getTablesAndConditions();
    if (tablesAndConditions != null && tablesAndConditions.size() > 0) {
        RouterUtil.findRouteWithcConditionsForTables(schema, rrs, tablesAndConditions, tablesRouteMap,
                ctx.getSql(), cachePool, isSelect);
        if (rrs.isFinishedRoute()) {
            return rrs;
        }
    }

    // Find routes for global tables and single-node tables.
    // The config resolved for the first table is remembered so that the dist-table check below sees
    // the same (possibly alias-resolved) config instead of repeating a lookup that may miss.
    TableConfig firstTableConfig = null;
    for (String tableName : tables) {
        TableConfig tableConfig = schema.getTables().get(tableName.toUpperCase());
        if (tableConfig == null) {
            // Not found under its own name: translate it through the alias map and look it up again.
            String alias = ctx.getTableAliasMap().get(tableName);
            if (!StringUtil.isEmpty(alias)) {
                tableConfig = schema.getTables().get(alias.toUpperCase());
            }
            if (tableConfig == null) {
                String msg = "can't find table define in schema " + tableName + " schema:" + schema.getName();
                LOGGER.warn(msg);
                throw new SQLNonTransientException(msg);
            }
        }
        if (firstTableConfig == null) {
            firstTableConfig = tableConfig;
        }

        Set<String> candidates = tablesRouteMap.get(tableName);
        if (candidates == null) {
            // No route computed from conditions yet: start from the table's configured data nodes.
            candidates = new HashSet<String>();
            tablesRouteMap.put(tableName, candidates);
            candidates.addAll(tableConfig.getDataNodes());
        } else if (tableConfig.isGlobalTable()) {
            // A global table always contributes all of its data nodes.
            candidates.addAll(tableConfig.getDataNodes());
        }
    }

    // Intersect the candidate sets of all tables.
    boolean isFirstAdd = true;
    for (Map.Entry<String, Set<String>> entry : tablesRouteMap.entrySet()) {
        Set<String> candidates = entry.getValue();
        if (candidates == null || candidates.size() == 0) {
            throw new SQLNonTransientException("parent key can't find any valid datanode ");
        }
        if (isFirstAdd) {
            retNodesSet.addAll(candidates);
            isFirstAdd = false;
        } else {
            retNodesSet.retainAll(candidates);
            if (retNodesSet.size() == 0) {
                // The routes of the tables do not intersect.
                String errMsg = "invalid route in sql, multi tables found but datanode has no intersection "
                        + " sql:" + ctx.getSql();
                LOGGER.warn(errMsg);
                throw new SQLNonTransientException(errMsg);
            }
        }
    }

    if (retNodesSet.size() > 0) {
        String tableName = tables.get(0);
        // Fix: the original looked the first table up again by name only, which returned null (and
        // then threw a NullPointerException) when that table was resolvable only through its alias.
        if (firstTableConfig.isDistTable()) {
            routeToDistTableNode(tableName, schema, rrs, ctx.getSql(), tablesAndConditions, cachePool, isSelect);
            return rrs;
        }

        if (retNodesSet.size() > 1 && isAllGlobalTable(ctx, schema)) {
            // Multiple routes: do not cache the route result.
            if (isSelect) {
                rrs.setCacheAble(false);
                routeToSingleNode(rrs, retNodesSet.iterator().next(), ctx.getSql());
            } else {
                // e.g. DELETE on global tables: must reach every node.
                routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql(), true);
            }
        } else {
            routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql());
        }
    }
    return rrs;
}
对sql解析出来的每个路由计算单元进行路由计算
路由计算单元
类:DefaultDruidParser
/**
 * Builds one {@link RouteCalculateUnit} per group of conditions.
 *
 * <p>Per the original note, {@code conditionList} holds the query conditions extracted from the
 * SQL (table name, column name, values, operator), and each inner list is one group of conditions
 * separated from the others by {@code OR}. Only {@code between}, {@code =} and {@code in}
 * conditions are turned into sharding expressions; every other operator is ignored.
 *
 * @param visitor       visitor whose alias map is used to translate table aliases
 * @param conditionList condition groups, one inner list per group
 * @return one unit per inner list, in the same order (a unit may carry no sharding expression)
 */
private List<RouteCalculateUnit> buildRouteCalculateUnits(SchemaStatVisitor visitor, List<List<Condition>> conditionList) {
    List<RouteCalculateUnit> retList = new ArrayList<RouteCalculateUnit>();
    // Walk the conditions looking for sharding columns.
    for (List<Condition> conditionGroup : conditionList) {
        RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
        for (Condition condition : conditionGroup) {
            List<Object> values = condition.getValues();
            if (values.size() == 0 || !checkConditionValues(values)) {
                continue;
            }

            String columnName = StringUtil.removeBackquote(condition.getColumn().getName().toUpperCase());
            String tableName = StringUtil.removeBackquote(condition.getColumn().getTable().toUpperCase());
            if (visitor.getAliasMap() != null) {
                String mapped = visitor.getAliasMap().get(tableName);
                if (mapped == null) {
                    // Alias of a sub-query: leave it out of route calculation,
                    // otherwise the table cannot be found later on.
                    continue;
                }
                // Either the real table name behind the alias, or the name itself.
                tableName = mapped;
            }

            String operator = condition.getOperator();
            if (operator.equals("between")) {
                RangeValue rv = new RangeValue(values.get(0), values.get(1), RangeValue.EE);
                routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, rv);
            } else if (operator.equals("=") || operator.equalsIgnoreCase("in")) {
                // equalsIgnoreCase instead of toLowerCase().equals(...): toLowerCase() uses the default
                // locale, and under e.g. a Turkish locale "IN" becomes "ın" and would never match.
                routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, values.toArray());
            }
        }
        retList.add(routeCalculateUnit);
    }
    return retList;
}