SortShuffle之UnsafeShuffleWriter

2019-01-07  本文已影响10人  liuzx32

UnsafeShuffleWriter 对应SortShuffle的tungsten-sort方式

#实现方式参考图:

tungsten-sort

#UnsafeShuffleWriter内部使用了和BytesToBytesMap基本相同的数据结构处理map端的输出,不过将其细化为ShuffleExternalSorter和ShuffleInMemorySorter两部分,功能如下

ShuffleExternalSorter 使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair

ShuffleInMemorySorter 使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes

#排序

写文件或溢写前(spill到disk前),根据数据的PartitionId信息,使用TimSort算法对ShuffleInMemorySorter的long数组排序,排序的结果为,PartitionId相同的聚集在一起,且PartitionId较小的排在前面,ShuffleExternalSorter中的数据不需要处理,如下图所示:

溢出到磁盘时排序

#写文件

依次读取ShuffleInMemorySorter中long数组的元素,再根据page number和offset信息去ShuffleExternalSorter中读取K-V Pair写入文件,如下图所示:

写出到磁盘:数据文件和索引文件

#溢写 & 合并

内存不足时,溢写数据到磁盘,每次溢写会生成上图中的一个dataFile,如果多次溢写产生多个dataFile,会在map端数据处理结束后进行merge合并为一个dataFile,如下图所示:

merge fileSegment为一个File

至此,UnsafeShuffleWriter的实现就介绍完了。

#优势

SPARK-7081中简述了UnsafeShuffleManager的优势,如下介绍:

1. ShuffleExternalSorter使用UnSafe API操作序列化数据,而不是Java对象,减少了内存占用及因此导致的GC耗时(参考Spark 内存管理之Tungsten),这个优化需要Serializer支持relocation。

2. ShuffleExternalSorter存原始数据,ShuffleInMemorySorter使用压缩指针存储元数据,每条记录仅占8 bytes,并且排序时不需要处理原始数据,效率高。

3. 溢写 & 合并这一步操作的是同一Partition的数据,因为使用UnSafe API直接操作序列化数据,合并时不需要反序列化数据。

4. 溢写 & 合并可以使用fastMerge提升效率(调用NIO的transferTo方法),设置spark.shuffle.unsafe.fastMergeEnabled为true,并且如果使用了压缩,需要压缩算法支持SerializedStreams的连接,各默认值如下

#使用

Spark Shuffle之Sort Shuffle中讨论了使用UnsafeShuffleWriter需满足的前提条件,如下

#接下来分析下为什么要满足这三个要求

1. map-side aggregation:从上面的实现也可以看出,UnsafeShuffleWriter不是类似HashMap的数据结构,无法聚合key对应的value,所以无法支持map端的aggregation。

2. Partition数小于16777216:参考第一幅图,存储PartitionId信息使用24bit,能表示的最大值为 (1 << 24) = 16777215,因此Partition数要小于16777216。

3. Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。

#总结

本文介绍tungsten-sort(UnsafeShuffleWriter)的实现、优势及何种情况下被Spark使用。

#

上一篇 下一篇

猜你喜欢

热点阅读