Spark_Flink_Hadoopspark

浅谈Spark SQL语句解析与基于规则优化(RBO)

2019-11-08  本文已影响0人  LittleMagic

前言

近些年来,大数据领域“SQL化开发”的理念蔚然成风,这是因为SQL是一种通用、学习成本低的语言,并且还有较强的数据描述能力。不少大数据框架早已支持了SQL化开发,如Spark、Flink、Kafka等。

之前笔者操刀的多数Spark程序都是用传统的RDD API写的,Spark SQL用得很少,Flink也如是。最近抽空对这两门“SQL”做了一些了解,并且断断续续研究了Spark SQL的部分原理,了解到它的内部也是存在基于规则优化(Rule-based optimization, RBO)和基于代价优化(Cost-based optimization, CBO)的,与传统关系型数据库和大数据领域的有些组件(Hive/Presto等)异曲同工。

Spark SQL的核心是Catalyst,即它专属的查询解析和优化器。下面是Spark SQL原始论文中给出的Catalyst执行流程图。

可见主要分为语句解析、逻辑计划与优化、物理计划与优化、代码生成4个阶段,前3个阶段都由Catalyst负责。其中,逻辑计划的优化采用RBO的思路,物理计划的优化则采用CBO的思路。本文只来看RBO,顺便也介绍一下它之前的语句解析、逻辑计划过程,并不会具体到源码分析的级别。物理计划与CBO比起逻辑计划与RBO更加灵活和复杂,等忙过这一阵之后择期再写。

SQL语句解析

Catalyst使用开源的语法分析器Antlr解析SQL语句,并生成未解析的逻辑计划(Unresolved Logical Plan),对应到源码中的类为SqlBaseLexer和SqlBaseParser。

未解析的逻辑计划其实就是一棵原生的抽象语法树(Abstract Syntax Tree, AST),只与语句本身有关,而与表的元数据没有任何关系。用以下基于TPC-H数据集的查询为例,在其Q3查询的基础上简化而来。TPC-H数据集的导入可以参见这篇文章

select avg(revenue) from (
  select l_extendedprice * (100 - 99 - l_discount) as revenue
  from tpch.customer c 
  join tpch.orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey 
  join tpch.lineitem l on l.l_orderkey = o.o_orderkey
  where o_orderdate <= '1995-03-17' and l_shipdate >= '1995-03-18'
) temp;

调用SparkSession.sql().explain(true)方法,查看执行计划。下面就是未解析的逻辑计划的全貌。

== Parsed Logical Plan ==
'Project [unresolvedalias('avg('revenue), None)]
+- 'SubqueryAlias temp
   +- 'Project [('l_extendedprice * ((100 - 99) - 'l_discount)) AS revenue#0]
      +- 'Filter (('o_orderdate <= 1995-03-17) && ('l_shipdate >= 1995-03-18))
         +- 'Join Inner, ('l.l_orderkey = 'o.o_orderkey)
            :- 'Join Inner, (('c.c_mktsegment = BUILDING) && ('c.c_custkey = 'o.o_custkey))
            :  :- 'SubqueryAlias c
            :  :  +- 'UnresolvedRelation `tpch`.`customer`
            :  +- 'SubqueryAlias o
            :     +- 'UnresolvedRelation `tpch`.`orders`
            +- 'SubqueryAlias l
               +- 'UnresolvedRelation `tpch`.`lineitem`

如果这样不容易阅读的话,我们手动将这棵抽象语法树画出来,就简明得多。别名逻辑操作符(SubqueryAlias)就不画了。

由上图可见,所有的表都用UnresolvedRelation来表示,也就是仅仅知道它们是表,而对其他信息(表的结构、数据类型、存储位置等等)都一无所知,Project、Filter等操作符中的列名对应的信息自然也是不清楚的。这些东西都要在生成逻辑计划的同时弄明白。

逻辑计划生成

逻辑计划的生成由Analyzer类来实现,它利用SessionCatalog(具体到这里就是Hive的Catalog,即元数据集合)将上一节AST中所有Unresolved的东西解析出来。解析完毕的逻辑计划如下所示。

== Analyzed Logical Plan ==
avg(revenue): double
Aggregate [avg(revenue#0) AS avg(revenue)#72]
+- SubqueryAlias temp
   +- Project [(l_extendedprice#60 * (cast((100 - 99) as double) - l_discount#61)) AS revenue#0]
      +- Filter ((cast(o_orderdate#50 as string) <= 1995-03-17) && (l_shipdate#65 >= 1995-03-18))
         +- Join Inner, (l_orderkey#55 = o_orderkey#46)
            :- Join Inner, ((c_mktsegment#44 = BUILDING) && (c_custkey#38 = o_custkey#47))
            :  :- SubqueryAlias c
            :  :  +- SubqueryAlias customer
            :  :     +- HiveTableRelation `tpch`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#38, c_name#39, c_address#40, c_nationkey#41, c_phone#42, c_acctbal#43, c_mktsegment#44, c_comment#45]
            :  +- SubqueryAlias o
            :     +- SubqueryAlias orders
            :        +- HiveTableRelation `tpch`.`orders`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [o_orderkey#46, o_custkey#47, o_orderstatus#48, o_totalprice#49, o_orderdate#50, o_orderpriority#51, o_clerk#52, o_shippriority#53, o_comment#54]
            +- SubqueryAlias l
               +- SubqueryAlias lineitem
                  +- HiveTableRelation `tpch`.`lineitem`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [l_orderkey#55, l_partkey#56, l_suppkey#57, l_linenumber#58, l_quantity#59, l_extendedprice#60, l_discount#61, l_tax#62, l_returnflag#63, l_linestatus#64, l_shipdate#65, l_commitdate#66, l_receiptdate#67, l_shipinstruct#68, l_shipmode#69, l_comment#70]

解析出来的东西有:

用树形结构表示如下图。

接下来就要靠RBO对这棵树进行优化了。

基于规则优化

所谓基于规则优化,就是指通过一系列预先定义好的规则(Rule)对逻辑计划进行等价转换,以提高查询效率。

RBO的两个主要思路是:减少参与计算的数据量、降低重复计算的代价。RBO相对于CBO而言要成熟得多,常用的规则都基于经验制定,可以覆盖大部分查询场景,并且方便扩展。其缺点则是不够灵活,毕竟这个阶段对物理上的特征(如表的底层存储形式和真正的数据量)还没有感知。

下面先列出文章开头的查询优化过的逻辑计划。

== Optimized Logical Plan ==
Aggregate [avg(revenue#0) AS avg(revenue)#72]
+- Project [(l_extendedprice#60 * (1.0 - l_discount#61)) AS revenue#0]
   +- Join Inner, (l_orderkey#55 = o_orderkey#46)
      :- Project [o_orderkey#46]
      :  +- Join Inner, (c_custkey#38 = o_custkey#47)
      :     :- Project [c_custkey#38]
      :     :  +- Filter ((isnotnull(c_mktsegment#44) && (c_mktsegment#44 = BUILDING)) && isnotnull(c_custkey#38))
      :     :     +- HiveTableRelation `tpch`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#38, c_name#39, c_address#40, c_nationkey#41, c_phone#42, c_acctbal#43, c_mktsegment#44, c_comment#45]
      :     +- Project [o_orderkey#46, o_custkey#47]
      :        +- Filter (((isnotnull(o_orderdate#50) && (cast(o_orderdate#50 as string) <= 1995-03-17)) && isnotnull(o_custkey#47)) && isnotnull(o_orderkey#46))
      :           +- HiveTableRelation `tpch`.`orders`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [o_orderkey#46, o_custkey#47, o_orderstatus#48, o_totalprice#49, o_orderdate#50, o_orderpriority#51, o_clerk#52, o_shippriority#53, o_comment#54]
      +- Project [l_orderkey#55, l_extendedprice#60, l_discount#61]
         +- Filter ((isnotnull(l_shipdate#65) && (l_shipdate#65 >= 1995-03-18)) && isnotnull(l_orderkey#55))
            +- HiveTableRelation `tpch`.`lineitem`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [l_orderkey#55, l_partkey#56, l_suppkey#57, l_linenumber#58, l_quantity#59, l_extendedprice#60, l_discount#61, l_tax#62, l_returnflag#63, l_linestatus#64, l_shipdate#65, l_commitdate#66, l_receiptdate#67, l_shipinstruct#68, l_shipmode#69, l_comment#70]

优化过的逻辑计划与原本的逻辑计划相比有了很大变化。为了对比清晰,将两棵树都画在下面了。

上面的图中包含了3种最常见也是最有效的RBO方式,分别简单阐述一下。英文名称是Spark SQL源码中的字段名称。

To be continued

又多了一个坑 _(:з」∠)_

并且Spark源码的专栏也好久没写了啊~

上一篇下一篇

猜你喜欢

热点阅读