hadoop(mapreduce) shuffle
本文主要想梳理下spark的四种shuffle机制。但是在说spark之前还是很有必要把mapreduce的shuffle过程理一遍,以做对比。
一、mapreduceshuffle过程
以读取hdfs文件进行单词统计为例,会分为map阶段 和 reduce阶段
1、map阶段
1.1、首先根据maptask的切片机制会把文件切成了多个数据切片(数据块)。一个数据切片对应一个maptask.
1.2、maptask会去逐行读取每个切片文件的数据。最后按照自定义的mapper逻辑里面产生想要的(key,value)键值对。
1.3、键值对会先放入一个内存环形缓冲区中,这个缓冲区的默认大小是100M.为了防止内存缓冲区满了之后数据被覆盖,当内存达到一定阈值(默认是80%,也是就是100M的80% = 80M)会先将内存数据溢出写入磁盘。
1.4、写入磁盘之前会先将数据按照key进行排序,按照分区策略写入文件。最终一次文件溢写会形成一个分区且排序好的文件。
1.5、由于数据源源不断,所以会触发多次内存数据溢出写入磁盘,会形成多个溢出文件。
1.6、在溢写文件的时候会对文件进行合并。为什么要合并主要原因有二点
a、从内存溢写出来的文件相对较小,不合并就会产生很多的小文件。
b、每个溢写文件内部是分区且排序的,但是对于全局来说并不是分区且排序的
c、多个文件会存在相同key.这样在合并的过程中也可以将相同key进行合并,从而减少数据 量。
所以在合并的过程中会两两将文件进行重新排序和分区,2个文件形成1个文件。这样迭代下去,最终会合并成一个总的分区且排序的文件。这样就完成了map阶段的任务。
2、reduce阶段
2.1、每个reduce task负责处理一个分区的文件。
2.1、reduce task从每个map task的结果文件中拉取对应分区的数据。因为数据在map阶段已经是分区好了的,并且会有一个额外的索引文件记录每个分区的起始偏移量。所以reduce task取数的时候直接根据偏移量去拉取数据就ok.
2.2、reduce task从每个map task拉取分区数据的时候会进行再次合并,排序,按照自定义的reducer的逻辑代码去处理。将结果输出。
shuffer是指的一个过程:从maptask端溢出数据到磁盘开始,到 reducetask端去调用用户的reduce方法处理数据之前。的整个过程叫做shuffer。见下图