Spark Shuffle 模块② - Hash Based S

2016-09-21  本文已影响279人  牛肉圆粉不加葱

Spark 2.0 中已经移除 Hash Based Shuffle,但作为曾经的默认 Shuffle 机制,还是值得进行分析

Spark 最开始只有 Hash Based Shuffle,因为在很多场景中并不需要排序,在这些场景中多余的排序反而会损耗性能。

Hash Based Shuffle Write

该过程实现的核心是在 HashShuffleWriter#write(records: Iterator[Product2[K, V]]): Unit 其主要流程如下:

该函数的输入是一个 Shuffle Map Task 计算得到的结果(对应的迭代器),若在宽依赖中定义了 map 端的聚合则会先进行聚合,随后对于迭代器(若要聚合则为聚合后的迭代器)的每一项先通过计算 key 的 hash 值来确定要写到哪个文件,然后将 key、value 写入文件。

写入的文件名的格式是:shuffle_$shuffleId_$mapId_$reduceId。写入时,若文件已存在会删除会创建新文件。

上图描述了如何处理一个 Shuffle Map Task 计算结果,在实际应用中,往往有很多 Shuffle Map Tasks 及下游 tasks,即如下情况(图摘自:JerryLead/SparkInternals-Shuffle 过程):

存在的问题

这种简单的实现会有几个问题,为说明方便,这里设 M = Shuffle Map Task 数量R = 下游 tasks 数量

改进:Shuffle Consolidate Writer

在上面提到的几个问题,Spark 提供了 Shuffle Consolidate Files 机制进行优化。该机制的手段是减少 Shuffle 过程产生的文件,若使用这个功能,则需要置 spark.shuffle.consolidateFilestrue,其实现可用下图来表示(图摘自:JerryLead/SparkInternals-Shuffle 过程

即:对于运行在同一个 core 的 Shuffle Map Tasks,对于将要被同一个 reducer read 的数据,第一个 Shuffle Map Task 会创建一个文件,之后的就会将数据追加到这个文件而不是新建一个文件(相当于同一个 core 上的 Shuffle Map Task 写了文件不同的部分)。因此文件数就从原来的 M * R 个变成了 cores * R 个。当 M / cores 的值越大,减少文件数的效果越显著。需要注意的是,该机制虽然在很多时候能缓解上述的几个问题,但是并不能彻底解决。

参考


欢迎关注我的微信公众号:FunnyBigData

FunnyBigData
上一篇下一篇

猜你喜欢

热点阅读