spark||flink||scala

数据治理篇-元数据-血缘分析: queryparser概述

2020-06-07  本文已影响0人  larluo_罗浩

前言

  1. 数据字典 DataDictionary
  2. 数据血缘 DataLineage
  3. 元数据触发器 MetaTrigger

一. 血缘分析的导推形式

  1. sql
  2. kafka streams
  3. spark rdd
  4. flink datastream

二. 血缘分析的技术方案分析。

  1. 通过调度器反向推导血缘关系。
  2. 通过计算引擎系统提供的血缘分析接口进行收集。
  3. 通过计算引擎系统的解析过程源码进行提取
  4. 通用的sql解析器工具

三. queryparser内功心法.

  1. 语法解析过程
  2. 信息传播过程
  3. 特定的计算模型
  4. 如何新增新的数据库方言。

前言

最近在元数据的项目建设中,主要涉及了三方面的基础工作分析。

一. 血缘分析的导推形式

血缘分析在这里主要是我们前文所说的DagConduit的推导过程。这个推导过程很大依赖于Datapipe。

因为元数据是个通用平台,可推导的范围还是非常多的,我们这里仅从实用角度考虑并介绍。

常见的etl过程主要分为sql, kafka streams, spark rdd, flink datastream,这几个。

sql是最常用的Datapipe,但是其变种也是最多的。
kafka streams, spark rdd, flink datastream则是另一种形式,会生成相应的topology相关的lineage,由于灵活度不高,可以使用对应的库进行提取。

这里我们重点关注sql血缘分析,因为sql这种dsl是最有表达力,最符合元数据模型的。
甚至kafka, spark, flink也绝大多数会提供sql接口。

由于本人大多时间关注于sql推导模型,未对其它模型做过多探索,将于后续交流及使用后补充相关内容。

二. 血缘分析的技术方案分析。

我们常见的血缘分析实现方式主要分为四种:

  1. 通用的sql解析器工具
    由于前面的问题,所以大家都想要一套通用的sql解析工具去专门解决sql分析的问题。但是这个里面涉及一个非常严重的问题,就是成熟度。由于sql解析过程有非常大的工作量,需要对语法规则非常熟练,写起来繁杂的工作量及系统多样性很容易让人退缩。所以市面上一直找不到成熟的通用的sql解析器。

    所以这里面就回归到了我们的主角,queryparser由此而生。queryparser是uber公司开发的,成熟的多年应用到了生产系统。目前支持hive, queryparser, vertica, teradata。对于pg系统,有由专用的gpu数据库公司SQream员工开发的解析器hssqlppp。这两套系统采用通用的解析器内核parsec。

    那么在这里放出地址给各位:

    这个queryparser强大在哪里呢?
    一看吓死人,核心代码只要7000多行。而sql结构定义就占了3000多行,也就是说逻辑只有4000行左右,擅抖吧,奥利给!

[nix-shell:~/my-repo/queryparser/src]$ cloc .
      20 text files.
      20 unique files.                              
       0 files ignored.

github.com/AlDanial/cloc v 1.84  T=0.08 s (251.9 files/s, 121135.2 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Haskell                         20           1724            562           7332
-------------------------------------------------------------------------------
SUM:                            20           1724            562           7332
-------------------------------------------------------------------------------

[nix-shell:~/my-repo/queryparser/src]$ find . -name "*.hs" | xargs -I {} wc -l {}  | sort -nr
2334 ./Database/Sql/Type/Query.hs
1125 ./Database/Sql/Type.hs
836 ./Database/Sql/Util/Scope.hs
647 ./Database/Sql/Type/Names.hs
641 ./Database/Sql/Type/Schema.hs
505 ./Database/Sql/Util/Columns.hs
438 ./Database/Sql/Util/Tables.hs
403 ./Database/Sql/Util/Eval.hs
389 ./Database/Sql/Info.hs
383 ./Database/Sql/Util/Joins.hs
372 ./Database/Sql/Util/Lineage/ColumnPlus.hs
372 ./Database/Sql/Type/Scope.hs
361 ./Database/Sql/Util/Schema.hs
216 ./Database/Sql/Util/Eval/Concrete.hs
159 ./Database/Sql/Util/Lineage/Table.hs
147 ./Database/Sql/Type/TableProps.hs
109 ./Database/Sql/Position.hs
66 ./Database/Sql/Pretty.hs
64 ./Database/Sql/Helpers.hs
51 ./Database/Sql/Type/Unused.hs

三. queryparser内功心法.

平复一下内心的平静,让我们来了解一上queryparser到底是怎么实现的。

queryparser包含了核心逻辑的实现(大部分就是标准sql逻辑),每个方言有各自特写的扩展,我们这里以queryparser-hive为例。

queryparser-hive将一些特定sql的功能处理掉,核心功能交给 queryparser来处理。由于我们接触的大部分是标准sql,我们在这里分析标准的sql语句,在后续源码篇里面我们会进一步介绍详细介绍queryparser-hive与queryparser的整体过程。

整个解析过程分为三个步骤:

  1. parseManyAll 语法解析过程
    将文本打碎(词法分析)并组装成结构化(语法分析)
  2. resolveHiveStatement 信息传播过程
    遍历数据结构,进行外部的catalog关联,以及相应信息的传递,属于通用的信息加工逻辑阶段。
  3. getColumnLineage 特定的计算模型
    计算模型分为很多种,有各自的需求定义。在提供的接口中,用户实现特定的逻辑完成所需的功能。
    queryparser提供了两种模型,一种是字段血缘模型,一种是数据计算模型。当然我们可以随意实现接口定制自己的模型。
    比如通过实现不同的sql逻辑处理方法将sql语句转换成对应的http restful请求。
    一个类似的功能可参照:
SELECT status, content_type, content::json->>'data' AS data
  FROM http_patch('http://httpbin.org/patch', '{"this":"that"}', 'application/json');

https://github.com/pramsey/pgsql-http

1. 语法解析过程

语法解析常用的实现分为两种:
一种是parser generator,就是你写好规则,自动帮你生成解析代码,比如常见的antlr就是。这种方式的特写是性能高,效率高。缺点则是定制化扩展弱,异常信息非常难读懂。
另一种是parser combinator,这种就是你自己手写语法规则,非常灵活,效率会低一点,可控性比较强。

queryparser就是使用了parser combinator方式进行解析,所以我们可以自由定制。

语法解析的目地是生成AST,简单来说就是一个递归的数据结构,有点类似json。在语法解析过程中要做的工作就是,定义数据结构,然后把信息塞进去。其实难度并不大,但是由于sql规则的复杂度,整个工作量并不轻松。

语法规则的过程分为两部分:词法解析及语法解析。
为什么需要词法解析呢?因为sql跟平常我们所说的csv文件并不一样,支持可以使用分隔符来定义结构。有些东西在不同的环境下有不同的意义。比如任何事物在注释里面它就没有效果了,里面可能还有变量声明,关键字信息等。

所以词法分析器就是按sql的基本单元进行拆分,然后语法分析器将其组装起来形成递归的结构体。为了便于理解,我们给出实际代码里面的词法结构及逻辑片段。

https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Token.hs

data Token = TokWord !Bool !Text
           | TokString !ByteString
           | TokNumber !Text
           | TokSymbol !Text
           | TokVariable !Text VariableName  -- the Text is for namespace, the
                                             -- Token is the param name which
                                             -- may be another TokVariable!
           | TokError !String
             deriving (Show, Eq)

可以看出,hive的基本单元分为了TokWord(通用名称), TokString(字符), TokNumber(数字), TokSymbol(符号), TokVariable(变量)

这个解析的过程叫做scanner,就是常用的字符处理生成Token流。
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Scanner.hs

有了Token流,我们进行组合生成AST。
那么我们可以看HIVE AST的结构定义:
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Type.hs

data HiveStatement r a = HiveStandardSqlStatement (Statement Hive r a)
                         | HiveUseStmt (Use a)
                         | HiveAnalyzeStmt (Analyze r a)
                         | HiveInsertDirectoryStmt (InsertDirectory r a)
                         | HiveTruncatePartitionStmt (TruncatePartition r a)
                         | HiveAlterTableSetLocationStmt (AlterTableSetLocation r a)
                         | HiveAlterPartitionSetLocationStmt (AlterPartitionSetLocation r a)
                         | HiveSetPropertyStmt (SetProperty a)
                         | HiveUnhandledStatement a

然后hive的HiveStandardSqlStatement包包含的Statement定义在queryparser核心结构里面,由于sql结构定义query部分比较多,单独拎出来形成了两个文件:

https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type.hs

data Statement
    d -- sql dialect
    r -- resolution level (raw or resolved)
    a -- per-node parameters - typically Range or ()
        = QueryStmt (Query r a)
        | InsertStmt (Insert r a)
        | UpdateStmt (Update r a)
        | DeleteStmt (Delete r a)
        | TruncateStmt (Truncate r a)
        | CreateTableStmt (CreateTable d r a)
        | AlterTableStmt (AlterTable r a)
        | DropTableStmt (DropTable r a)
        | CreateViewStmt (CreateView r a)
        | DropViewStmt (DropView r a)
        | CreateSchemaStmt (CreateSchema r a)
        | GrantStmt (Grant a)
        | RevokeStmt (Revoke a)
        | BeginStmt a
        | CommitStmt a
        | RollbackStmt a
        | ExplainStmt a (Statement d r a)
        | EmptyStmt a

https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Query.hs

data Query r a
    = QuerySelect a (Select r a)
    | QueryExcept a (ComposedQueryColumns r a) (Query r a) (Query r a)
    | QueryUnion a Distinct (ComposedQueryColumns r a) (Query r a) (Query r a)
    | QueryIntersect a (ComposedQueryColumns r a) (Query r a) (Query r a)
    | QueryWith a [CTE r a] (Query r a)
    | QueryOrder a [Order r a] (Query r a)
    | QueryLimit a (Limit a) (Query r a)
    | QueryOffset a (Offset a) (Query r a)

接下来就是把token组装起来。
由于每个方言组装逻辑语法规则不一样,所以各自有自己的实现,核心接口并未提供。

https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Parser.hs

statementParser :: Parser (HiveStatement RawNames Range)
statementParser = do
    maybeStmt <- optionMaybe $ choice
        [ HiveUseStmt <$> useP
        , HiveAnalyzeStmt <$> analyzeP
        , do
              let options =
                    -- this list is hive-specific statement types that may be
                    -- preceded by an optional `WITH` and an optional inverted
                    -- `FROM`
                    [ (void insertDirectoryPrefixP, fmap HiveInsertDirectoryStmt . insertDirectoryP)
                    ]
                  prefixes = map fst options
                  baseParsers = map snd options
              _ <- try $ P.lookAhead $ optional withP >> invertedFromP >> choice prefixes
              with <- option id withP
              invertedFrom <- invertedFromP
              let parsers = map ($ (with, invertedFrom)) baseParsers
              choice $ parsers
        , try $ HiveTruncatePartitionStmt <$> truncatePartitionStatementP
        , HiveUnhandledStatement <$> describeP
        , HiveUnhandledStatement <$> showP
        , do
              _ <- try $ P.lookAhead createFunctionPrefixP
              HiveUnhandledStatement <$> createFunctionP
        , do
              _ <- try $ P.lookAhead dropFunctionPrefixP
              HiveUnhandledStatement <$> dropFunctionP
        , HiveStandardSqlStatement <$> statementP
        , try $ HiveAlterTableSetLocationStmt <$> alterTableSetLocationP
        , try $ HiveUnhandledStatement <$> alterTableSetTblPropertiesP
        , alterPartitionP
        , HiveSetPropertyStmt <$> setP
        , HiveUnhandledStatement <$> reloadFunctionP
        ]
    case maybeStmt of
        Just stmt -> terminator >> return stmt
        Nothing -> HiveStandardSqlStatement <$> emptyStatementP
  where
    terminator = (Tok.semicolonP <|> eof) -- normal statements may be terminated by `;` or eof
    emptyStatementP = EmptyStmt <$> Tok.semicolonP  -- but we don't allow eof here. `;` is the
    -- only way to write the empty statement, i.e. `` (empty string) is not allowed.

解析过程并不复杂,就是按结构体深度递归,跟json解析差不了多少。
整个过程就是,至上而下解析,解析一项,如果失败了则尝试其它项,深度递归下去。
体力活比较重,因为规则多,每一项都要写规则尝试匹配。
就这样,整个结构体就组装起来了。

2. 信息传播过程

其实queryparser的信息传播过程非常简单,就是单纯从catalog里面去查询表信息,关联到表生成的字段,然后将以前的表名转换成表信息,生成引用信息。

https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Scope.hs

具体内容我们可以看前后结果对比

data RawNames
deriving instance Data RawNames
instance Resolution RawNames where
    type TableRef RawNames = OQTableName
    type TableName RawNames = OQTableName
    type CreateTableName RawNames = OQTableName
    type DropTableName RawNames = OQTableName
    type SchemaName RawNames = OQSchemaName
    type CreateSchemaName RawNames = OQSchemaName
    type ColumnRef RawNames = OQColumnName
    type NaturalColumns RawNames = Unused
    type UsingColumn RawNames = UQColumnName
    type StarReferents RawNames = Unused
    type PositionExpr RawNames = Unused
    type ComposedQueryColumns RawNames = Unused

instance Resolution ResolvedNames where
    type TableRef ResolvedNames = RTableRef
    type TableName ResolvedNames = RTableName
    type CreateTableName ResolvedNames = RCreateTableName
    type DropTableName ResolvedNames = RDropTableName
    type SchemaName ResolvedNames = FQSchemaName
    type CreateSchemaName ResolvedNames = RCreateSchemaName
    type ColumnRef ResolvedNames = RColumnRef
    type NaturalColumns ResolvedNames = RNaturalColumns
    type UsingColumn ResolvedNames = RUsingColumn
    type StarReferents ResolvedNames = StarColumnNames
    type PositionExpr ResolvedNames = Expr ResolvedNames
    type ComposedQueryColumns ResolvedNames = ColumnAliasList

data RTableName a = RTableName (FQTableName a) SchemaMember
    deriving (Generic, Data, Eq, Ord, Show, Functor, Foldable, Traversable)

data SchemaMember = SchemaMember
    { tableType :: TableType
    , persistence :: Persistence ()
    , columnsList :: [UQColumnName ()]
    , viewQuery :: Maybe (Query ResolvedNames ())  -- this will always be Nothing for tables
    } deriving (Generic, Data, Eq, Ord, Show)


可以看到,sql解析之前表名,列名信息就是简单的字符OQTableName, UQColumnName。
解析之后就映射成了实际的各种形式。resolve出的RTableName就包含了从catalog里面查询出来的表的所有列员。
这个信息可以用来解析insert into时不带字段名,以及select *时的一些模糊逻辑。
当然本人觉得如果sql够标准的话,表的字段信息是完成可以从传播过程中自动推导的,而不用依赖于外部提供的catalog模式信息,这个也是本人正在尝试加强信息传播过程中优化的方向之一。

整个resolve过程代码:
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Scope.hs

由于是概述篇,就不展开讲述,后续将在源码篇里详细介绍。

3. 特定的计算模型

有了前面两部分过程之后,我们的结构体经过遍历关联处理有了更详细的信息。
最后一步则是通过这个结构体去做对应的计算。
因为整个遍历过程跟处理过程,相似度比较高。
所以queryparser提供了一个标准计算模型,应用则可以定制化特定部分进行逻辑处理, 只需要编写对应的函数,不需要自己去编写整个过程。
当然如果你有兴趣,自己定义重写或者扩展一套计算模型也是非常方便的。
我们可以看一下这个计算逻辑的定义:

class (Monad (EvalRow e), Monad (EvalMonad e), Traversable (EvalRow e)) => Evaluation e where
    type EvalValue e :: *
    type EvalRow e :: * -> *
    type EvalMonad e :: * -> *
    addItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
    removeItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
    unionItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
    intersectItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
    distinctItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e]
    offsetItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
    limitItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
    filterBy :: Expr ResolvedNames Range -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    inList :: EvalValue e -> [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    inSubquery :: EvalValue e -> EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    existsSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    atTimeZone :: EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleConstant :: Proxy e -> Constant a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleCases :: Proxy e -> [(Expr ResolvedNames Range, Expr ResolvedNames Range)] -> Maybe (Expr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleFunction :: Proxy e -> FunctionName Range -> Distinct -> [Expr ResolvedNames Range] -> [(ParamName Range, Expr ResolvedNames Range)] -> Maybe (Filter ResolvedNames Range) -> Maybe (OverSubExpr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleGroups ::  [RColumnRef ()] -> EvalRow e ([EvalValue e], EvalRow e [EvalValue e]) -> EvalRow e (RecordSet e)
    handleLike :: Proxy e -> Operator a -> Maybe (Escape ResolvedNames Range) -> Pattern ResolvedNames Range -> Expr ResolvedNames Range -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleOrder :: Proxy e -> [Order ResolvedNames Range] -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    handleSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleJoin :: Proxy e -> JoinType a -> JoinCondition ResolvedNames Range -> RecordSet e -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    handleStructField :: Expr ResolvedNames Range -> StructFieldName a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    handleTypeCast :: CastFailureAction -> Expr ResolvedNames Range -> DataType a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    binop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))
    unop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))

queryparser定义了两种计算模型:

https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval/Concrete.hs

这里面的细节由于跟本文关联不大,就不一一介绍了,如有兴趣,可以源码篇作更进一步介绍。

字段血缘模型就是用得比较多的了,简单来讲呢,依旧是深度遍历到列级别,然后组装起来。

我们可以简单看看计算模型的组装过程分为哪些。
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval.hs

最基础的内容是常量与(列)表达式,表结构

instance Evaluation e => Evaluate e (Constant a) where
    type EvalResult e (Constant a) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    eval p constant = handleConstant p constant

instance Evaluation e => Evaluate e (Expr ResolvedNames Range) where
    type EvalResult e (Expr ResolvedNames Range) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
...
    eval _ (ColumnExpr _ col) = do
        row <- asks evalRow
        case M.lookup (void col) row of
            Just x -> pure x
            Nothing -> throwError $ "failure looking up column: " ++ show (void col) ++ " in " ++ show (M.keys row)
...

instance Evaluation e => Evaluate e (Tablish ResolvedNames Range) where
    type EvalResult e (Tablish ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
...
    eval _ (TablishTable _ _ (RTableRef tableName table)) = asks evalFromTable <*> pure (RTableName tableName table) >>= \case
        Nothing -> throwError $ "missing table: " ++ show (void tableName)
        Just result -> pure result
    eval _ (TablishTable _ _ (RTableAlias (TableAlias _ aliasName alias))) = asks (M.lookup alias . evalAliasMap) >>= \case
        Nothing -> throwError $ "missing table alias: " ++ show aliasName
        Just result -> pure result
...

表结构向上传播到TablishJoin , TablishLateralView , SelectFrom

列表达式通过表达式一层层往上计算。
然后到达SelectWhere, JoinCondition, SelectTimeseries , SelectGroup, SelectHaving

然后组合形成了基本的select语句

instance Evaluation e => Evaluate e (Select ResolvedNames Range) where
    type EvalResult e (Select ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    eval p Select{..} = do
        -- nb. if we handle named windows at resolution time (T637160)
        -- then we shouldn't need to do anything with them here
        unfiltered <- maybe (pure $ emptyRecordSet p) (eval p) selectFrom
        filtered <- maybe pure (eval p) selectWhere unfiltered
        interpolated <- maybe pure (eval p) selectTimeseries filtered
        groups <- maybe (const $ pure . pure) (eval p) selectGroup selectCols interpolated
        having <- maybe pure (eval p) selectHaving groups
        records <- mapM (eval p selectCols) having
        let rows = recordSetItems =<< records
            labels = map void $ selectionNames =<< selectColumnsList selectCols
            indistinct = makeRecordSet p labels rows

        pure $ case selectDistinct of
             Distinct True -> indistinct { recordSetItems = distinctItems p $ recordSetItems indistinct }
             Distinct False -> indistinct

然后加上CTE功能就形成了完整的Query,然后在SelectFrom里面可以递归到这个Query进行递归组合。

instance Evaluation e => Evaluate e (Query ResolvedNames Range) where
    type EvalResult e (Query ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    eval p (QuerySelect _ select) = eval p select
    eval p (QueryExcept _ (ColumnAliasList cs) lhs rhs) = do
        exclude <- recordSetItems <$> eval p rhs
        RecordSet{recordSetItems = unfiltered, ..} <- eval p lhs
        let labels = map (RColumnAlias . void) cs
        makeRecordSet p labels <$> removeItems p exclude unfiltered

    eval p (QueryUnion _ (Distinct False) (ColumnAliasList cs) lhs rhs) = do
        RecordSet{recordSetItems = lhsRows, ..} <- eval p lhs
        RecordSet{recordSetItems = rhsRows} <- eval p rhs
        let labels = map (RColumnAlias . void) cs
        makeRecordSet p labels <$> unionItems p lhsRows rhsRows

    eval p (QueryUnion info (Distinct True) cs lhs rhs) = do
        result@RecordSet{recordSetItems} <- eval p (QueryUnion info (Distinct False) cs lhs rhs)
        pure $ result{recordSetItems = distinctItems p recordSetItems}

    eval p (QueryIntersect _ (ColumnAliasList cs) lhs rhs) = do
        RecordSet{recordSetItems = litems, ..} <- eval p lhs
        ritems <- recordSetItems <$> eval p rhs
        let labels = map (RColumnAlias . void) cs
        makeRecordSet p labels <$> intersectItems p litems ritems

    eval p (QueryWith _ [] query) = eval p query
    eval p (QueryWith info (CTE{..}:ctes) query) = do
        RecordSet{..} <- eval p cteQuery
        columns <- override cteColumns recordSetLabels
        let result = makeRecordSet p columns recordSetItems
        introduceAlias p (void cteAlias) result $ eval p $ QueryWith info ctes query
      where
        override [] ys = pure ys
        override (alias:xs) (_:ys) = do
            ys' <- override xs ys
            pure $ (RColumnAlias $ void alias) : ys'
        override _ [] = throwError "more aliases than columns in CTE"

    eval p (QueryLimit _ limit query) = eval p limit <$> eval p query
    eval p (QueryOffset _ offset query) = eval p offset <$> eval p query
    eval p (QueryOrder _ orders query) = eval p query >>= handleOrder p orders

当然这个过程我们会在源码篇详细讲解,对于普通读者,可以了解一下大致的计算过程。
整个过程有个EvalContext,
evalAliasMap里面会记录表别名对应的信息,
evalFromTable里面会记录表对应的信息,
evalRow里面会对应字段对应的信息

data EvalContext e = EvalContext
    { evalAliasMap :: Map TableAliasId (RecordSet e)
    , evalFromTable :: RTableName Range -> Maybe (RecordSet e)
    , evalRow :: Map (RColumnRef ()) (EvalValue e)
    }

data RecordSet e = RecordSet
    { recordSetLabels :: [RColumnRef ()]
    , recordSetItems :: EvalRow e [EvalValue e]
    }

这个信息在字段血缘分析的过程中表体表现为ColumnPlusSet:

instance Evaluation ColumnLineage where
    type EvalValue ColumnLineage = ColumnPlusSet
    type EvalRow ColumnLineage = Writer ColumnPlusSet
    type EvalMonad ColumnLineage = Identity

data ColumnPlusSet = ColumnPlusSet
    { columnPlusColumns :: Map FQCN (Map FieldChain (Set Range))
    , columnPlusTables :: Map FQTN (Set Range)
    } deriving (Eq, Show)

ColumnPlusSet的主要信息就是相关的列信息与表信息。

type ColumnLineagePlus = Map (Either FQTN FQCN) ColumnPlusSet

最终解析出来的结果有两种形式:
对于每个表FQTN, 有对应的ColumnPlusSet(表依赖集及字段依赖集)
对于表的每个字段FQCN,有对应的ColumnPlusSet(表依赖集及字段依赖集)

有人可能奇怪了,在什么时候,字段会依赖于表?
这个在select count(1) as cnt from b的时候cnt就是依赖于表的。当然其它情况,我会进一步整理后补充。

4. 如何新增新的数据库方言。

我们前面讲过,整个血缘分析分为三部分。
第一部分sql解析的这个词法解析及语法解析比较依赖于sql的语法规则,所以需要手写,难度并不大,只是工作量比较大。比如支持存储过程之类的也是语法规则问题。
第二部分信息传播,基本上是通用的。对于除标准sql以外的信息,我们大部分不需要处理,特定的需要处理的工作量也非常少,可以简单抄抄改改。比如支持存储过程之类的大部分也是sql逻辑处理后组合起来。
第三部分计算模型,也基本上是通用的,由于对于血缘分析来说,所涉及到关注面会更少,所需要改动是最小的。。。

5. SQL解析潮流从此开始

所以我们可以看到,queryparser仅仅就干好了一件事情,就几千行代码,并且干得简单。扩展性也很强。
当然我们也注意到了,它的整个列信息推理过程比较原始,直接查询了catalog,但是也是非常方便后续扩展的。

它的计算模型对比其它解析器是非常大的一个亮点,你可以基于各种方言写支持各种生态的扩展。只需要将对应方言规则的sql解析后,实现对应计算模型里的方法即可,给了我们相当大的想象力。
你可以配置sql去做http restful递归请求,配置sql去做etl数据加载。。。

上一篇下一篇

猜你喜欢

热点阅读