Shuffle操作
http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
Spark中的某些操作会触发称为shuffle的事件。 shuffle是Spark的重新分配数据的机制,因此它可以跨分区进行不同的分组。 这通常涉及跨执行程序和机器复制数据,使得shuffle成为复杂且昂贵的操作。
为了理解在shuffle期间发生的事情,我们可以考虑reduceByKey操作的示例。 reduceByKey操作生成一个新的RDD(重新分区),其中单个键的所有值都组合成一个元组,是 key 和 对与该key关联的所有value执行reduce函数的结果。 挑战在于,并非单个key的所有值都必须位于同一个分区,甚至是同一个机器上,但它们必须位于同一位置才能计算结果。
背景:
在Spark中,数据通常不跨分区分布,以便在特定操作的必要位置。 在计算过程中,单个任务将在单个分区上运行 - 因此,要组织单个reduceByKey reduce任务执行的所有数据,Spark需要执行全部操作。 它必须从所有分区读取以查找所有键的所有值,然后将各个值组合在一起以计算每个键的最终结果 - 这称为shuffle。
尽管新shuffle数据的每个分区中的元素集将是确定性的,并且分区本身的排序也是如此,但这些元素的排序不是。 如果在shuffle后需要可预测的有序数据,则可以使用:
mapPartitions使用例如.sorted对每个分区进行排序;
repartitionAndSortWithinPartitions在同时重新分区的同时有效地对分区进行排序;
sortBy来创建一个全局排序的RDD
可以导致shuffle的操作包括重新分区操作,例如重新分区和合并,“ByKey操作(计数除外)”,如groupByKey和reduceByKey,以及联合操作,如 cogroup and join.
性能影响:
Shuffle是一项昂贵的操作,因为它涉及磁盘I / O,数据序列化和网络I / O. 为了组织shuffle的数据,
Spark生成多组任务 -map tasks以组织数据,以及一组 reduce tasks来聚合它。 这个术语来自MapReduce,并不直接与Spark的map和reduce操作有关。
在内部,各个map任务的结果会保留在内存中,直到它们无法fit。 然后,这些基于目标分区进行排序并写入单个文件。 在reduce方面,tasks 读取相关的排序的块。
某些shuffle操作会消耗大量的堆内存,因为它们使用内存中的数据结构来在传输记录之前或之后组织记录。 具体来说,reduceByKey和aggregateByKey在map侧创建这些结构,并且'ByKey操作在reduce侧生成这些结构。 当数据不适合内存时,Spark会将这些表溢出到磁盘,从而导致磁盘I / O的额外开销和垃圾收集增加。
Shuffle还会在磁盘上生成大量中间文件。 从Spark 1.3开始,这些文件将被保留,直到不再使用相应的RDD并进行垃圾回收。 这样做是为了在重新计算 lineage 时不需要重新创建shuffle文件。 如果应用程序保留对这些RDD的引用或者GC不经常启动,则垃圾收集可能仅在很长一段时间后才会发生。 这意味着长时间运行的Spark作业可能会占用大量磁盘空间。
配置Spark上下文时,spark.local.dir配置参数指定临时存储目录。
可以通过调整各种配置参数来调整shuffle行为。See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.