
Spark SQL Internals: How Aggregate Is Implemented

2020-02-03 · 分裂四人组

Classification of aggregate functions
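In Spark's source (the org.apache.spark.sql.catalyst.expressions.aggregate package), aggregate functions fall into two families: DeclarativeAggregate, whose initialize/update/merge logic is written as Catalyst expressions (Sum, Average, Max, ...), and ImperativeAggregate, which mutates an aggregation buffer through method calls (collect_list and friends). A minimal sketch of the imperative contract, with illustrative signatures rather than Spark's exact API:

// A minimal sketch of the ImperativeAggregate contract.
// Signatures are simplified; Spark's real trait operates on InternalRow buffers.
trait ImperativeAggregateSketch {
  def initialize(buffer: Array[Any]): Unit                // reset the aggregation buffer
  def update(buffer: Array[Any], input: Any): Unit        // fold one input row into the buffer
  def merge(buffer: Array[Any], other: Array[Any]): Unit  // combine two partial buffers
  def eval(buffer: Array[Any]): Any                       // produce the final result
}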

Aggregation buffers and aggregation modes

In the current implementation, the way these modes are carved up is not ideal; see the comment in AggregationIterator:

/**
 * The following combinations of AggregationMode are supported:
 * - Partial
 * - PartialMerge (for single distinct)
 * - Partial and PartialMerge (for single distinct)
 * - Final
 * - Complete (for SortAggregate with functions that does not support Partial)
 * - Final and Complete (currently not used)
 *
 * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
 * could have a flag to tell it's final or not.
 */
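In code these modes form the AggregateMode ADT: each mode fixes what an aggregate operator consumes (raw input rows or partial buffers) and what it emits (a buffer or the final value). A sketch mirroring Spark's own sealed trait; the comments paraphrase the semantics rather than quote Spark's source:

sealed trait AggregateMode
case object Partial      extends AggregateMode // input: raw rows        -> output: partial aggregation buffer
case object PartialMerge extends AggregateMode // input: partial buffers -> output: merged aggregation buffer
case object Final        extends AggregateMode // input: partial buffers -> output: final result
case object Complete     extends AggregateMode // input: raw rows        -> output: final result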

planAggregateWithoutDistinct: the aggregation plan without distinct

create temporary view data as select * from values
  (1, 1),
  (1, 2),
  (2, 1),
  (2, 2),
  (3, 1),
  (3, 2)
  as data(a, b);

explain select sum(b) from data group by a;

== Physical Plan ==
*HashAggregate(keys=[a#10], functions=[sum(cast(b#11 as bigint))])
+- Exchange hashpartitioning(a#10, 200)
   +- *HashAggregate(keys=[a#10], functions=[partial_sum(cast(b#11 as bigint))])
      +- LocalTableScan [a#10, b#11]
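Reading the plan bottom-up: the lower HashAggregate runs in Partial mode (partial_sum) inside each input partition, Exchange then shuffles the partial buffers by the grouping key a, and the upper HashAggregate runs in Final mode, merging the buffers into the result. A plain-Scala simulation of the same two-phase flow (the partition layout below is made up for illustration):

// Two-phase aggregation for: select sum(b) from data group by a
val partitions = Seq(
  Seq((1, 1), (1, 2), (2, 1)),  // hypothetical partition 0
  Seq((2, 2), (3, 1), (3, 2)))  // hypothetical partition 1

// Phase 1 (Partial): per-partition partial sums of b, keyed by a
val partial = partitions.map(_.groupBy(_._1).map { case (a, rows) => a -> rows.map(_._2).sum })

// Exchange hashpartitioning(a) + Phase 2 (Final): merge partial sums per key
val merged = partial.flatten.groupBy(_._1).map { case (a, sums) => a -> sums.map(_._2).sum }
// merged == Map(1 -> 3, 2 -> 3, 3 -> 3), matching sum(b) group by a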

planAggregateWithOneDistinct: the aggregation plan with a single distinct

create temporary view data as select * from values
  (1, 1),
  (1, 2),
  (2, 1),
  (2, 2),
  (3, 1),
  (3, 2)
  as data(a, b);

explain select sum(b), sum(distinct b) from data group by a;

== Physical Plan ==
*HashAggregate(keys=[a#10], functions=[sum(cast(b#11 as bigint)), sum(distinct cast(b#11 as bigint)#94L)])
+- Exchange hashpartitioning(a#10, 200)
   +- *HashAggregate(keys=[a#10], functions=[merge_sum(cast(b#11 as bigint)), partial_sum(distinct cast(b#11 as bigint)#94L)]) // step3
      +- *HashAggregate(keys=[a#10, cast(b#11 as bigint)#94L], functions=[merge_sum(cast(b#11 as bigint))]) // step2
         +- Exchange hashpartitioning(a#10, cast(b#11 as bigint)#94L, 200)
            +- *HashAggregate(keys=[a#10, cast(b#11 as bigint) AS cast(b#11 as bigint)#94L], functions=[partial_sum(cast(b#11 as bigint))]) // step1
               +- LocalTableScan [a#10, b#11]
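The key idea: step 1 groups on (a, cast(b as bigint)), which makes b distinct within each group while also carrying partial_sum(b) for the non-distinct aggregate; step 2 merges those buffers after the shuffle on (a, b); step 3 regroups on a alone, continuing to merge the regular sum (merge_sum) while starting a partial sum over the now-distinct b values, and the topmost HashAggregate finishes both. A plain-Scala simulation of the same idea (the data below adds one duplicate row to the view above so the two sums actually differ):

// select sum(b), sum(distinct b) from data group by a
// Extra (1, 2) row added for illustration so that sum(b) != sum(distinct b).
val rows = Seq((1, 1), (1, 2), (1, 2), (2, 1), (2, 2))

// Steps 1-2: group by (a, b); the key dedupes b per a, while each group
// carries the partial sum of the non-distinct aggregate
val byAB = rows.groupBy(identity).map { case ((a, b), g) => (a, b, g.map(_._2).sum) }

// Step 3 + final: regroup by a; merge the carried sums for sum(b),
// and sum each now-distinct b once for sum(distinct b)
val result = byAB.groupBy(_._1).map { case (a, g) =>
  (a, g.map(_._3).sum, g.map(_._2).sum)  // (a, sum(b), sum(distinct b))
}
// result contains (1, 5, 3) and (2, 3, 3)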
