spark 4种 shuffle机制与mapreduce shu
纵观整个mapreduce过程会发现存在许多的排序和文件合并操作。
为什么要排序,主要原因有:
1、key的存在combiner操作,排序之后相同的key放到一块显然方便做合并操作。
2、reduce task是按key去处理数据的。 如果没有排序那必须从所有数据中把当前相同key的所有value数据拿出来,然后进行reduce逻辑处理。显然每个key到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key对于的value集合。
3、reduce task按key去处理数据时,如果key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key那么久标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key处理完了,哪些key没有处理完。
为什么要文件合并,主要原因有:
1、因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。
2、任务去读取文件的数增多,打开的文件句柄数也会增多
3、mapreduce是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。
虽有千万种理由需要这么做,但是很耗资源,并且像排序其实我们有些业务并不需要排序。在hadoop 2.x 排序就变为可选了。
spark的shuffle是在mapreduce shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在spark中shuffle write相当于mapreduce 的map,shuffle reade相当于mapreduce 的reduce.
spark shuffle分4种
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager
1、未经优化的HashShuffleManager,其原理见下图
从图中可以看到,相比mapreduce,排序不见了,文件合并不见了。上游task写文件的时候只是将数据按分区追加到文件中,并没有像mapreduce 那样先内存溢写成文件,然后再文件与文件之间进行合并,虽然节省了排序、合并的开销。但有一个弊端就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。如上图 下游有3个shuffle reade task ,那每个上游shuffle write就会形成3个文件。 形成的文件数是 shuffle reade个数 × shuffle write个数。
2、优化的HashShuffleManager,其原理见下图
相比第一种机制。就是在一个excutor中的task是可以共用一个buffer内存。在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,而是允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。此时的文件个数是 CPU core的数量 × 下一个stage的task数量。
为了开启优化后的HashShuffleManager,我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager
3、SortShuffleManager,其原理见下图
这种机制和mapreduce差不多,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。
4、bypass运行机制
相比第3中少了排序,task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
bypass运行机制的触发条件如下:
1、shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
2、不是聚合类的shuffle算子(比如reduceByKey)。因为不像第3种机制那样会对聚合类算子以map的数据结构存储,在写的过程中会先进行局部聚合。
spark shuffle 优于mapreduce shuffle的原因1、减少了磁盘io
2、可选的shuffle和排序