aggregate和aggregateByKey算子理解
一.aggregate和aggregateByKey参数
aggregate和aggregateByKey的参数是一样的,作用也一样,只不过aggregateByKey多了key而已。
def aggregate[U: ClassTag](zeroValue:U)(seqOp: (U, T) =>U, combOp: (U, U) =>U)
zeroValue:U ---> 初始值
(U, T) =>U ----> 相同partition中值的合并逻辑
(U, U) =>U ----> partition之间的结果合并逻辑
def aggregateByKey[U: ClassTag](zeroValue:U)(seqOp: (U, V) =>U, combOp: (U, U) =>U)
zeroValue:U ---> 初始值
(U, T) =>U ----> 相同partition中值的合并逻辑
(U, U) =>U ----> partition之间的结果合并逻辑
二.通过demo来展示各个参数的作用
本着理论吹的烟雪起闻者听之如放屁中心思想,我们通过简单的实例来讲解各个参数的作用。
1.aggregate
// 创建一份数据,并设置两个partition
// partition_1 (1,2,3) | partition_2 (4,5,6)
val data = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// 初始值为0,每个partition内部聚合函数为 取最大值,partition之间的聚合函数为相加
val i: Int = data.aggregate(0)(math.max(_, _), _ + _)
// 打印结果 9
println(i)
现在我们分析一下计算过程:
1.首先我们有2个partition (1,2,3)和(4,5,6),以(1,2,3) 为例,触发math.max函数
max(0,1) => 1 初始值和第一个元素取最大值 得到 1
max(1,2) => 2 同上
max(2,3) => 3 同上
partition_1 (1,2,3) 得到的聚合结果为3
同样的计算逻辑我们得到
partition_2 (4,5,6) 得到的聚合结果为6
2.在计算partition之间的结果,触发 (_ + _) 函数
由上步可以得到 partition_1 ----- 3
由上步可以得到 partition_2 ----- 6
(0 + 3) = 3
(3 + 6) = 9
刚才的例子中我们有两个partition,假如我们有三个partition的话结果就会不同,
partition内部
p_1(1,2) --->2
p_2(3,4) ---->4
p_3(5,6) ---->6
partition之间
(0+2),
(2+4),
(6+6) =12
不信的小伙伴可以去试试,赌包辣条,就是这样,喵~~~~
2.aggregateByKey
运算逻辑和aggregate相似这里只展示测试代码和结果
val data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)),2)
val value = data.aggregateByKey(0)(math.max(_, _), _ + _)
//运行结果k_v => (2,3) ,(1,7) 可以尝试不同的分区对结果的影响,加深理解。