Map和Reduce的shuffle过程

2019-12-17  本文已影响0人  雨林不落梦不落

一,Map端的Shuffle

Mapper中的map方法处理完一行数据之后,会将数据写出到缓冲区中,数据在缓冲区中进行分区,排序(快速排序),如果指定了Combiner,那么数据在缓冲区中还会进行Combine合并。当缓冲区的容量使用到达一定限度时,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中。MapTask将所有的数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。

几个小问题:

  1. 每一个MapTask自带一个缓冲区,缓冲区的本质上是一个环形的字节数组,(优势在于能够重复利用缓冲区不用寻址)。
  2. 此缓冲区维系在内存中,默认的容量为100M。
  3. 上述提到的缓冲区到达一定的限度指的是溢写的阈值,默认的阈值为0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞。
  4. 每一次溢写都会产生一个新的溢写文件,单个溢写文件中的数据时分区且排序的,但是所有的溢写文件中的数据时局部有序,整体无序的。
  5. 如果有一部分结果再缓冲区,一部分结果在溢写文件中,这个时候所有的结果都会直接合并到最后的final out 中,如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out 中。
  6. 在merge的过程中,数据会再次进行分区和排序,所以final out 是整体分区且有序。这个过程中的排序使用的是归并排序
    7.如果制定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine。
    shuffle.png

二,Reduce端的shuffle

每一个ReduceTask启动fetch线程通过get请求抓取数据,在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据,在抓取到数据之后,会将数据存放在本地的磁盘上。在抓取完成后,ReduceTask会将这些小文件进行merge,合并成一个大文件,在合并的过程中进行再次排序,采取的是归并排序。合并完成之后,会将相同的键对应的值放到一个迭代器中(这个过程称为分组group)。形成一个键对应一个迭代器的结构。每一个键触发一次reduce方法。

shuffle2.png

由于在整个shuffle的过程中频繁的进行合并,排序,以及和磁盘的交互,所以需要对shuffle进行优化。

  1. 增大缓冲区,一般缓冲区会设置在250-400M之间。
  2. 调大阈值,但会相应的增加阻塞的几率。
  3. 尽量的增加Combine过程。
  4. 默认情况下,fetch线程的数量为5,可以考虑增加fetch线程的数量,但要注意服务器的承载量。
  5. 增加压缩机制,(不建议这样做,在网络资源比较紧张的情况下可以选择压缩)。
  6. merge的默认因子是10(可以理解为每10个小文件合并成一个大文件),可以考虑增大这个因子,但是仍旧不建议这样做,因为会增加底层运算的复杂度。
  7. ReduceTask默认的启动阈值为0.05,即当有5%的MapTask结束,ReduceTask就可以启动抓取数据了,在MapTask数量较大时,可以考虑适当的减小该阈值。
上一篇下一篇

猜你喜欢

热点阅读