sparksql窗口函数原理

2020-02-24  本文已影响0人  曾二爷耶

一、窗口函数是啥

在单表数据操作中,一般有下面两种操作范式:

  1. 针对单条数据的映射操作,例如每条数据加一的时候。
  2. 将数据分组后的聚合操作,例如进行分组统计的时候。

在第一种范式中有这样一种情况,当你要生成某条目标数据的时候你需要用到前后N条数据参与计算,例如当你需要基于每天的用户访问数,来计算七天的移动平均访问数,就需要按照时间排序,每一条数据的计算都需要前面6条数据一起参与计算。


二、窗口函数的使用范式

一般窗口函数都是这样用的

SELECT window_func(args)
OVER (
  [PARTITION BY col_name, col_name, ...] 
  [ORDER BY col_name, col_name, ...]
  [ROWS | RANGE BETWEEN (CURRENT ROW | (UNBOUNDED |[num]) PRECEDING) AND (CURRENT ROW | ( UNBOUNDED | [num]) FOLLOWING)]
)

上面是sql的语法,相信大家比较难看懂
举个例子:我们常用的row_number()来说

select row_number() 
over(
  partition by col1
  order by col2
) 
from table

上面的语句主要分两部分

  1. window函数部分(window_func)
  2. 窗口定义部分

2.1 window函数部分

windows函数部分就是所要在窗口上执行的函数,spark支持三中类型的窗口函数:

  1. 聚合函数 (aggregate functions)
  2. 排序函数(Ranking functions)
  3. 分析窗口函数(Analytic functions)

第一种都比较熟悉就是常用的count 、sum、avg等
第二种就是row_number、rank这样的排序函数
第三种专门为窗口而生的函数比如:cume_dist函数计算当前值在窗口中的百分位数

2.2 窗口定义部分

这部分就是over里面的内容了
里面也有三部分

  1. partition by
  2. order by
  3. ROWS | RANGE BETWEEN

前两部分就是把数据分桶然后桶内排序,排好了序才能很好的定位出你需要向前或者向后取哪些数据来参与计算。
这第三部分就是确定你需要哪些数据了。

spark提供了两种方式一种是ROWS BETWEEN也就是按照距离来取
例如

  1. ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    就是取从最开始到当前这一条数据,row_number()这个函数就是这样取的
  2. ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING代表取前面两条和后面两条数据参与计算,比如计算前后五天内的移动平均就可以这样算.

还有一种方式是
RANGE BETWEEN 这种就是以当前值为锚点进行计算。
比如RANGE BETWEEN 20 PRECEDING AND 10 FOLLOWING
当前值为50的话就去前后的值在30到60之间的数据。

三、windows实现原理

我们从最终的执行层面来看看数据是怎么流转的

3.1 执行层面

例子:

df = spark.range(10).selectExpr("id","id%3 as flag")
df.selectExpr("""sum(id)
over(
  partition by flag 
  order by id 
  ROWS BETWEEN 1 PRECEDING and  1 FOLLOWING
)  as s""").explain()

我们看看它的执行计划是咋样的?

== Physical Plan ==
*(3) Project [x#266L]
+- Window [sum(id#261L) windowspecdefinition(flag#263L, id#261L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS x#266L], [flag#263L], [id#261L ASC NULLS FIRST]
   +- *(2) Sort [flag#263L ASC NULLS FIRST, id#261L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(flag#263L, 60)
         +- *(1) Project [id#261L, (id#261L % 3) AS flag#263L]
            +- *(1) Range (0, 10, step=1, splits=60)

可以看出是先按照partitionby的字段进行了重分区,把桶内的数据都聚集到一起。然后再进行排序。最后执行window函数。

四、总结

这篇文章介绍了窗口函数的使用以及简单的数据流转原理。
如果你是spark的设计者,你会怎么来设计窗口函数的实现呢,留言告诉我,我可能会在下期介绍下spark源码是如何来实现它的。

上一篇 下一篇

猜你喜欢

热点阅读