Spark策略大本营:自由,平等,友爱。工具文

spark-shuffle原理&调优

2021-05-11  本文已影响0人  LancerLin_LX

spark-shuffle

Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。原文链接:https://blog.csdn.net/zhanglh046/article/details/78360762

总的来说,spark跟MR的shuffle并没有多大区别,都涉及到map(写数据的阶段),跟reduce(读数据阶段)。

spark shuffle 执行流程

image.png

本文通过源码分析spark shuffle的执行过程,以及相关参数的调优。

通过分析spark 提交的源码,我们可以知道,最终调用的是org.apache.spark.scheduler.TaskrunTask方法,而Task有2个子类,ShuffleMapTask(write(也可能存在先read后write,最后阶段是write)相当于MR中的Map阶段)跟ResultTask(开始阶段是read,相当于MR中的Reduce阶段)

大的流程shuffle-write需要经过write buffer、sort and spill、merge file三个阶段,细节上还是有差异,下面通过分析源码讲解shuffle-write的过程

image.png

shuffle write阶段

image.png

ShuffleMapTaskrunTask方法中查看SortShuffleManagerregisterShuffle()

image.png

然后在getWriter方法中,根据registerShuffle返回的结果判断

最后调用Writerwrite 方法,所以,shuffle-write的具体流程分为BypassMergeSortShuffleWriter , UnsafeShuffleWriter , SortShuffleWriter三种write的流程

image.png

BypassMergeSortShuffleWriter 流程

image.png

BypassMergeSortShuffleWriter writer流程很简单

这个Writer的优势是不需要排序,能够调整的参数

可根据实际情况适当调整以上参数

UnsafeShuffleWriter 流程

image.png image.png image.png image.png image.png image.png image.png

SortShuffleWriter 流程

前面介绍的2种Writer都有限制,可能会走上面2种Writer的算子有groupByKey、sortByKey、coalesce.最后的SortShuffleWriter像个万金油,使用无限制。

image.png image.png image.png image.png image.png image.png image.png image.png

首先,shuffle write分为write buffer、spill disk、merge file 三个阶段,分别对应上图的1,2,3

writer buffer 阶段

需要map端聚合 PartitionedAppendOnlyMap

不需要map端聚合 PartitionedPairBuffer

shuffle read阶段

不管Writer经过了那些流程,最后都会产生一个分区排序的大文件,以及一个索引文件(描述大文件的分区信息)
源码查看ResultTask runTask,发现最后调用的是rdd.iterator(partition, context),spark RDD执行通过递归迭代的方式(有点像new 对象的过程,都会先向上递归查看父类是否创建,spark中,子RDD都会查看父RDD是否计算或者缓存)。根据RDD链执行,通过getOrCompute,往前推,找到read的入口,ShuffleRdd的compute方法,找到ShuffleReader

image.png image.png

查看read方法,所以重点就在读取数据(远程+本地)获取wrappedStreams这个阶段,即重点分析通过ShuffleBlockFetcherIterator获取数据的流程

image.png

ShuffleBlockFetcherIterator

本质上就是就是读取数据的阶段,你可以理解成,client向server发送请求,获取数据,每个reducer通过读取index文件,再去大文件(shuffle write阶段合并后且分区排序文件),读取所属自己分区的数据。主要是从client读取数据的过程,超时、并发度、异常重试等方面入手,server端则通过调整处理的并发数方面入手。

image.png

ShuffleBlockFetcherIterator.initialize

image.png image.png image.png image.png

ExternalShuffleService

shuffle调优还有一点就是可以开启ExternalShuffleService 。
Spark 的 Executor 节点不仅负责数据的计算,还涉及到数据的管理。如果发生了 shuffle 操作,Executor 节点不仅需要生成 shuffle 数据,还需要负责处理读取请求。如果 一个 Executor 节点挂掉了,那么它也就无法处理 shuffle 的数据读取请求了,它之前生成的数据都没有意义了。

为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求。这个单独的服务叫做 ExternalShuffleService,运行在每台主机上,管理该主机的所有 Executor 节点生成的 shuffle 数据。有读者可能会想到性能问题,因为之前是由多个 Executor 负责处理读取请求,而现在一台主机只有一个 ExternalShuffleService 处理请求,其实性能问题不必担心,因为它主要消耗磁盘和网络,而且采用的是异步读取,所以并不会有性能影响。

解耦之后,如果 Executor 在数据计算时不小心挂掉,也不会影响 shuffle 数据的读取。而且Spark 还可以实现动态分配,动态分配是指空闲的 Executor 可以及时释放掉。

ExternalShuffleService本质是一个基于Netty写的Netty服务,所以相关调优就是对Netty参数的调优,主要有以下这些参数,具体调整,需要根据实际情况做出相应的调整。
spark.shuffle.io.serverThreads
spark.shuffle.io.receiveBuffer
spark.shuffle.io.backLog
spark.shuffle.io.sendBuffer

调优参数汇总

堆外内存相关

shuffle write阶段参数调优

shuffle read阶段参数调优

调优效果

image.png

T: 使用的内存 1T=1024G
P: 配置spark.sql.shuffle.partitions,1P=1000
C: cpu cores数量

参考链接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789
https://www.bilibili.com/video/BV1sW41147vD?from=search&seid=12279554496967751348
https://www.jianshu.com/p/cda24891f9e4
https://cloud.tencent.com/developer/article/1513203

上一篇 下一篇

猜你喜欢

热点阅读