Mycat路由

2018-05-06  本文已影响0人  tony_0c73

路由接口

io.mycat.route.RouteService
方法:

RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
            int sqlType, String stmt, String charset, ServerConnection sc)

计算流程概述

image.png image.png

conditions 为<表名,字段名,字段值> 3元组

单表计算流程


image.png

代码分析

AbstractRouteStrategy

    /**
     * Routes one SQL statement for the given schema.
     *
     * <p>Steps, in order: optionally strip the schema name from the SQL, run the pre-route hook,
     * pass the SQL through the configured SQL interceptor, then dispatch to DDL routing,
     * single-node routing, system-info routing or AST-based routing.
     *
     * @param sysConfig system configuration (not read by this method)
     * @param schema    schema the statement is executed against
     * @param sqlType   statement type, compared against {@code ServerParse} constants
     * @param origSQL   SQL text as received
     * @param charset   charset handed on to AST-based routing
     * @param sc        originating connection; may be {@code null}
     * @param cachePool cache pool handed on to AST-based routing
     * @return the route result, or {@code null} when {@code beforeRouteProcess} returns {@code true}
     * @throws SQLNonTransientException declared for the routing steps invoked here
     */
    @Override
    public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
            String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
        // Corresponds to the checkSQLschema attribute of the schema tag:
        // remove the characters that denote the schema from the SQL.
        if (schema.isCheckSQLSchema()) {
            origSQL = RouterUtil.removeSchema(origSQL, schema.getName());
        }

        // Pre-route processing (global sequence numbers, parent/child table inserts).
        // When the hook returns true this method returns null.
        boolean handledBeforeRoute = beforeRouteProcess(schema, sqlType, origSQL, sc);
        if (handledBeforeRoute) {
            return null;
        }

        // SQL statement interception.
        String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);
        if (!origSQL.equals(stmt)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
            }
        }
        RouteResultset rrs = new RouteResultset(stmt, sqlType);

        // Optimisation: with debug logging on, logging cached load-data output
        // would severely hurt performance, so caching is turned off for it.
        if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
            rrs.setCacheAble(false);
        }

        // Carry the connection's autocommit state on rrs so that SQL parsing can set the
        // canRunInReadDB property of RouteResultsetNode when it meets "select ... for update".
        if (sc != null) {
            rrs.setAutocommit(sc.isAutocommit());
        }

        // DDL statements have their own routing.
        if (sqlType == ServerParse.DDL) {
            return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
        }

        // Non-sharded schema (except SHOW statements): route to the schema's data node.
        if (schema.isNoSharding() && sqlType != ServerParse.SHOW) {
            return RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
        }

        // Otherwise try system-info routing; a null result means normal AST-based routing is needed.
        if (routeSystemInfo(schema, sqlType, stmt, rrs) == null) {
            return routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool, sqlType, sc);
        }
        return rrs;
    }

从上面的代码可知,路由计算是在 routeNormalSqlWithAST 方法中完成的。这是一个抽象方法,由子类实现;目前的实现类是 DruidMycatRouteStrategy。

DruidMycatRouteStrategy.routeNormalSqlWithAST 的路由是通过下面的 directRoute 方法进行的

/**
     * Direct-result routing.
     *
     * <p>Rewrites the SQL through the parser, then routes every route-calculate unit of the parsed
     * statement and merges the resulting nodes into {@code rrs}.
     *
     * @param rrs         route result being built; also returned in most paths
     * @param ctx         sharding parse info (tables and alias map are read from it)
     * @param schema      schema the statement runs against
     * @param druidParser parser holding the parsed context and its route-calculate units
     * @param statement   parsed SQL statement
     * @param cachePool   cache pool handed on to the routing helpers
     * @return {@code rrs}, the single-node route result, or the result of {@code routeDisTable}
     * @throws SQLNonTransientException declared for the routing steps invoked here
     */
    private RouteResultset directRoute(RouteResultset rrs,DruidShardingParseInfo ctx,SchemaConfig schema,
                                        DruidParser druidParser,SQLStatement statement,LayerCachePool cachePool) throws SQLNonTransientException{

        // Rewrite the SQL (e.g. auto-increment primary key for insert statements);
        // in the direct-result case the rewrite is applied here.
        druidParser.changeSql(schema, rrs, statement, cachePool);

        // The DruidParser may already have finished routing while parsing: return as is.
        if (rrs.isFinishedRoute()) {
            return rrs;
        }

        // A select without FROM, or any other statement with neither tables nor aliases.
        boolean noTables = ctx.getTables() == null || ctx.getTables().size() == 0;
        if (noTables && (ctx.getTableAliasMap() == null || ctx.getTableAliasMap().isEmpty())) {
            return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql());
        }

        // If there is no route-calculate unit, install one.
        if (druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
            druidParser.getCtx().addRouteCalculateUnit(new RouteCalculateUnit());
        }

        SortedSet<RouteResultsetNode> routedNodes = new TreeSet<RouteResultsetNode>();
        boolean onlyGlobalTables = RouterUtil.isAllGlobalTable(ctx, schema);

        // Route each route-calculate unit parsed from the SQL and merge the results into
        // routedNodes; being a set, it drops duplicate nodes automatically.
        for (RouteCalculateUnit unit : druidParser.getCtx().getRouteCalculateUnits()) {
            RouteResultset unitResult = RouterUtil.tryRouteForTables(
                    schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool);
            if (unitResult != null && unitResult.getNodes() != null) {
                for (RouteResultsetNode routed : unitResult.getNodes()) {
                    routedNodes.add(routed);
                }
            }
            // When every table is a global table the route is computed only once.
            if (onlyGlobalTables) {
                break;
            }
        }

        RouteResultsetNode[] nodes = new RouteResultsetNode[routedNodes.size()];
        int idx = 0;
        for (RouteResultsetNode node : routedNodes) {
            nodes[idx++] = node;

            // Only an insert into exactly one table that is registered in the schema
            // and whose rule algorithm is a SlotFunction gets its statement rewritten.
            if (!(statement instanceof MySqlInsertStatement)) {
                continue;
            }
            if (ctx.getTables().size() != 1) {
                continue;
            }
            if (!schema.getTables().containsKey(ctx.getTables().get(0))) {
                continue;
            }
            RuleConfig rule = schema.getTables().get(ctx.getTables().get(0)).getRule();
            if (rule == null || !(rule.getRuleAlgorithm() instanceof SlotFunction)) {
                continue;
            }
            // Rewrite the statement to add the slot.
            node.setStatement(ParseUtil.changeInsertAddSlot(node.getStatement(), node.getSlot()));
        }
        rrs.setNodes(nodes);

        // Sub-table routing, e.g. subTables="t_order$1-2,t_order3".
        // Sub-tables are supported since 1.6; with sub-tables only one dataNode may be
        // configured, and join is not supported.
        return rrs.isDistTable() ? this.routeDisTable(statement, rrs) : rrs;
    }

路由通过tryRouteForTables 计算获取,代码如下

/**
     * Multi-table routing.
     *
     * <p>Non-sharded schemas/tables and single-table statements are delegated to the matching
     * single-node / single-table helpers. Otherwise the data nodes of every referenced table are
     * collected (from sharding conditions first, then from the table configuration), intersected,
     * and the statement is routed to the intersection.
     *
     * @param schema    schema the statement runs against
     * @param ctx       parse info supplying the table list, alias map and SQL text
     * @param routeUnit route-calculate unit supplying the per-table conditions
     * @param rrs       route result that is filled in and returned
     * @param isSelect  whether the statement is a select
     * @param cachePool cache pool handed on to the routing helpers
     * @return the route result
     * @throws SQLNonTransientException if a table has no definition in the schema, a table has no
     *         data node, or the tables' data nodes have no intersection
     */
    public static RouteResultset tryRouteForTables(SchemaConfig schema, DruidShardingParseInfo ctx,
            RouteCalculateUnit routeUnit, RouteResultset rrs, boolean isSelect, LayerCachePool cachePool)
            throws SQLNonTransientException {
        List<String> tables = ctx.getTables();
        if(schema.isNoSharding()||(tables.size() >= 1&&isNoSharding(schema,tables.get(0)))) {
            return routeToSingleNode(rrs, schema.getDataNode(), ctx.getSql());
        }
        // Exactly one table.
        if(tables.size() == 1) {
            return RouterUtil.tryRouteForOneTable(schema, ctx, routeUnit, tables.get(0), rrs, isSelect, cachePool);
        }
        Set<String> retNodesSet = new HashSet<String>();
        // Data nodes each table routes to.
        Map<String,Set<String>> tablesRouteMap = new HashMap<String,Set<String>>();

        // Sharding conditions are present.
        Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions = routeUnit.getTablesAndConditions();
        if(tablesAndConditions != null && tablesAndConditions.size() > 0) {
            // Find routes for the sharded tables.
            RouterUtil.findRouteWithcConditionsForTables(schema, rrs, tablesAndConditions, tablesRouteMap, ctx.getSql(), cachePool, isSelect);
            if(rrs.isFinishedRoute()) {
                return rrs;
            }
        }
        // Find routes for global tables and single-node tables.
        for(String tableName : tables) {
            TableConfig tableConfig = resolveTableConfig(schema, ctx, tableName);
            if(tableConfig == null){
                String msg = "can't find table define in schema "+ tableName + " schema:" + schema.getName();
                LOGGER.warn(msg);
                throw new SQLNonTransientException(msg);
            }
            if(tableConfig.isGlobalTable()) {// global table: always contributes its data nodes
                if(tablesRouteMap.get(tableName) == null) {
                    tablesRouteMap.put(tableName, new HashSet<String>());
                }
                tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
            } else if(tablesRouteMap.get(tableName) == null) { // the remaining tables are single-node tables
                tablesRouteMap.put(tableName, new HashSet<String>());
                tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
            }
        }

        // Intersect the data nodes of all tables.
        boolean isFirstAdd = true;
        for(Map.Entry<String, Set<String>> entry : tablesRouteMap.entrySet()) {
            if(entry.getValue() == null || entry.getValue().size() == 0) {
                throw new SQLNonTransientException("parent key can't find any valid datanode ");
            } else {
                if(isFirstAdd) {
                    retNodesSet.addAll(entry.getValue());
                    isFirstAdd = false;
                } else {
                    retNodesSet.retainAll(entry.getValue());
                    if(retNodesSet.size() == 0) {// the routes of two tables do not intersect
                        String errMsg = "invalid route in sql, multi tables found but datanode has no intersection "
                                + " sql:" + ctx.getSql();
                        LOGGER.warn(errMsg);
                        throw new SQLNonTransientException(errMsg);
                    }
                }
            }
        }

        if(retNodesSet.size() > 0) {
            String tableName = tables.get(0);
            // Use the same lookup as the loop above: a plain schema lookup returns null for a
            // table that is only resolvable through the alias map, and dereferencing that
            // threw a NullPointerException here.
            TableConfig tableConfig = resolveTableConfig(schema, ctx, tableName);
            if(tableConfig != null && tableConfig.isDistTable()){
                // NOTE(review): tableName is passed as written in the table list — confirm that
                // routeToDistTableNode copes with a name that needed the alias fallback.
                routeToDistTableNode(tableName,schema, rrs, ctx.getSql(), tablesAndConditions, cachePool, isSelect);
                return rrs;
            }

            if(retNodesSet.size() > 1 && isAllGlobalTable(ctx, schema)) {
                // multiple routes: do not cache the route result
                if (isSelect) {
                    rrs.setCacheAble(false);
                    routeToSingleNode(rrs, retNodesSet.iterator().next(), ctx.getSql());
                }
                else {// e.g. delete: remove the records of a global table on every node
                    routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql(),true);
                }

            } else {
                routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql());
            }
        }
        return rrs;
    }

    /**
     * Looks up a table's configuration by its upper-cased name; when that finds nothing, looks up
     * the name the alias map gives for {@code tableName} instead.
     *
     * @return the table configuration, or {@code null} when neither lookup finds one
     */
    private static TableConfig resolveTableConfig(SchemaConfig schema, DruidShardingParseInfo ctx, String tableName) {
        TableConfig tableConfig = schema.getTables().get(tableName.toUpperCase());
        if(tableConfig == null) {
            // The table was not found directly: translate the name through the alias map and retry.
            String alias = ctx.getTableAliasMap().get(tableName);
            if(!StringUtil.isEmpty(alias)){
                tableConfig = schema.getTables().get(alias.toUpperCase());
            }
        }
        return tableConfig;
    }

对sql解析出来的每个路由计算单元进行路由计算

路由计算单元

类:DefaultDruidParser

/**
 * Builds one {@code RouteCalculateUnit} per OR-separated condition group.
 *
 * <p>{@code conditionList} holds the query conditions extracted from the SQL, each carrying
 * table name, column name, values and operator. The conditions of a statement are split on
 * OR; every inner {@code List<Condition>} is one such group. For a {@code between} operator a
 * {@code RangeValue} is built from the first two values; for {@code =} and {@code in} the
 * values are added as an array. Conditions with any other operator are ignored.
 *
 * <p>Operators are matched case-insensitively and independently of the default locale.
 *
 * @param visitor       visitor whose alias map translates table aliases to table names
 * @param conditionList condition groups, one inner list per OR branch
 * @return one unit per group, in the order of {@code conditionList}
 */
private List<RouteCalculateUnit> buildRouteCalculateUnits(SchemaStatVisitor visitor, List<List<Condition>> conditionList) {
    List<RouteCalculateUnit> retList = new ArrayList<RouteCalculateUnit>();
    // Walk the conditions looking for sharding columns.
    for (List<Condition> conditionGroup : conditionList) {
        RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
        for (Condition condition : conditionGroup) {
            List<Object> values = condition.getValues();
            if (values.size() == 0) {
                continue;
            }
            if (!checkConditionValues(values)) {
                continue;
            }

            String columnName = StringUtil.removeBackquote(condition.getColumn().getName().toUpperCase());
            String tableName = StringUtil.removeBackquote(condition.getColumn().getTable().toUpperCase());

            if (visitor.getAliasMap() != null) {
                String mappedName = visitor.getAliasMap().get(tableName);
                if (mappedName == null) {
                    // Conditions on a sub-query alias are skipped and take no part in route
                    // calculation; otherwise the table could not be found later on.
                    continue;
                }
                tableName = mappedName;
            }

            String operator = condition.getOperator();

            // Only between, in and = are handled; everything else is ignored.
            // equalsIgnoreCase (rather than toLowerCase().equals) keeps the match independent
            // of the default locale, e.g. "IN" lower-cases to a dotless i under a Turkish locale.
            if ("between".equalsIgnoreCase(operator)) {
                RangeValue rv = new RangeValue(values.get(0), values.get(1), RangeValue.EE);
                routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, rv);
            } else if ("=".equals(operator) || "in".equalsIgnoreCase(operator)) {
                routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, values.toArray());
            }
        }
        retList.add(routeCalculateUnit);
    }
    return retList;
}
上一篇下一篇

猜你喜欢

热点阅读