Spark和Hadoop的shuffule
spark的shuffle和Hadoop的shuffle(mapreduce)的区别和关系是什么?
Hadoop Shuffle
对shuffle的期望:
- 完整地从map task端拉取数据到reduce 端。
- 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
- 减少磁盘IO对task执行的影响。
Map端
Partition
将map的结果发送到相应的reduce端,总的partition的数目等于reducer的数量。
负载均衡,效率;
Spill (sort & combiner)
把内存缓冲区中的数据写入到本地磁盘,在写入本地磁盘时先按照partition、再按照key进行排序。
如果在程序中设置了combine,map输出数据根据分区排序完成后,在写入文件之前会执行一次combine操作。
Merge
merge过程:当map很大时,每次溢写会产生一个spill_file,这样会有多个spill_file,而最终的一个map task输出只有一个文件,因此,最终的结果输出之前会对多个中间过程进行多次溢写文件(spill_file)的合并,此过程就是merge过程。也即是,待Map Task任务的所有数据都处理完后,会对任务产生的所有中间数据文件做一次合并操作,以确保一个Map Task最终只生成一个中间数据文件。
Reduce 端
Copy
拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为这时map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
Merge
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比 map 端的更为灵活,它基于 JVM 的heap size设置,因为 Shuffle 阶段 Reducer 不运行,所以应该把绝大部分的内存都给 Shuffle 用。
Reduce处理
merge的最后会生成一个文件,大多数情况下存在于磁盘中,但是需要将其放入内存中。当reducer 输入文件已定,整个 Shuffle 阶段才算结束。然后就是 Reducer 执行,把结果放到 HDFS 上。
Spark
详细探究Spark的shuffle实现和hadoop mapreduce shuffle原理
Shuffle Write
将数据分成bucket,并将其写入到磁盘的过程。
ShuffleConsolidation,每个bucket并非对应一个文件,而是对应文件中的一个segment,显著减少了Shuffle文件的数量。
ShuffleFetch
从存储shuffle数据的节点Fetch数据,并执行用户定义的聚集操作。
Lcoal的数据通过BlackManager来获取。
远端的数据,通过BlockTransferServoce来完成。
Spark假定在大多数场景下,Shuffle数据的排序不是必须的。
所以采用Hash-Based的Aggerator方式。
Spark将需要聚集的数据分为两类
-
不需要归并排序的。在内存这种的AppendOnlyOnMap中进行聚集。
-
需要归并排序的,先在内存中聚集,达到一定阈值时,先排序再写入磁盘。