Spark之SortShuffle原理参考
- refer1: https://www.cnblogs.com/itboys/p/9201750.html
- refer2: https://www.cnblogs.com/jcchoiling/p/6431969.html
- refer3: https://www.jianshu.com/p/b4096078a48c?from=timeline&isappinstalled=0
- refer4: https://blog.csdn.net/qq_37142346/article/details/81875249
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台
回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化以此为:
- Basic Hash Shuffle:M * R;
- Consalidate 方式的Hash Shuffle:Core * R;
- Sort-based Shuffle:M*2(一个数据文件,一个索引文件);
Spark支持Hash Shuffle和Sort Shuffle,早期版本使用Hash Shuffle(包括优化后的Hash Shuffle)。Spark1.2起默认使用Sort Shuffle,并且Sort Shuffle在map端有三种实现,分别是UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter,根据运行时信息自动选择对应的实现。
简单介绍三种ShuffleWriter实现
-
BypassMergeSortShuffleWriter: 和Hash Shuffle实现基本相同,区别在于map task输出会汇总为一个文件,同时生成一个索引文件,即M*2个文件;
-
UnsafeShuffleWriter: tungsten-sort,ShuffleExternalSorter使用Java Unsafe直接操作内存,避免Java对象多余的开销和GC 延迟,效率高;
-
SortShuffleWriter: Sort Shuffle和Hash Shuffle的主要不同在于,map端支持Partition级别的sort,map task输出会汇总为一个文件,同时生成一个索引文件,即M*2个文件;
补充说明:上面SortShuffleWriter中提到的Partition,不是RDD中的Partition,而是类似Spark Shuffle之Hash Shuffle中的bucket,如果没有单独说明,Sort Shuffle相关文章中的Partition均为bucket,和源码中的变量名保持一致。
源码中选择哪种ShuffleWriter的逻辑参考SortShuffleManager类
-
没有map端聚合操作,且RDD的Partition数小于200,使用BypassMergeSortShuffleWriter。
-
没有map端聚合操作,RDD的Partition数小于16777216,且Serializer支持relocation,使用UnsafeShuffleWriter。
-
上述条件都不满足,使用SortShuffleWriter。
其中Serializer支持relocation
上面提到UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation是指,Serializer可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。Serializer的这个属性会在UnsafeShuffleWriter进行排序时用到,具体参考Introduce internal Serializer API for determining if serializers support object relocation #5924。支持relocation的Serializer是KryoSerializer,Spark默认使用JavaSerializer,可以通过参数spark.serializer设置。
其中Sort Shuffle设置:上述三种ShuffleWriter实现均由SortShuffleManager管理
关于Sorted-Based Shuffle需要明白几点:
一、Spark中的Sorted-Based Shuffle产出的结果是并没有排序的,也就是说Shuffle的Reduce阶段是没有进行排序操作的,这点和MR不一样。
二、Spark中的Sorted-Based Shuffle只是中间结果排序,也就是说Shuffle的Mapper阶段在将bucket缓存Spill到磁盘的时候进行了排序操作,生成了FileSegment,其中涉及到一个排序算法TimSort。合并FileSegment的为一个文件的同时,生成一个索引文件。
三、排序操作相当于合并相同的Key,聚合数据在一起,便于后续Reducer阶段读取相应的数据。
Sorted-Based Shuffle 的核心是借助于 ExternalSorter 把每个 ShuffleMapTask 的输出,排序到一个文件中 (FileSegmentGroup),为了区分下一个阶段 Reducer Task 不同的内容,它还需要有一个索引文件 (Index) 来告诉下游 Stage 的并行任务,那一部份是属于你的。
imageShuffle Map Task 在ExternalSorter 溢出到磁盘的时候,产生一组 File#(File Group是hashShuffle中的概念,理解为一个file文件池,这里为区分,使用File的概念,FileSegment根据PartionID排序) 和 一个索引文件,File 里的 FileSegement 会进行排序,在 Reducer 端有4个Reducer Task,下游的 Task 可以很容易跟据索引 (index) 定位到这个 Fie 中的哪部份 FileSegement 是属于下游的,它相当于一个指针,下游的 Task 要向 Driver 去碓定文件在那里,然后到了这个 File 文件所在的地方,实际上会跟 BlockManager 进行沟通,BlockManager 首先会读一个 Index 文件,根据它的命名则进行解析,比如说下一个阶段的第一个 Task,一般就是抓取第一个 Segment,这是一个指针定位的过程。
补充说明:Sort-Based Shuffle 最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容划分成不同 FileSegment 构成的单一文件 File,另外一个是索引文件 Index。
重要提示:在Sorted-Shuffle中会排序吗?Sort-Based Shuffle的Mapper端在 Sort and Spill 的过程中会排序操作,而且是Spill到磁盘的时候再进行排序的。但在Reducer阶段的ApependOnlyMap过程不进行排序的。
Spark早期版本采用的是AppendOnlyMap来实现shuffle reduce阶段数据的聚合,当数据量不大时没什么问题,但当数据量很大时就会占用大量内存,最后可能OOM。所以从spark 0.9开始就引入了ExternalAppendOnlyMap来代替AppendOnlyMap。