
[Spark Picks] Spark SQL Execution Plans Explained

2023-07-05  熊本极客

1. Spark SQL Execution Flow

A SQL statement passes through four stages: the Parser turns the SQL text into a Parsed Logical Plan, the Analyzer resolves it into an Analyzed Logical Plan, the Optimizer rewrites that into an Optimized Logical Plan, and the Planner finally produces the Physical Plan that is executed.

(Figure: Spark SQL execution flow)
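
These plans can also be inspected programmatically. A minimal sketch, assuming a SparkSession named `spark` (e.g. in spark-shell with Hive support) and the stu table from the case study below:

// Equivalent to EXPLAIN EXTENDED on the SQL side: prints all four plans.
val df = spark.sql("SELECT * FROM stu WHERE age >= 11")
df.explain(extended = true)

// The individual plans are also reachable through QueryExecution:
val qe = df.queryExecution
println(qe.logical)       // Parsed Logical Plan (unresolved)
println(qe.analyzed)      // Analyzed Logical Plan
println(qe.optimizedPlan) // Optimized Logical Plan
println(qe.executedPlan)  // Physical Plan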

2. Case Study

# The stu table
CREATE TABLE stu (
  id INT,
  name STRING,
  age INT
);
# Data in the stu table
+------+--------+-------+
| id   | name   | age   |
+------+--------+-------+
| 0    | John   | 10    |
| 1    | Mike   | 11    |
| 2    | Lisa   | 12    |
+------+--------+-------+

# The score table
CREATE TABLE score (
  id INT,
  xueke STRING,
  score INT
);
# Data in the score table
+------+---------+-------+
| id   | xueke   | score |
+------+---------+-------+
| 0    | Chinese | 80    |
| 0    | Math    | 100   |
| 0    | English | 99    |
| 1    | Chinese | 40    |
| 1    | Math    | 50    |
| 1    | English | 60    |
| 2    | Chinese | 70    |
| 2    | Math    | 80    |
| 2    | English | 90    |
+------+---------+-------+

# Show the Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan,
# and Physical Plan of a given SQL statement
EXPLAIN EXTENDED SELECT sum(v), name FROM 
  (SELECT stu.id, 100+10+score.score AS v, name FROM stu JOIN score 
   WHERE stu.id = score.id AND stu.age >= 11) AS tmp 
GROUP BY name;

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
      +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
         +- 'Join Inner
            :- 'UnresolvedRelation [stu], [], false
            +- 'UnresolvedRelation [score], [], false

== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
   +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
      +- Filter ((id#103 = id#106) AND (age#105 >= 11))
         +- Join Inner
            :- SubqueryAlias spark_catalog.default.stu
            :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
            +- SubqueryAlias spark_catalog.default.score
               +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
   +- Join Inner, (id#103 = id#106)
      :- Project [id#103, name#104]
      :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
      :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
      +- Project [id#106, score#108]
         +- Filter isnotnull(id#106)
            +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
   +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
      +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
         +- Project [(110 + score#108) AS v#97, name#104]
            +- SortMergeJoin [id#103], [id#106], Inner
               :- Sort [id#103 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
               :     +- Project [id#103, name#104]
               :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
               :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
               +- Sort [id#106 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                     +- Filter isnotnull(id#106)
                        +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

2.1 The Parser Phase

The SQL statement shown above is parsed by the Parser into an abstract syntax tree, the Parsed Logical Plan. Its leaf nodes are UnresolvedRelation nodes, meaning they have not been resolved yet; the next step, the Analyzer phase, processes the plan and resolves them.

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
      +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
         +- 'Join Inner
            :- 'UnresolvedRelation [stu], [], false
            +- 'UnresolvedRelation [score], [], false
(Figure: Parsed Logical Plan tree)
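
To look at the Parser's output in isolation, the parsed (still unresolved) plan can be produced directly from the session's parser. A sketch, again assuming the `spark` session:

// Only parsing happens here, with no catalog lookup,
// so the relation stays unresolved.
val parsed = spark.sessionState.sqlParser.parsePlan(
  "SELECT name FROM stu WHERE age >= 11")
println(parsed)
// 'Project ['name]
// +- 'Filter ('age >= 11)
//    +- 'UnresolvedRelation [stu], [], false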

Every node in such a plan tree is of type LogicalPlan, i.e. an Operator performing one operation. Some common Operators are listed below; a traversal sketch follows the list.

Project(projectList: Seq[NamedExpression], child: LogicalPlan)
    The SELECT output operation; projectList is the list of outputs, each element an expression.
Filter(condition: Expression, child: LogicalPlan)
    Filters the input rows of child according to condition.
Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan)
    Performs an aggregation over the rows output by child, e.g. GROUP BY.
Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression])
    Joins the outputs of left and right.
Union(children: Seq[LogicalPlan])
    Unions the results of all children.
Distinct(child: LogicalPlan)
    Deduplicates the rows output by child.
Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan)
    Sorts the output of child; global indicates a total ordering rather than a per-partition sort.
SubqueryAlias(alias: Alias, child: LogicalPlan)
    Assigns an alias to child.
GlobalLimit(limitExpr: Expression, child: LogicalPlan)
    Limits the rows output by child.
Window(windowExpressions: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], child: LogicalPlan)
    Applies window functions to child. windowExpressions is the list of window functions, each a NamedExpression specifying the aggregate to compute; partitionSpec is a list of expressions specifying how the window is partitioned; orderSpec is a list of sort orders specifying how rows are ordered within each window.
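
Because every node is a LogicalPlan (a Catalyst TreeNode), plans can be traversed generically. A sketch that lists some operators of an analyzed plan, assuming the `spark` session and the tables from the case study:

import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}

val plan: LogicalPlan = spark.sql(
  "SELECT name FROM stu JOIN score WHERE stu.id = score.id"
).queryExecution.analyzed

// TreeNode.collect applies a partial function to every node in the tree.
plan.collect {
  case p: Project => s"Project: ${p.projectList.mkString(", ")}"
  case f: Filter  => s"Filter: ${f.condition.sql}"
  case j: Join    => s"Join: ${j.joinType}"
}.foreach(println)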

2.2 The Analyzer Phase

Using context and catalog metadata, the Analyzer resolves each UnresolvedRelation into a resolved relation and automatically wraps it in a SubqueryAlias carrying the alias. When the Analyzer resolves a Hive table, the resolved relation is a HiveTableRelation, which contains the Hive table's metadata such as the table name and column types. Each UnresolvedRelation therefore becomes a SubqueryAlias whose child is a HiveTableRelation holding the Hive table's metadata.

== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
   +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
      +- Filter ((id#103 = id#106) AND (age#105 >= 11))
         +- Join Inner
            :- SubqueryAlias spark_catalog.default.stu
            :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
            +- SubqueryAlias spark_catalog.default.score
               +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
(Figure: Analyzed Logical Plan tree)
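
The resolution is easy to observe by comparing the parsed and analyzed plans of the same query. A sketch, assuming the `spark` session:

val qe = spark.sql("SELECT name FROM stu").queryExecution

println(qe.logical)  // 'Project ['name]
                     // +- 'UnresolvedRelation [stu], [], false
println(qe.analyzed) // Project [name#...]
                     // +- SubqueryAlias spark_catalog.default.stu
                     //    +- HiveTableRelation `default`.`stu` ...

// After analysis every attribute carries a resolved type and a unique exprId
// (e.g. name#104: string above), and the plan reports itself as resolved:
println(qe.analyzed.resolved) // true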

2.3 The Optimizer Phase

Some of the logical optimization rules are listed below.

ConstantFolding
    Folds constant expressions: select 100+10+a from t becomes select 110+a from t.
PushDownPredicate
    Pushes predicates down to the appropriate position so unnecessary rows are filtered out as early as possible, e.g. for a Filter: select rand(), a from (select * from t) where a>1 becomes select rand(), a from (select * from t where a>1).
ColumnPruning
    Removes unneeded columns, i.e. drops output fields of child that are never used: select a from (select a, b from t) becomes select a from (select a from t).
BooleanSimplification
    Simplifies boolean expressions, mainly AND/OR combinations in WHERE clauses: true or a=b becomes true.
SimplifyCasts
    Simplifies cast operations; if a cast does not change the data type, it can be removed: select cast(a as int) from t becomes select a from t when a is already int.
SimplifyCaseConversionExpressions
    Simplifies case-conversion chains; for consecutive Upper/Lower calls on a string only the last one matters: select lower(upper(lower(a))) as c from t becomes select lower(a) as c from t.
CollapseProject
    Merges adjacent projections, collapsing a Project into a child Project or Aggregate: select c+1 from (select a+b as c from t) becomes select a+b+1 from t.
CollapseRepartition
    Merges adjacent Repartition operations: Repartition(numPartitions, shuffle, Repartition(_, _, child)) becomes Repartition(numPartitions, shuffle, child).
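
ConstantFolding, for example, can be observed directly on the expression from the case study by comparing the analyzed and optimized plans. A sketch, assuming the `spark` session:

val qe = spark.sql("SELECT 100 + 10 + score AS v FROM score").queryExecution

println(qe.analyzed)      // ... ((100 + 10) + score#...) AS v#...  not folded yet
println(qe.optimizedPlan) // ... (110 + score#...) AS v#...         constants folded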

Applying the rules above, the Analyzed Logical Plan is optimized into the following Optimized Logical Plan.

(Figure: Analyzed Logical Plan (left) optimized into the Optimized Logical Plan (right))
== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
   +- Join Inner, (id#103 = id#106)
      :- Project [id#103, name#104]
      :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
      :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
      +- Project [id#106, score#108]
         +- Filter isnotnull(id#106)
            +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

2.4 The Planner Phase

The Optimized Logical Plan is finally converted into the Physical Plan, as shown below.

(Figure: Optimized Logical Plan (left) converted into the Physical Plan (right))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
   +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
      +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
         +- Project [(110 + score#108) AS v#97, name#104]
            +- SortMergeJoin [id#103], [id#106], Inner
               :- Sort [id#103 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
               :     +- Project [id#103, name#104]
               :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
               :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
               +- Sort [id#106 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                     +- Filter isnotnull(id#106)
                        +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

Note: in Spark SQL, the join strategy is chosen according to various conditions; the candidates include BroadcastHashJoin, SortMergeJoin, and ShuffleHashJoin.
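
Which strategy is chosen depends on table statistics, configuration, and hints. For instance, the SortMergeJoin above can usually be turned into a BroadcastHashJoin when one side is small. A sketch, assuming the `spark` session (the threshold value is illustrative):

import org.apache.spark.sql.functions.broadcast

// Tables smaller than the threshold may be broadcast automatically,
// replacing SortMergeJoin with BroadcastHashJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// An explicit broadcast hint, independent of the threshold:
broadcast(spark.table("stu"))
  .join(spark.table("score"), "id")
  .explain()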
