Spark Shuffle

2018-07-14  本文已影响0人  wangdy12

Spark是一种MapReduce框架的实现,与Hadoop一样,具有Shuffle阶段

Shuffle是连接map和reduce之间的桥梁,它将map(ShuffleMapTask)的输出对应到reduce(ShuffleMapTask或ResultTask)输入中,这期间涉及到序列化反序列化、网络数据传输以及磁盘读写,可能还会有压缩解压,加密解密等,是非常耗时的操作

Shuffle有三个模块组成

Spark Shuffle演进

Tungsten Project 钨丝计划,是对Spark进行优化的一个项目,旨在提升CPU和内存的效率,tungsten-sort中涉及的内存分配,底层可以通过sun.misc.Unsafe类直接获取内存实现

Hash Shuffle

在map阶段(ShuffleMapTask),每个ShuffleMapTask都会为下游stage的每个partition写一个临时文件,它的问题在于创建的文件数目过多,现在已经不再使用

如果有M个ShuffleMapTask,下游stage有R个ShuffleMapTask或ResultTask,理论上会生成M * R个数据文件

涉及多个小文件的随机读取,硬盘的性能容易称为瓶颈,为了解决文件过多的问题,后来加入了Consolidate Files机制(文件合并机制),运行在同一个core的ShuffleMapTask,第一个会创建R个文件,后续task只是追加写入到已经创建的文件中,当每个task分配1个core时,文件总数为total-executor-cores * R

Sort Shuffle

ShuffleMapTask不再为每个Reducer生成一个单独的文件,而是将所有的结果写到一个文件里,同时产生一个索引index文件,减少了文件的数量,文件中的记录首先是按照分区的partition id顺序排列,index文件记录的各个分区对应文件内的偏移,文件总数是2 * M,Reduce阶段可以通过索引,获取相关数据,这样降低了随机磁盘IO与缓存的开销

Sort Based Shuffle的缺点是必须要进行排序,至少是要按照分区排序,如果指定了keyOrdering才需要在分区内部根据数据的key再次排序,相较于hash shuffle这是额外的性能消耗

它对应于SortShuffleManager,也是目前唯一的shuffle管理器

SortShuffleManager

SortShuffleManagerSparkContext初始化过程中的SparkEnv类初始化,创建ShuffleManager时初始化,当然也可以通过spark.shuffle.manager指定一种自定义的ShuffleManager

    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

在RDD创建过程中,如果对应为宽依赖,即ShuffleDependency,该依赖初始化过程中会通过SparkContext获取一个ShuffleId,然后通过ShuffleManager注册Shuffle,根据不同的情况返回不同的ShuffleHandle

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // reduce端分区数量少于spark.shuffle.sort.bypassMergeThreshold,没有map端的聚合操作
      // 不再进行归并排序,直接写numPartitions个文件,最后连接到一起,避免了序列化和反序列化,但是缓存需要较高
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // 以序列化的形式缓存输出
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // 以反序列化的形式缓存
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

ShuffleMapTask中的runTask(context: TaskContext)函数会通过shuffleManager获取ShuffleWriter,对RDD元素进行写出

var writer: ShuffleWriter[Any, Any] = null
try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
}

SortShuffleManager根据ShuffleHandle的不同类型,初始化对应的Shuffle Writer

  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    //记录shuffle id => 该shuffle产生输出的mapper数目
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

ShuffledRDDcompute方法中获取shuffle结果,此外还有CoGroupedRDD等RDD也会请求获取Reader

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

SortShuffleManagergetReader实现

  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

Shuffle Writer

共有三种类型的ShuffleWriter

Shuffle Reader

只有一种ShuffleReader:BlockStoreShuffleReader

MapOutputTracker中获取上游shuffle数据,通过网络获取远程数据块,本地数据直接读取,如果有需要在对数据进行聚合和排序,最终返回数据的一个迭代器详细说明

Hadoop shuffle

本质上与Spark shuffle的原理是一致的,但是在具体实现上有差别

MapReduce Shuffle Spark Shuffle
map端写出 在内存中构造一个缓冲区(默认100MB),超过则溢写,最终合并写出到本地 BypassMergeSortShuffleWriter直接写出,另外两种类型会存储在内存中,直到内存不足(这里内存相对较大)再溢写,最终同样合并写出到本地
map端数据的顺序 分区内部的数据同样是有序的 除非需要进行map端合并,分区内部不进行排序
copy reduce在每个map完成后立即开始复制它们的输出,即reduce端的任务不是等map端结束才启动的 map,reduce分别归属不同的Stage,Stage有明确的先后顺序,必须等到map task全部完成,才会启动reduce人进行拉取
内存使用 一般内存都不大,如果内存不足会进行写出,最后进行合并 通常内存较大,有专门的内存管理器,还可以通过配置允许使用堆外内存,数据尽可能的存在内存中

Spark中shuffle数据直接写出,每个文件对应的缓冲器大小默认为32KB,通过spark.shuffle.file.buffer设定

spill merge过程是否是两次IO
拉取的文件是直接位于内存还是写入到磁盘

Shuffle and sort in MapReduce

Hadoop MapReduce Shuffle map过程的数据在写磁盘时,task首先将数据写出到环形缓冲区(默认大小100MB,mapreduce.task.io.sort.mb),达到阈值就启动后台线程将数据溢出写到磁盘,此时map的输出会继续写入缓存,如果缓存被填满,map会被阻塞直到溢写过程完成

写出到磁盘之前,后台线程首先要划分分区,在每个分区内部,后台线程按照键值进行内存中排序,如果存在combiner函数,就在排序后的数据上进行结合

每次内存缓冲区达到溢出阈值,会新建一个溢出文件,最后所有的溢出文件被合并成一个已分区且已排序的输出文件,如果至少存在3个溢出文件,那么combiner就会在输出文件写到磁盘之前再次运行

reduce端因为map task可能在不同时间完成,因此reduce task在每个map完成后立即开始复制它们的输出,这称为reduce任务的复制阶段copy phase,reduce task具有少量的复制线程(默认是5,mapreduce.reduce.shuffle.parallelcopies),可以并行获取map输出,获取的数据优先放置在内存中,否则写出到磁盘,随着拷贝的增多,后台线程将他们合并为更大的有序文件,方便之后的merger

当所有的map输出都复制过来以后,reduce task进入sort phase或者叫做merger phase,将map的输出合并,并保证顺序,这里可能涉及到多轮次合并(合并因子默认为10,mapreduce.task.io.sort.factor),在此过程中会省略最后一轮次合并结果写入磁盘的过程,直接将数据输入reduce函数,即reduce phase

合并因子为10,有效合并40个文件片段的方式
上一篇下一篇

猜你喜欢

热点阅读