大数据,机器学习,人工智能大数据

ShuffleManager 原理

2019-06-11  本文已影响10人  stone_zhu

在 Spark 的源码中,负责 shuffle 过程的执行、计算、处理的组件主要是 ShuffleManager。

在 Spark 1.2 以前,默认的 shuffle 计算引擎是 HashShuffleManager。该 ShuffleMananger 有一个非常严重的弊端,就是会产生大量的磁盘文件,进而有大量的磁盘 IO 操作,比较影响性能。

因此在 Spark 1.2 之后,默认的 ShuffleManager 改成了 SortShuffleManager。SortShuffleManager 相对来说,有了一定的改进。主要就在于,每个 Task 在 Shuffle Write 操作时,虽然也会产生较大的磁盘文件,但最后会将所有的临时文件合并 (merge) 成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取自己数据的时候,只要根据索引拉取每个磁盘文件中的部分数据即可。

一,HashShuffleManager 运行原理

普通模式下,在 Shuffle Write 阶段,每个 Task 将数据按照 Key 进行 Hash 计算,然后按照计算结果,将相同的 Key 对应的数据写入内存缓冲区,当内存缓冲区写满之后会直接溢写到磁盘文件。这里需要写多少个磁盘文件,和下一个 stage 的 Shuffle Read Task 的数量一致。

然后,Shuffle Read 阶段的每个 Task 会拉取 Shuffle Write 阶段所有相同 Key 的文件,一遍拉取一遍聚合。每个 Shuffle Read 阶段的 Task 都有自己的缓冲区,每次只能拉取与缓冲区大小一致的数据,然后通过内存中的 Map 进行聚合等操作,聚合完一批再取下一批数据。

比如,当前 Stage 有 5 个 Executor,每个 Executor 分配一个 cpu core,有 50 个 task,每个 Executor 执行 10 个 task;下一个 stage 有100 个 task。那么在 Shuffle Write 阶段每个 task 要创建 100 个磁盘文件,每个 Executor 进程要创建 1000 个文件,一共要创建 1000 * 5 = 5000 个磁盘文件,数量很多。

具体执行原理图如下图所示:

image

针对 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles。这个参数的值默认是 fasle,如果设置成 true 之后就会开启优化机制。

当开启这个参数之后,在 Shuffle Write 阶段写文件的时候会复用文件,每个 task 不会为 Shuffle Read 阶段的 task 都创建一份文件。此时会出现一个 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量和 Shuffle Read 阶段的 task 数量一致。每个 Executor 上有多少个 cpu core 就会并行执行几个 task,每个 task 会创建一个 shuffleFileGroup,然后后续并行执行的 task 会复用前面生成的这个 shuffleFileGroup。

比如,当前 stage 有 5 个 Executor,每个 Executor 分配 3 个 cpu core,一共有 50 个 task,每个 Executor 执行 10 个 task,Shuffle Read 阶段有 100 个 task。那么此时,每个 Executor 进程会创建 3 * 100 个文件,一共会创建 5 * 3 * 100 个文件。

具体原理如图示:

image

二,SortShuffleManager 运行原理

SortShuffleManager 运行机制有两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数值时 (默认是 200 ) ,就会启用 bypass 机制。

1,普通机制

在该模式下,Shuffle Write 阶段会将数据写入一个内存的数据结构中,此时根据不同的算子会有不同的数据结构。比如是 reduceByKey 这种聚合类的 shuffle 算子,会选用 Map 数据结构,一遍用 Map 进行聚合(HashShuffleManager 聚合操作是放在 Shuffle Read 阶段),一遍写入内存;如果是 join 相关的普通 shuffle 算子的话,会用 Array 数据结构,直接写入内存。当内存达到临界阈值之后,会将内存中的数据进行排序,然后分批次写入磁盘 (默认每批次有 1W 条数据),在写入磁盘的时候不会像 HashShuffleManager 那样直接写入磁盘,这里会先写入内存缓冲流,当缓冲流满溢之后一次性写入磁盘。

此时也会生成大批量的文件,最后会将之前所有的临时磁盘文件进行合并,这就是 merge 过程 (就是将所有的临时磁盘文件中的数据读取出来,然后依次写入最终的文件中)。每个 task 最终会生成一份磁盘文件和一份索引文件,索引文件中标示了下游每个 task 的数据在文件中的 start offset 和 end offset。

比如,当前 stage 有 5 个 Executor,每个 Executor 分配 1 个 cpu core,共有 50 个 task,每个 Executor 执行 10 个 task;下一个 stage 有 100 个 task。那么每个 Executor 创建 10 个磁盘文件,一共有 50 个磁盘文件。

具体如下图所示:

image

2,bypass 机制

触发该机制的条件:

1,shuffle reduce 端的 task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数值的时候;

2,不是聚合类的shuffle算子(比如reduceByKey);

该机制下,当前 stage 的每个 task 会将数据的 key 进行 hash,然后将相同 hash 的 key 锁对应的数据写入到同一个内存缓冲区,缓冲写满后会溢写到磁盘文件,这里和 HashShuffleManager一致。

然后会进入 merge 阶段,将所有的磁盘文件合并成一个磁盘文件,并创建一个索引文件。

相比较于普通机制,这里有两个地方不同:

1,将数据写入内存时候,普通模式是将数据写入 Map 或者 Array 这样的内存数据结构中,这里是根据 key 的 Hash 值直接写入内存;

2,该模式下在写入磁盘之前不会排序;

3,磁盘写机制不同。

具体如图示:

image

三,shuffle 相关的参数

spark.shuffle.file.buffer

spark.reducer.maxSizeInFlight

spark.shuffle.io.maxRetries

spark.shuffle.io.retryWait

spark.shuffle.memoryFraction

spark.shuffle.manager

spark.shuffle.sort.bypassMergeThreshold

spark.shuffle.consolidateFiles

上一篇 下一篇

猜你喜欢

热点阅读