Spark writer

2019-07-28  本文已影响0人  clive0x

对UnsafeShuffleWriter可优化配置主要在最终多个spill合并时,input和output缓存配置(mergeSpills方法),有transferTo(跳过Ring0,Ring 3层直接copy,主要在大文件场景,小文件或者有些内存bug场景不合适)和普通FileStream操作

TransferTo分支配置:

1.spark.shuffle.unsafe.fastMergeEnabled:True

2.spark.shuffle.compress为False ,或者为True时,spark.io.compression.codec:Snappy,LZF,LZ4,ZStd,默认LZ4,这点比Hadoop要好,Hadoop默认为zip,导致为不能blocks并发处理,也就失去并发计算框架能力。

普通FileStream优化参数,input/output buffer

spark.shuffle.file.buffer:32K

spark.shuffle.unsafe.file.output.buffer:32K

内存array:LongArray初始大小spark.shuffle.sort.initialBufferSize:4K 

默认使用spark.shuffle.sort.useRadixSort:True 排序算法,比Tim Sort排序算法要快。缺点是在数组中排序,内存要增加0.5倍左右。

BypassMergeSortShuffleWriter条件分支:

1.没有order.

2.没有map combine.

3.partitions(Reduce数)小于spark.shuffle.sort.bypassMergeThreshold

BypassMergeSortShuffleWriter,同时打开partitions个临时文件写入,没有内存缓存,没有sort和merge,只是在最后阶段把NumberOf(partitions)临时文件合并到一个大文件,缺点是如果map*reduce足够多,走了hashsort老路。

SortShuffleWriter,数据写入内存(map combine:PartitionedAppendOnlyMap,否则PartitionedPairBuffer,都是array(2*len):k1v1k2v2...knvn,区别,前者返回在前面buckets,需要移动位置),内存不够时sort并spill产生临时文件,最终多个spill文件与内存数据排序与merge。

Shuffle落地磁盘文件,以map为单位落地数据文件和索引文件

localdirs[x]/subDirsPerLocalDir[y]/shuffle_[shuffle_id]_[map_id]_0_data

localdirs[x]/subDirsPerLocalDir[y]/shuffle_[shuffle_id]_[map_id]_0_index

上一篇 下一篇

猜你喜欢

热点阅读