spark shuffle v2

2020-07-13  本文已影响0人  LancerLin_LX

spark shuffle

Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。原文链接:https://blog.csdn.net/zhanglh046/article/details/78360762

总的来说,spark跟MR的shuffle并没有多大区别,都涉及到map(写数据的阶段),跟reduce(读数据阶段)。

spark shuffle 执行流程

本文通过源码分析spark shuffle的执行过程,以及相关参数的调优。

通过分析spark 提交的源码,我们可以知道,最终调用的是org.apache.spark.scheduler.TaskrunTask方法,而Task有2个子类,ShuffleMapTask(write(也可能存在先read后write,最后阶段是write)相当于MR中的Map阶段)跟ResultTask(开始阶段是read,相当于MR中的Reduce阶段)

image.png

shuffle write阶段

image.png

查看ShuffleMapTask可以知道有3种Writer,这里我们只讨论最常用的SortShuffleWriter

Shuffle write 的'HashMap' 跟'Array'

Shuffle write没有使用常见的collection或者map,而是用一个大数组,第一位存储key,key的下一位存储value,存储的格式类都是K: (getPartition(key), key) V: value
其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,黑色部分存储 Value。如下图:

image.png

1.PartitionedAppendOnlyMap

PartitionedAppendOnlyMap extends AppendOnlyMap, AppendOnlyMap的官方介绍是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是类似 HashMap,但没有remove(key)方法。

具体操作拿到数据后

2.PartitionedPairBuffer

相对比较简单,不需要mapCombine,只需要将数据按照kv追加到数组后面,如下图。


image.png

spill溢写磁盘与PartitionedAppendOnlyMap一样,不过不需要移动数据,填充空缺的位置,数据本身就是紧密的。

Shuffle write 合并

通过PartitionedAppendOnlyMap或者PartitionedPairBuffer操作完所有的数据后,会生成一个内存collection和0个或者多个分区且排序的文件(如果数据量过大有spill操作),最后通过外排将内存的数据跟spill的文件数据,通过merge sort合并成1个分区且排序的大文件(shuffleId_mapId_0.data),跟一个索引文件(shuffleId_mapId_0.index),类似kafka里的segment跟.index文件。索引主要描述每个分区对应的数据,比如0-100是0号分区,101-200是1号分区的数据,为了给reducer fetch对应分区的数据。

shuffle read阶段

shuffle read阶段,其实就是读取数据的阶段,你可以理解成,client向server发送请求,下载数据。主要是从client读取数据的过程,超时、并发度、异常重试等方面入手,server端则通过调整处理的并发数方面入手。

Shuffle read过程

ResultTask调用runTask最终调用的是ShuffleRdd的compute方法,然后我们可以看到实际上是BlockStoreShuffleReader的read方法。read方法中,通过new ShuffleBlockFetcherIterator(), 注意这里有4个优化参数, new ShuffleBlockFetcherIterator的时候,通过mapOutputTracker获取属于自己的Iterator[(BlockManagerId, Seq[(BlockId, Long)])],然后ShuffleBlockFetcherIterator的initialize方法,是整个read切分读request的逻辑。

new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

ShuffleBlockFetcherIterator.initialize

首先将val targetRequestSize = math.max(maxBytesInFlight / 5, 1L),默认是48m/5=9.6m。判断拉取的数据是否大于9.6m或者一个address拉取的blocks数大于maxBlocksInFlightPerAddress(默认是Int.MaxValue),所以只由9.6m控制,如果是,则封装成一个new FetchRequest(address, curBlocks),最后会封装成N个FetchRequest。然后开始遍历拉数据,判断是否isRemoteBlockFetchable,逻辑是

    def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
      fetchReqQueue.nonEmpty &&
        (bytesInFlight == 0 ||
          (reqsInFlight + 1 <= maxReqsInFlight &&
            bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
    }

总之就是正在拉取的数据不能大于spark.reducer.maxSizeInFlight(默认48m)并且请求数不能超过maxReqsInFlight(Int.MaxValue),不然就进入等待的deferredFetchRequests队列。所以,为了提高shuffle read的request并发读的数量,可以提高maxBytesInFlight(默认48m)的大小。并且单个address拉取的blocks数不能超过maxBlocksInFlightPerAddress(默认是Int.MaxValue),所以,降低maxBlocksInFlightPerAddress可以降低同时拉取的blocks数量,防止同时拉取多个blocks导致io过高,导致服务无响应、io超时等异常。
最终执行sendRequest请求拉取数据,调优参数spark.shuffle.io.maxRetries(默认是3)拉取失败重试的次数, spark.shuffle.io.retryWait(默认是5秒)失败后等待5秒后尝试重新拉取数据。

spark shuffle 参数调优

为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。所以,开启堆外内存对于调优非常重要

shuffle write阶段参数调优

shuffle read阶段参数调优

ExternalShuffleService

Spark 的 Executor 节点不仅负责数据的计算,还涉及到数据的管理。如果发生了 shuffle 操作,Executor 节点不仅需要生成 shuffle 数据,还需要负责处理读取请求。如果 一个 Executor 节点挂掉了,那么它也就无法处理 shuffle 的数据读取请求了,它之前生成的数据都没有意义了。

为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求。这个单独的服务叫做 ExternalShuffleService,运行在每台主机上,管理该主机的所有 Executor 节点生成的 shuffle 数据。有读者可能会想到性能问题,因为之前是由多个 Executor 负责处理读取请求,而现在一台主机只有一个 ExternalShuffleService 处理请求,其实性能问题不必担心,因为它主要消耗磁盘和网络,而且采用的是异步读取,所以并不会有性能影响。

解耦之后,如果 Executor 在数据计算时不小心挂掉,也不会影响 shuffle 数据的读取。而且Spark 还可以实现动态分配,动态分配是指空闲的 Executor 可以及时释放掉。


image.png

ExternalShuffleService参数调优

image.png

ExternalShuffleService本质是一个基于Netty写的Netty服务,所以相关调优就是对Netty参数的调优,主要有以下这些参数,具体调整,需要根据实际情况做出相应的调整,提高服务稳定性。

// 服务启动时处理请求的线程数,默认是服务器的cores * 2
spark.shuffle.io.serverThreads
// ChannelOption.SO_RCVBUF,
spark.shuffle.io.receiveBuffer
// ChannelOption.SO_BACKLOG
spark.shuffle.io.backLog
// ChannelOption.SO_SNDBUF
spark.shuffle.io.sendBuffer

实际应用的效果

image.png

T: 使用的内存 1T=1024G
P: 配置spark.sql.shuffle.partitions,1P=1000
C: cpu cores数量

参考链接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789

上一篇下一篇

猜你喜欢

热点阅读