眼君的大数据之路

shuffle过程详解

2020-08-27  本文已影响0人  眼君

基本概念

环形缓冲区

mapper类的map函数每次调用context.write()时,都会写出一条记录。maptask频繁的写出数据,输出的结果会先输出到一个环形缓冲区中。
环形缓冲区默认的大小是100M,这个大小是由配置参数决定的:

mapreduce.task.io.sort.mb=100

当maptask不断将数据写入环形缓冲区,达到环形缓冲区的容量阈值(默认0.8),环形缓冲区会触发写入磁盘的操作,以下参数决定环形缓冲区溢出写的阈值:

mapreduce.map.sort.spill.percent=0.8
环形缓冲区的结构

环形缓冲区底层存储的是一个首尾相连字节数组,该数组中维护了一个equtor,用于区分元数据和原始数据。这里的原始数据就是指Maptask输出的数据,元数据则是用于记录原始数据的数据。元数据和原始数据从equtor开始背向开始在环形缓冲区中写入数据。
元数据包含4部分内容:

  1. 原始数据中key的起始位置。
  2. 原始数据中value的起始位置。
  3. value的长度。
  4. 分区信息,记录这条记录属于哪条分区。

每条记录的元数据占用的空间大小是一样的,都是64字节。这样当shuffle过程中进行排序时,只需要在环形缓冲区中先对元数据进行排序。

Map的shuffle过程

这一过程主要包括输出、排序、溢写、合并等步骤,如下图所示。


1、输出collect。

每个Maptask都将数据输出到该Maptask对应的环形缓冲区Kvbuffer中。

2、排序sort。

在环形缓冲区Kvbuffer中对元数据按照partition和sortkey进行排序,排序算法是快速排序。

3、溢写spill。

当环形缓冲区Kvbuffer中数据达到溢写阈值时,会生成一个溢写文件,将环形缓冲区中的原始数据写入该文件。
按照上一步排序的元数据,溢写时对原始数据进行排序。
由于一个Maptask处理的数据可能需要多次溢写才能写完,所以每个Maptask可能生成多个溢写文件。最终剩在环形缓冲区中的数据达不到阈值条件,会强制刷出生成一个溢写文件。

4、合并merge。

每一个Maptask将上一步生成的多个溢写文件进行归并排序,合并成一个文件。

Reduce的shuffle过程

这一过程主要包括复制、排序合并步骤。


1、抓取数据copy。

在reduce端会从各maptask运行节点按分区拖取数据。

2、排序合并sort-merge。

每个分区的数据从多个maptask拖取过来后进行归并排序,合并成一个文件。
最后各个分区的文件通过分区组件的逻辑,划分到不同的reducetask。

combiner与shuffle

如果有combiner组件,将会在shuffle的这些步骤发挥作用:

  1. 环形缓冲区溢写时,调用combiner处理后生成溢写文件。
  2. 多个溢写文件归并是也会调用一次combiner处理后生成一个文件。
上一篇下一篇

猜你喜欢

热点阅读