Spark 性能优化 高级篇

2018-05-17  本文已影响0人  博弈史密斯

本篇文章的上篇是:Spark 性能调优 基础篇

1. 数据倾斜调优

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜,本质上就是:分配给 Task 的数据 不均衡的现象

1.1 数据倾斜发生时的现象

1.2 数据倾斜发生的原理

在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。

数据量大的 Task会比其他 Task 执行时间更长,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

1.3 如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:**ByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

1.3.1 某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

可以通过Spark Web UI来查看当前运行到了第几个stage。
在启动spark-shell时,调试信息会给出spark driver’s UI的入口,如:
Spark context Web UI available at http://192.168.56.156:4040
在浏览器输入 ip:端口号 进入 Spark Web UI 界面,点击 Stages,下面看到 Tasks:



倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

数据倾斜只发生在 Shuffle,通过Web UI,可确定数据倾斜发生在哪个 Stage,然后再定位代码,确定是哪个 Shuffle类算子。

1.3.2 某个task莫名其妙内存溢出的情况

内存溢出 可能是 task 数据量任务过大,可能发生了数据倾斜。如果运行在 YARN 上,可查看 yarn-cluster 模式下的log中的异常栈,定位到你的代码中哪一行发生了内存溢出,可能会看到 Shuffle 类算子。通过 Web UI 辅助查看报错的那个stage 的各个 task 的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

1.4 查看导致数据倾斜的key的数据分布情况

根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

1.5 数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

可以通过Hive来进行数据预处理然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

但是这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

解决方案二:过滤少数导致倾斜的key

如果发现导致倾斜的key就少数几个,而且对数据分析结果影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

比如,在Spark SQL 中可以使用 where子 句过滤掉这些key,或者在Spark Core 中对执行filter算子过滤掉这些key。

解决方案三:提高shuffle操作的并行度

这种方案是处理数据倾斜最简单的一种方案。

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,这5个key都是分配给一个task的。而增加了shuffle read task以后,每个 task 就分配到一个key,那么自然每个task的执行时间都会变短了。

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理。

解决方案四:两阶段聚合(局部聚合+全局聚合)

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

将原本相同的 key 通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个task处理数据量过多的问题。

对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。但如果是join类的shuffle操作,还得用其他的解决方案。

解决方案五:将reduce join转为map join

在对RDD使用 join 类操作,发生了数据倾斜,而且join操作中的一个 RDD数据量比较小(比如几百M或者一两G),比较适用此方案。

不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着另外一个RDD从 Broadcast变量中获取较小RDD的全量数据,将两个RDD的数据连接起来。

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个RDD是比较小的,则可以采用广播小 RDD 全量数据 + map算子来实现与 join 同样的效果,也就是map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。具体原理如下图所示。


image.png

这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果广播的RDD数据比较大就可能发生内存溢出了。因此并不适合两个都是大表的情况。

解决方案六:采样倾斜key并分拆join操作

两个RDD 进行 join的时候,如果数据量都比较大,如果 RDD 的少数几个 key 数据量很大,可以采取类似 解决方案四 的方式,对数据量大的 key 打上一个随机数,分成了若干个小数据量的 key,然后再进行 join。

如果导致倾斜的key特别多的话,这种方式也不适合。

解决方案七:使用随机前缀和扩容RDD进行join

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

该方案的实现思路基本和“解决方案六”类似:对两个要 join 的RDD 的每条数据,都进行 n倍扩容(每条数据 随机打上 0~n 的前缀)。

这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。

比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。

大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

2. shuffle调优

大多数Spark作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理:Spark Shuffle,以及相关参数的说明,同时给出各个参数的调优建议。

2.1 shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
spark.shuffle.memoryFraction
spark.shuffle.manager
spark.shuffle.sort.bypassMergeThreshold
spark.shuffle.consolidateFiles

参考

https://tech.meituan.com/spark-tuning-pro.html

上一篇 下一篇

猜你喜欢

热点阅读