MySQL

数据库中间件 Sharding-JDBC 源码分析 —— SQL

2019-07-29  本文已影响11人  habit_learning

1. 概述

上篇文章《词法解析》分享了词法解析器 Lexer 是如何解析 SQL 里的词法。本文分享 SQL 解析引擎是如何解析与理解 SQL 的。因为本文建立在《词法解析》之上,你需要阅读它后再开始这段旅程。

Parser 有两个组件:

SQLParsingEngine 调用 SQLParserFactory 生成 SQLParser,SQLParser 调用 LexerEngine(封装了 Lexer) 解析 SQL 词法。

2. SQLParsingEngine

SQLParsingEngine,SQL 解析引擎。其parse()方法作为 SQL 解析入口,本身不带复杂逻辑,通过调用 SQL 对应的 SQLParser 进行 SQL 解析。
核心代码如下:

// SQLParsingEngine.java
public SQLStatement parse() {
        LexerEngine lexerEngine = LexerEngineFactory.newInstance(dbType, sql);
        lexerEngine.nextToken();
        return SQLParserFactory.newInstance(dbType, lexerEngine.getCurrentToken().getType(), shardingRule, lexerEngine).parse();
    }

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class LexerEngineFactory {
    
    /**
     * Create lexical analysis engine instance.
     * 
     * @param dbType database type
     * @param sql SQL
     * @return lexical analysis engine instance
     */
    public static LexerEngine newInstance(final DatabaseType dbType, final String sql) {
        switch (dbType) {
            case H2:
            case MySQL:
                return new LexerEngine(new MySQLLexer(sql));
            case Oracle:
                return new LexerEngine(new OracleLexer(sql));
            case SQLServer:
                return new LexerEngine(new SQLServerLexer(sql));
            case PostgreSQL:
                return new LexerEngine(new PostgreSQLLexer(sql));
            default:
                throw new UnsupportedOperationException(String.format("Cannot support database [%s].", dbType));
        }
    }
}

主要流程为:

  1. 根据 db 类型和 sql 语句,生成对应的Lexer,并作为创建LexerEngine的构造参数。目前支持的 db 类型为 H2、MySQL、Oracle、SQLServer、PostgreSQL。
  2. 调用lexerEngine.nextToken()方法,生成第一个 Token。以查询语句为例,第一个 Token 的词法字面量为“select”,其类型为DefaultKeyword#SELECT
  3. 根据第一个 Token 的类型,以及 db 类型,获取对应的 SQLParse,如MySQLSelectParser
// SQLParserFactory.java
 public static SQLParser newInstance(final DatabaseType dbType, final TokenType tokenType, final ShardingRule shardingRule, final LexerEngine lexerEngine) {
        if (!(tokenType instanceof DefaultKeyword)) {
            throw new SQLParsingUnsupportedException(tokenType);
        }
        switch ((DefaultKeyword) tokenType)  {
            case SELECT:
                return SelectParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case INSERT:
                return InsertParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case UPDATE:
                return UpdateParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case DELETE:
                return DeleteParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case CREATE:
                return CreateParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case ALTER:
                return AlterParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case DROP:
                return DropParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case TRUNCATE:
                return TruncateParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            case SET:
            case COMMIT:
            case ROLLBACK:
            case SAVEPOINT:
            case BEGIN:
                return TCLParserFactory.newInstance(dbType, shardingRule, lexerEngine);
            default:
                throw new SQLParsingUnsupportedException(lexerEngine.getCurrentToken().getType());
        }
    }

// SelectParserFactory.java
 public static AbstractSelectParser newInstance(final DatabaseType dbType, final ShardingRule shardingRule, final LexerEngine lexerEngine) {
        switch (dbType) {
            case H2:
            case MySQL:
                return new MySQLSelectParser(shardingRule, lexerEngine);
            case Oracle:
                return new OracleSelectParser(shardingRule, lexerEngine);
            case SQLServer:
                return new SQLServerSelectParser(shardingRule, lexerEngine);
            case PostgreSQL:
                return new PostgreSQLSelectParser(shardingRule, lexerEngine);
            default:
                throw new UnsupportedOperationException(String.format("Cannot support database [%s].", dbType));
        }
    }

最后,调用SQLParser#parse方法,对 SQL 进行解析。下面,我们就以 MySQL 的查询语句为例,探讨其解析流程。

3. 查询 SQL (MySQL) 解析流程

查询 SQL 解析主流程如下:


// AbstractSelectParser.java
public final SelectStatement parse() {
        SelectStatement result = parseInternal();
        if (result.containsSubQuery()) {
            result = result.mergeSubQueryStatement();
        }
        // TODO move to rewrite
        appendDerivedColumns(result);
        appendDerivedOrderBy(result);
        return result;
    }

3.1 SelectStatement

SelectStatement,查询语句解析结果对象。

public final class SelectStatement extends DQLStatement {
    
    // 是否是“*”
    private boolean containStar;
    // 最后一个查询项下一个 Token 的开始位置
    private int selectListLastPosition;
    // 最后一个分组项下一个 Token 的开始位置
    private int groupByLastPosition;
    // 查询项
    private final Set<SelectItem> items = new HashSet<>();
    // 分组项
    private final List<OrderItem> groupByItems = new LinkedList<>();
    // 排序项
    private final List<OrderItem> orderByItems = new LinkedList<>();
    // 分页信息
    private Limit limit;
}

3.2 AbstractSQLStatement

增删改查解析结果对象的抽象父类

public abstract class AbstractSQLStatement implements SQLStatement {
    
    // SQL 类型
    private final SQLType type;
    // 表名
    private final Tables tables = new Tables();
    // 过滤条件。只有对路由结果有影响的条件,才添加进数组
    private final Conditions conditions = new Conditions();
    // SQL标记对象
    private final List<SQLToken> sqlTokens = new LinkedList<>();
}

这里需要注意的是,conditions属性存放的是对路由结果有影响的条件,即分片键的过滤条件。

3.3 SQLToken

SQLToken,SQL标记对象接口,记录着标记对象的起始位置。下面都是它的实现类:

说明
GeneratedKeyToken 自增主键标记对象
TableToken 表标记对象
ItemsToken 选择项标记对象
OffsetToken 分页偏移量标记对象
OrderByToken 排序标记对象
RowCountToken 分页长度标记对象

3.4 解析流程分析

我们以 MySQL 的查询语句为例,直接看AbstractSelectParser#parseInternal()的源码:

  // AbstractSelectParser.java
  private SelectStatement parseInternal() {
        SelectStatement result = new SelectStatement();
        lexerEngine.nextToken();
        parseInternal(result);
        return result;
    }

    // MySQLSelectParser.java
    @Override
    protected void parseInternal(final SelectStatement selectStatement) {
        parseDistinct();
        parseSelectOption();
        parseSelectList(selectStatement, getItems());
        parseFrom(selectStatement);
        parseWhere(getShardingRule(), selectStatement, getItems());
        parseGroupBy(selectStatement);
        parseHaving();
        parseOrderBy(selectStatement);
        parseLimit(selectStatement);
        parseSelectRest();
    }

该方法调用lexerEngine对 SQL 进行词法解析,并生产SelectStatement对象。

这里有一点我们需要注意,SQLParser 并不是等 Lexer 解析完词法( Token ),再根据词法去理解 SQL。而是,在理解 SQL 的过程中,调用 Lexer 进行分词。

3.4.1 #parseDistinct()

解析 DISTINCT、DISTINCTROW 谓语。
核心代码DistinctClauseParser#parse

  /**
   * Parse distinct.
   */
  public final void parse() {
        lexerEngine.skipAll(DefaultKeyword.ALL);
        Collection<Keyword> distinctKeywords = new LinkedList<>();
        distinctKeywords.add(DefaultKeyword.DISTINCT);
        distinctKeywords.addAll(Arrays.asList(getSynonymousKeywordsForDistinct()));
        lexerEngine.unsupportedIfEqual(distinctKeywords.toArray(new Keyword[distinctKeywords.size()]));
    }

  public class MySQLDistinctClauseParser extends DistinctClauseParser {
    
    public MySQLDistinctClauseParser(final LexerEngine lexerEngine) {
        super(lexerEngine);
    }
    
    @Override
    protected Keyword[] getSynonymousKeywordsForDistinct() {
        return new Keyword[] {MySQLKeyword.DISTINCTROW};
    }
}

此处 DISTINCT 和 DISTINCT(字段) 不同,它是针对查询结果做去重,即整行重复。举个例子:

mysql> SELECT item_id, order_id FROM t_order_item;
+---------+----------+
| item_id | order_id |
+---------+----------+
| 1 | 1 |
| 1 | 1 |
+---------+----------+
2 rows in set (0.03 sec)
mysql> SELECT DISTINCT item_id, order_id FROM t_order_item;
+---------+----------+
| item_id | order_id |
+---------+----------+
| 1 | 1 |
+---------+----------+
1 rows in set (0.02 sec)

3.4.2 #parseSelectList()

将 SQL 查询字段 按照逗号( , )切割成多个选择项( SelectItem)。核心代码如下SelectListClauseParser#parse

public void parse(final SelectStatement selectStatement, final List<SelectItem> items) {
        do {
            selectStatement.getItems().add(parseSelectItem(selectStatement));
        } while (lexerEngine.skipIfEqual(Symbol.COMMA));
        selectStatement.setSelectListLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
        items.addAll(selectStatement.getItems());
    }

private SelectItem parseSelectItem(final SelectStatement selectStatement) {
        lexerEngine.skipIfEqual(getSkippedKeywordsBeforeSelectItem());
        SelectItem result;
        if (isRowNumberSelectItem()) {
           // 是否是 ROW_NUMBER 关键字(SQLServer 才有)
            result = parseRowNumberSelectItem(selectStatement);       
        } else if (isStarSelectItem()) {
           // 是否是全表查询“*”
            selectStatement.setContainStar(true);
            result = parseStarSelectItem();
        } else if (isAggregationSelectItem()) {
           // 聚合函数查询,如 SUM、AVG 等
            result = parseAggregationSelectItem(selectStatement);
            parseRestSelectItem(selectStatement);
        } else {
            // 普通查询
            result = new CommonSelectItem(SQLUtil.getExactlyValue(parseCommonSelectItem(selectStatement) + parseRestSelectItem(selectStatement)), aliasExpressionParser.parseSelectItemAlias());
        }
        return result;
    }

该方法会解析 select 字面量后面的查询选项,并赋值SelectStatement#items

3.4.3 #parseFrom()

解析表以及表连接关系。如 JOIN ON、子查询,解析过程中获得的表名存入AbstractSQLStatement#tables属性中,以及表对应的标识对象TableToken存入AbstractSQLStatement#sqlTokens属性中。
核心代码为TableReferencesClauseParser#parseTableFactor:

protected final void parseTableFactor(final SQLStatement sqlStatement, final boolean isSingleTableOnly) {
        final int beginPosition = lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length();
        String literals = lexerEngine.getCurrentToken().getLiterals();
        lexerEngine.nextToken();
        if (lexerEngine.equalAny(Symbol.DOT)) {
            throw new UnsupportedOperationException("Cannot support SQL for `schema.table`");
        }
        // 获取表名
        String tableName = SQLUtil.getExactlyValue(literals);
        if (Strings.isNullOrEmpty(tableName)) {
            return;
        }
        // 解析别名
        Optional<String> alias = aliasExpressionParser.parseTableAlias();
        if (isSingleTableOnly || shardingRule.tryFindTableRule(tableName).isPresent() || shardingRule.findBindingTableRule(tableName).isPresent()
                || shardingRule.getDataSourceMap().containsKey(shardingRule.getDefaultDataSourceName())) {
            sqlStatement.getSqlTokens().add(new TableToken(beginPosition, literals));
            sqlStatement.getTables().add(new Table(tableName, alias));
        }
        // 解析联表查询
        parseJoinTable(sqlStatement);
        if (isSingleTableOnly && !sqlStatement.getTables().isSingleTable()) {
            throw new UnsupportedOperationException("Cannot support Multiple-Table.");
        }
    }

3.4.4 #parseWhere()

解析 WHERE 条件。将对路由结果有影响的条件,即分片键的过滤条件,存入AbstractSQLStatement#conditions中。
核心代码为WhereClauseParser#parseComparisonCondition

private void parseComparisonCondition(final ShardingRule shardingRule, final SQLStatement sqlStatement, final List<SelectItem> items) {
        lexerEngine.skipIfEqual(Symbol.LEFT_PAREN);
        SQLExpression left = basicExpressionParser.parse(sqlStatement);
        if (lexerEngine.skipIfEqual(Symbol.EQ)) {
            // 解析 = 条件
            parseEqualCondition(shardingRule, sqlStatement, left);
            lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
            return;
        }
        if (lexerEngine.skipIfEqual(DefaultKeyword.IN)) {
            // 解析 in 条件
            parseInCondition(shardingRule, sqlStatement, left);
            lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
            return;
        }
        if (lexerEngine.skipIfEqual(DefaultKeyword.BETWEEN)) {
            // 解析 Between And 条件,即区间条件
            parseBetweenCondition(shardingRule, sqlStatement, left);
            lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
            return;
        }
        if (sqlStatement instanceof SelectStatement && isRowNumberCondition(items, left)) {
            // ROW_NUMBER 的查询语句(MySQL 没有)
            if (lexerEngine.skipIfEqual(Symbol.LT)) {
                parseRowCountCondition((SelectStatement) sqlStatement, false);
                return;
            }
            if (lexerEngine.skipIfEqual(Symbol.LT_EQ)) {
                parseRowCountCondition((SelectStatement) sqlStatement, true);
                return;
            }
            if (lexerEngine.skipIfEqual(Symbol.GT)) {
                parseOffsetCondition((SelectStatement) sqlStatement, false);
                return;
            }
            if (lexerEngine.skipIfEqual(Symbol.GT_EQ)) {
                parseOffsetCondition((SelectStatement) sqlStatement, true);
                return;
            }
        }
        // 其他条件查询,如<,<=,>,>=,!= 等
        List<Keyword> otherConditionOperators = new LinkedList<>(Arrays.asList(getCustomizedOtherConditionOperators()));
        otherConditionOperators.addAll(
                Arrays.asList(Symbol.LT, Symbol.LT_EQ, Symbol.GT, Symbol.GT_EQ, Symbol.LT_GT, Symbol.BANG_EQ, Symbol.BANG_GT, Symbol.BANG_LT, DefaultKeyword.LIKE, DefaultKeyword.IS));
        if (lexerEngine.skipIfEqual(otherConditionOperators.toArray(new Keyword[otherConditionOperators.size()]))) {
            lexerEngine.skipIfEqual(DefaultKeyword.NOT);
            parseOtherCondition(sqlStatement);
        }
        if (lexerEngine.skipIfEqual(DefaultKeyword.NOT)) {
            lexerEngine.nextToken();
            lexerEngine.skipIfEqual(Symbol.LEFT_PAREN);
            parseOtherCondition(sqlStatement);
            lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
        }
        lexerEngine.skipIfEqual(Symbol.RIGHT_PAREN);
    }

3.4.5 #parseGroupBy()

解析分组条件,实现上比较类似 #parseSelectList,会更加简单一些。
解析出来的分组信息存入SelectStatement#groupByItems属性中。
核心代码为GroupByClauseParser#parse:

 public final void parse(final SelectStatement selectStatement) {
        if (!lexerEngine.skipIfEqual(DefaultKeyword.GROUP)) {
            return;
        }
        lexerEngine.accept(DefaultKeyword.BY);
        while (true) {
            // 解析分组表达式,得到 OrderItem,并存入 SelectStatement#groupByItems 属性中
            addGroupByItem(basicExpressionParser.parse(selectStatement), selectStatement);
            if (!lexerEngine.equalAny(Symbol.COMMA)) {
                break;
            }
            lexerEngine.nextToken();
        }
        lexerEngine.skipAll(getSkippedKeywordAfterGroupBy());
        selectStatement.setGroupByLastPosition(lexerEngine.getCurrentToken().getEndPosition() - lexerEngine.getCurrentToken().getLiterals().length());
    }

3.4.6 #parseHaving()

目前 Sharding-JDBC 不支持 Having 条件。
核心代码为HavingClauseParser#parse

public void parse() {
        lexerEngine.unsupportedIfEqual(DefaultKeyword.HAVING);
    }
// lexerEngine.java
public void unsupportedIfEqual(final TokenType... tokenTypes) {
        if (equalAny(tokenTypes)) {
            throw new SQLParsingUnsupportedException(lexer.getCurrentToken().getType());
        }
    }

3.4.7 #parseOrderBy()

解析排序条件。实现逻辑类似 #parseGroupBy(),这里就跳过,有兴趣的同学可以去看看。

3.4.8 #parseLimit()

解析分页 Limit 条件。相对简单,这里就跳过,有兴趣的同学可以去看看。注意下,分成 3 种情况:

解析出来的分页信息存入SelectStatement#limit属性中。

public final class Limit {
    
    // 数据库类型
    private final DatabaseType databaseType;
    // offset
    private LimitValue offset;
    // row
    private LimitValue rowCount;
    
}

当分页位置为非占位符,即为数字时,会生成 OffsetToken 和 RowCountToken。

3.4.9 appendDerived 等方法

因为 Sharding-JDBC 对表做了分片,在 AVG , GROUP BY , ORDER BY 需要对 SQL 进行一些改写,以达到能在内存里对结果做进一步处理,例如求平均值、分组、排序等。

3.4.9.1 #appendAvgDerivedColumns()

解决 AVG 查询。
核心代码为AbstractSelectParser#appendAvgDerivedColumns:

private void appendAvgDerivedColumns(final ItemsToken itemsToken, final SelectStatement selectStatement) {
        int derivedColumnOffset = 0;
        for (SelectItem each : selectStatement.getItems()) {
            if (!(each instanceof AggregationSelectItem) || AggregationType.AVG != ((AggregationSelectItem) each).getType()) {
                continue;
            }
            AggregationSelectItem avgItem = (AggregationSelectItem) each;
            // COUNT 字段
            String countAlias = String.format(DERIVED_COUNT_ALIAS, derivedColumnOffset);
            AggregationSelectItem countItem = new AggregationSelectItem(AggregationType.COUNT, avgItem.getInnerExpression(), Optional.of(countAlias));
            // SUM 字段
            String sumAlias = String.format(DERIVED_SUM_ALIAS, derivedColumnOffset);
            AggregationSelectItem sumItem = new AggregationSelectItem(AggregationType.SUM, avgItem.getInnerExpression(), Optional.of(sumAlias));
            // AggregationSelectItem 设置
            avgItem.getDerivedAggregationSelectItems().add(countItem);
            avgItem.getDerivedAggregationSelectItems().add(sumItem);
            // TODO 将AVG列替换成常数,避免数据库再计算无用的AVG函数
            itemsToken.getItems().add(countItem.getExpression() + " AS " + countAlias + " ");
            itemsToken.getItems().add(sumItem.getExpression() + " AS " + sumAlias + " ");
            derivedColumnOffset++;
        }
    }

针对 AVG 聚合字段,增加推导字段,将 AVG 改写成 SUM 和 COUNT 查询,内存计算出 AVG = SUM / COUNT 结果。

3.4.9.2 #appendDerivedOrderColumns()

解决 GROUP BY , ORDER BY。
核心代码为AbstractSelectParser#appendDerivedOrderColumns:

private void appendDerivedOrderColumns(final ItemsToken itemsToken, final List<OrderItem> orderItems, final String aliasPattern, final SelectStatement selectStatement) {
        int derivedColumnOffset = 0;
        for (OrderItem each : orderItems) {
            if (!isContainsItem(each, selectStatement)) {
                String alias = String.format(aliasPattern, derivedColumnOffset++);
                each.setAlias(Optional.of(alias));
                itemsToken.getItems().add(each.getQualifiedName().get() + " AS " + alias + " ");
            }
        }
    }

private boolean isContainsItem(final OrderItem orderItem, final SelectStatement selectStatement) {
        if (selectStatement.isContainStar()) {
            return true;
        }
        for (SelectItem each : selectStatement.getItems()) {
            if (-1 != orderItem.getIndex()) {
                return true;
            }
            if (each.getAlias().isPresent() && orderItem.getAlias().isPresent() && each.getAlias().get().equalsIgnoreCase(orderItem.getAlias().get())) {
                return true;
            }
            if (!each.getAlias().isPresent() && orderItem.getQualifiedName().isPresent() && each.getExpression().equalsIgnoreCase(orderItem.getQualifiedName().get())) {
                return true;
            }
        }
        return false;
    }

针对 GROUP BY 或 ORDER BY 字段,增加推导字段。

如果该字段不在查询字段里,需要额外查询该字段,这样才能在内存里 GROUP BY 或 ORDER BY。

3.4.9.3 #appendDerivedOrderBy()

当无 Order By 条件时,使用 Group By 作为排序条件。
核心代码为AbstractSelectParser#appendDerivedOrderBy:

private void appendDerivedOrderBy(final SelectStatement selectStatement) {
        if (!selectStatement.getGroupByItems().isEmpty() && selectStatement.getOrderByItems().isEmpty()) {
            selectStatement.getOrderByItems().addAll(selectStatement.getGroupByItems());
            selectStatement.getSqlTokens().add(new OrderByToken(selectStatement.getGroupByLastPosition()));
        }
    }

3.4.10 ItemsToken

选择项标记对象,属于分片上下文信息,目前有 3 个情况会创建:

  1. AVG 查询额外 COUNT 和 SUM: #appendAvgDerivedColumns()
  2. GROUP BY 不在 查询字段,额外查询该字段 : #appendDerivedOrderColumns()
  3. ORDER BY 不在 查询字段,额外查询该字段 : #appendDerivedOrderColumns()
public final class ItemsToken implements SQLToken {
    /**
     * SQL 开始位置
     */
    private final int beginPosition;
    /**
     * 字段名数组
     */
    private final List<String> items = new LinkedList<>();
}

4. 结语

查询语句的 SQL 解析已经讲解完毕,其他的 INSERT,UPDATE,DELETE 就更简单了,感兴趣的同学可以自行去了解。那么,我们拿到 SQL 解析的结果SQLStatement,就可以进行下一步的路由操作了,于是下一篇,我们将讨论 Sharding-JDBC 的路由流程,尽请关注!

上一篇下一篇

猜你喜欢

热点阅读