Spark之SortShuffle原理参考

2019-03-28  本文已影响0人  liuzx32


Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台

回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化以此为:

  1. Basic Hash Shuffle:M * R;
  2. Consalidate 方式的Hash Shuffle:Core * R;
  3. Sort-based Shuffle:M*2(一个数据文件,一个索引文件);

Spark支持Hash Shuffle和Sort Shuffle,早期版本使用Hash Shuffle(包括优化后的Hash Shuffle)。Spark1.2起默认使用Sort Shuffle,并且Sort Shuffle在map端有三种实现,分别是UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter,根据运行时信息自动选择对应的实现。

简单介绍三种ShuffleWriter实现

  1. BypassMergeSortShuffleWriter: 和Hash Shuffle实现基本相同,区别在于map task输出会汇总为一个文件,同时生成一个索引文件,即M*2个文件;

  2. UnsafeShuffleWriter: tungsten-sort,ShuffleExternalSorter使用Java Unsafe直接操作内存,避免Java对象多余的开销和GC 延迟,效率高;

  3. SortShuffleWriter: Sort Shuffle和Hash Shuffle的主要不同在于,map端支持Partition级别的sort,map task输出会汇总为一个文件,同时生成一个索引文件,即M*2个文件;

补充说明:上面SortShuffleWriter中提到的Partition,不是RDD中的Partition,而是类似Spark Shuffle之Hash Shuffle中的bucket,如果没有单独说明,Sort Shuffle相关文章中的Partition均为bucket,和源码中的变量名保持一致。

源码中选择哪种ShuffleWriter的逻辑参考SortShuffleManager类

  1. 没有map端聚合操作,且RDD的Partition数小于200,使用BypassMergeSortShuffleWriter。

  2. 没有map端聚合操作,RDD的Partition数小于16777216,且Serializer支持relocation,使用UnsafeShuffleWriter。

  3. 上述条件都不满足,使用SortShuffleWriter。

其中Serializer支持relocation

上面提到UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation是指,Serializer可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。Serializer的这个属性会在UnsafeShuffleWriter进行排序时用到,具体参考Introduce internal Serializer API for determining if serializers support object relocation #5924。支持relocation的Serializer是KryoSerializer,Spark默认使用JavaSerializer,可以通过参数spark.serializer设置。

其中Sort Shuffle设置:上述三种ShuffleWriter实现均由SortShuffleManager管理

关于Sorted-Based Shuffle需要明白几点:

一、Spark中的Sorted-Based Shuffle产出的结果是并没有排序的,也就是说Shuffle的Reduce阶段是没有进行排序操作的,这点和MR不一样。
二、Spark中的Sorted-Based Shuffle只是中间结果排序,也就是说Shuffle的Mapper阶段在将bucket缓存Spill到磁盘的时候进行了排序操作,生成了FileSegment,其中涉及到一个排序算法TimSort。合并FileSegment的为一个文件的同时,生成一个索引文件。
三、排序操作相当于合并相同的Key,聚合数据在一起,便于后续Reducer阶段读取相应的数据。

Sorted-Based Shuffle 的核心是借助于 ExternalSorter 把每个 ShuffleMapTask 的输出,排序到一个文件中 (FileSegmentGroup),为了区分下一个阶段 Reducer Task 不同的内容,它还需要有一个索引文件 (Index) 来告诉下游 Stage 的并行任务,那一部份是属于你的。

image

Shuffle Map Task 在ExternalSorter 溢出到磁盘的时候,产生一组 File#(File Group是hashShuffle中的概念,理解为一个file文件池,这里为区分,使用File的概念,FileSegment根据PartionID排序) 和 一个索引文件,File 里的 FileSegement 会进行排序,在 Reducer 端有4个Reducer Task,下游的 Task 可以很容易跟据索引 (index) 定位到这个 Fie 中的哪部份 FileSegement 是属于下游的,它相当于一个指针,下游的 Task 要向 Driver 去碓定文件在那里,然后到了这个 File 文件所在的地方,实际上会跟 BlockManager 进行沟通,BlockManager 首先会读一个 Index 文件,根据它的命名则进行解析,比如说下一个阶段的第一个 Task,一般就是抓取第一个 Segment,这是一个指针定位的过程。

补充说明:Sort-Based Shuffle 最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容划分成不同 FileSegment 构成的单一文件 File,另外一个是索引文件 Index。

重要提示:在Sorted-Shuffle中会排序吗?Sort-Based Shuffle的Mapper端在 Sort and Spill 的过程中会排序操作,而且是Spill到磁盘的时候再进行排序的。但在Reducer阶段的ApependOnlyMap过程不进行排序的。

Spark早期版本采用的是AppendOnlyMap来实现shuffle reduce阶段数据的聚合,当数据量不大时没什么问题,但当数据量很大时就会占用大量内存,最后可能OOM。所以从spark 0.9开始就引入了ExternalAppendOnlyMap来代替AppendOnlyMap。

上一篇下一篇

猜你喜欢

热点阅读