shuffle过程详解
基本概念
环形缓冲区
mapper类的map函数每次调用context.write()时,都会写出一条记录。maptask频繁的写出数据,输出的结果会先输出到一个环形缓冲区中。
环形缓冲区默认的大小是100M,这个大小是由配置参数决定的:
mapreduce.task.io.sort.mb=100
当maptask不断将数据写入环形缓冲区,达到环形缓冲区的容量阈值(默认0.8),环形缓冲区会触发写入磁盘的操作,以下参数决定环形缓冲区溢出写的阈值:
mapreduce.map.sort.spill.percent=0.8
环形缓冲区的结构
环形缓冲区底层存储的是一个首尾相连字节数组,该数组中维护了一个equtor,用于区分元数据和原始数据。这里的原始数据就是指Maptask输出的数据,元数据则是用于记录原始数据的数据。元数据和原始数据从equtor开始背向开始在环形缓冲区中写入数据。
元数据包含4部分内容:
- 原始数据中key的起始位置。
- 原始数据中value的起始位置。
- value的长度。
- 分区信息,记录这条记录属于哪条分区。
每条记录的元数据占用的空间大小是一样的,都是64字节。这样当shuffle过程中进行排序时,只需要在环形缓冲区中先对元数据进行排序。
Map的shuffle过程
这一过程主要包括输出、排序、溢写、合并等步骤,如下图所示。

1、输出collect。
每个Maptask都将数据输出到该Maptask对应的环形缓冲区Kvbuffer中。
2、排序sort。
在环形缓冲区Kvbuffer中对元数据按照partition和sortkey进行排序,排序算法是快速排序。
3、溢写spill。
当环形缓冲区Kvbuffer中数据达到溢写阈值时,会生成一个溢写文件,将环形缓冲区中的原始数据写入该文件。
按照上一步排序的元数据,溢写时对原始数据进行排序。
由于一个Maptask处理的数据可能需要多次溢写才能写完,所以每个Maptask可能生成多个溢写文件。最终剩在环形缓冲区中的数据达不到阈值条件,会强制刷出生成一个溢写文件。
4、合并merge。
每一个Maptask将上一步生成的多个溢写文件进行归并排序,合并成一个文件。
Reduce的shuffle过程
这一过程主要包括复制、排序合并步骤。

1、抓取数据copy。
在reduce端会从各maptask运行节点按分区拖取数据。
2、排序合并sort-merge。
每个分区的数据从多个maptask拖取过来后进行归并排序,合并成一个文件。
最后各个分区的文件通过分区组件的逻辑,划分到不同的reducetask。
combiner与shuffle
如果有combiner组件,将会在shuffle的这些步骤发挥作用:
- 环形缓冲区溢写时,调用combiner处理后生成溢写文件。
- 多个溢写文件归并是也会调用一次combiner处理后生成一个文件。