
Learning Spark [3] - Catalyst Optimizer

2021-01-15  屹然1ran

The Catalyst Optimizer is a core component of Spark SQL: it converts a computational query into an execution plan. It goes through four phases:

  1. Analysis
  2. Logical optimization
  3. Physical planning
  4. Code generation

Example:

M&Ms example
The two code snippets below, written in different languages, produce the same underlying execution plan, so no matter which language you use, your query and computation go through the same process.

# In Python 
from pyspark.sql.functions import count

count_mnm_df = (mnm_df
  .select("State", "Color", "Count")
  .groupBy("State", "Color")
  .agg(count("Count").alias("Total"))
  .orderBy("Total", ascending=False))
-- In SQL 
SELECT State, Color, Count, sum(Count) AS Total 
FROM MNM_TABLE_NAME 
GROUP BY State, Color, Count 
ORDER BY Total DESC
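
For completeness, here is a minimal sketch of how the SQL variant can be run against the same data. The CSV path, the SparkSession setup, and the temporary-view registration are illustrative assumptions, not part of the original example.

# In Python (setup sketch; the file path is a placeholder)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mnm-catalyst").getOrCreate()

# Load the M&Ms dataset (path is an assumption for illustration)
mnm_df = (spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/mnm_dataset.csv"))

# Register a temporary view so the SQL query above can reference it
mnm_df.createOrReplaceTempView("MNM_TABLE_NAME")

count_mnm_sql_df = spark.sql("""
  SELECT State, Color, Count, sum(Count) AS Total
  FROM MNM_TABLE_NAME
  GROUP BY State, Color, Count
  ORDER BY Total DESC""")

# Both DataFrames go through the same Catalyst pipeline
count_mnm_sql_df.explain(True)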

Calling count_mnm_df.explain(True) shows the detailed stages the Python code goes through. (We will discuss this part in more depth later, when covering debugging.)

count_mnm_df.explain(True)
== Parsed Logical Plan == 
'Sort ['Total DESC NULLS LAST], true 
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]   
  +- Project [State#10, Color#11, Count#12]      
    +- Relation[State#10,Color#11,Count#12] csv
== Analyzed Logical Plan == 
State: string, Color: string, Total: bigint 
Sort [Total#24L DESC NULLS LAST], true 
  +- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]   
    +- Project [State#10, Color#11, Count#12]      
      +- Relation[State#10,Color#11,Count#12] csv
== Optimized Logical Plan == 
Sort [Total#24L DESC NULLS LAST], true 
  +- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]   
    +- Relation[State#10,Color#11,Count#12] csv
== Physical Plan == 
*(3) Sort [Total#24L DESC NULLS LAST], true, 0 
  +- Exchange rangepartitioning(Total#24L DESC NULLS LAST, 200)   
    +- *(2) HashAggregate(keys=[State#10, Color#11], functions=[count(Count#12)], output=[State#10, Color#11, Total#24L])      
      +- Exchange hashpartitioning(State#10, Color#11, 200)         
        +- *(1) HashAggregate(keys=[State#10, Color#11], functions=[partial_count(Count#12)], output=[State#10, Color#11, count#29L])            
          +- *(1) FileScan csv [State#10,Color#11,Count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jules/gits/LearningSpark2.0/chapter2/py/src/... dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<State:string,Color:string,Count:int> 
The Four Phases

Phase 1: Analysis

Spark SQL begins by generating an abstract syntax tree (AST) for the SQL or DataFrame query. In this phase, any column or table names are resolved by consulting an internal Catalog, a programmatic interface to Spark SQL that holds the lists of column names, data types, functions, tables, databases, and so on. Once all of these have been successfully resolved, the query proceeds to the next phase.
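
The Catalog mentioned above can also be inspected directly. A minimal sketch, assuming the MNM_TABLE_NAME temporary view registered earlier:

# In Python
spark.catalog.listDatabases()                 # registered databases
spark.catalog.listTables()                    # tables and temporary views
spark.catalog.listColumns("MNM_TABLE_NAME")   # column names and data types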

Phase 2: Logical Optimization

Logical optimization consists of two internal stages. Applying a standard set of rule-based optimizations, the Catalyst Optimizer first constructs a set of candidate plans, and then a cost-based optimizer (CBO) assigns a cost to each of them. These plans are laid out as operator trees.
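
A small sketch of rule-based optimization in action, reusing mnm_df from above (the filter expression is an illustrative assumption): a filter written after a projection, containing a constant arithmetic expression, typically shows up in the Optimized Logical Plan with the constant folded and the predicate moved closer to the data source.

# In Python
from pyspark.sql.functions import expr

filtered_df = (mnm_df
  .select("State", "Color", "Count")
  .filter(expr("Count > 10 + 15")))   # written after the projection

filtered_df.explain(True)
# In the Optimized Logical Plan, the constant is folded to 25 and the
# filter is placed below the projection, next to the csv relation.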

Phase 3: Physical Planning

In this phase, Spark SQL generates an optimal physical plan for the selected logical plan, using physical operators that match those available in the Spark execution engine.
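
To look at just the chosen physical operators, explain also accepts a mode argument (a sketch, assuming Spark 3.0 or later):

# In Python (Spark 3.0+)
count_mnm_df.explain(mode="formatted")
# Prints the physical operator tree (FileScan, HashAggregate, Exchange,
# Sort, ...) followed by the details of each numbered operator.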

Phase 4: Code Generation

The final phase generates efficient Java bytecode to run on each machine. Because Spark SQL can operate on data sets loaded in memory, Spark uses state-of-the-art compiler technology to generate this code and speed up execution. In short, it acts like a compiler.
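
The whole-stage code generation subtrees are the ones marked *(1), *(2), *(3) in the physical plan above. To print the generated code itself (a sketch, assuming Spark 3.0 or later for the mode argument):

# In Python (Spark 3.0+)
count_mnm_df.explain(mode="codegen")
# Prints the Java code generated for each whole-stage-codegen subtree.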

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly)
