spark性能优化一:系统级优化
1、概览
Spark计算本质是基于内存的,所以Spark程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存;如果内存能够容纳得下所有数据,那么网络传输和通信就会导致性能出现瓶颈;但是如果内存比较紧张,不足以放下都有的数据(比如在针对10亿以上的数据量进行计算时),还是需要对内存的使用进行性能优化的,比如说使用一下手段来减少内存的消耗。
Spark性能优化,其实主要就是在于对内存的使用进行调优;
因为通常情况下,如果Spark应用程序计算的数据量比较小,并且内存足够用,那么只要运维保障网络正常,一般是不会有大的性能问题的;但是Spark应用程序的性能问题往往是针对大数据量(10亿级别)进行计算时出现,因此通常来说,Spark性能优化,主要是针对内存进行性能优化;当然除了内存调优外,还有别的可以调。
2、分配更多资源
性能调优的王道,其实就是增加更多的资源,基本上,在一定范围内,增加资源与性能的提升是成正比的;当我们写完一个复杂的Spark应用后,第一步,就是要调节最优的资源配置,当公司的资源达到了自己能利用的顶端后,才需要考虑下面这些优化技术。
下面来讨论下怎么分配更多资源
-
1、分配哪些资源
executor(executor数量)、cpu per executor(每个executor的cpu)、memory per executor(每个executor的内存)、dirver memory -
2、在哪里分配
在我们生产环境中,提交spark作业时用的spark-submit shell脚本,可以调整对应的参数
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的数量
--driver-memory 100m \ 配置driver的内存(影响不大)
--executor-memory 100m \ 配置每个executor的内存大小
--executor-cores 3 \ 配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
-
3、分配多少
- Yarn 集群:查看spark应用要提交到的资源队列大概有多少资源;比如说,我们有500G内存,100个cpu core;那我们设置50个executor时,平均每个executor 会有10G内存,2个cpu core;
一个原则,能使用的资源有多大,就尽量去调节到最大的大小
- Yarn 集群:查看spark应用要提交到的资源队列大概有多少资源;比如说,我们有500G内存,100个cpu core;那我们设置50个executor时,平均每个executor 会有10G内存,2个cpu core;
-
4、为什么这么分配,分配后性能怎么提升?
SparkContext中的DAGScheduler、TaskScheduler会将我们的算子,切割成大量的task,提交到Application的executor中去执行。-
增加executor:如果executor数量比较少,那么能够并行执行的task数据就比较少,这意味着我们的Application的并行执行的能力比较弱;
比如现在又3个executor,每个executor有2个cpu core,那么只能同时并行执行6个task,6个执行完以后,再换下一批6个task。
增加了executor数量以后,意味着能够并行执行的task数量变多了,比如原先是6个,现在可以并行执行10个、20个,那么并行能力就比之前提升了数倍;相应的,性能(执行的速度)也能提升数倍。 -
增加每个executor的cpu core:也是增加了执行时的并行能力,原本20个executor,每个才2个cpu core,能够并行执行的task数量是40个;
现在每个executor的cpu core增加到了5个,能够并行执行的task数量,就是100个。执行的速度,提升了2.5倍 -
增加每个executor的cpu memory:内存增加以后,当需要对RDD进行Cache时,就可以缓存更多的数据,这会减少数据写入磁盘,甚至不写入磁盘,减少了磁盘IO;对于Shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合,如果内存不够,就会写入磁盘,如果executor有更多的内存,同理也会减少磁盘IO;同时,task执行时,可能会创建很多对象,如果内存较小,可能会频繁GC,minor GC和full GC(速度很慢),当内存加大以后,带来更少的GC,性能提升
-
3、提高并行度
分配了更多资源以后,还需要充分的利用,所以要尽量设置合理的并行度,来充分利用集群的资源,从而提高Spark应用程序的性能。
那么,Spark并行度指的是什么?
Spark作业中,Application,jobs,action(collect)操作会触发一个job;每个job会拆成多个stage,啥时候拆分呢?是在发生shuffle(reduceByKey)的时候,会拆分出一个stage。详细过程见前面写过的stage划分算法
其实,spark并行度指的是Spark作业中,各个阶段(stage)的task数量;
-
为什么要调节并行度呢?
假设,现在已经已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,每个executor有10G内存,每个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。但是,我们task数量没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,每个executor只会并行运行2个task。每个executor剩下的一个cpu core,就浪费掉了。
这就会导致,资源虽然分配够了,但是并行度(task数量)没有与资源相匹配,出现了浪费。 -
设置多少
Spark官方推荐,给集群中的每个cpu core设置2~3个task。也就是说,task数量,设置成spark application总 cpu core数量的2-3倍。比如150个core,基本要设置task数量为300~500; -
如何设置
可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度;
SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
如果没有设置的话,Spark会自动以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。
4、优化技术
通常情况下,当资源和并行度都合理设置好以后,spark作业就很快了,可能几分钟就跑完了。这时候如果还需要优化,那就是优化spark作业代码了。
- 重构RDD架构和RDD持久化
- Shuffle调优(最重要)
- reduceByKey和groupByKey的合理使用
- spark算子调优
- 数据本地化
- 使用序列化的持久化级别
- 广播共享数据
- 使用高性能的序列化类库
- 优化数据结构(使用fastutil)
- Java虚拟机垃圾回收调优
关于优化这里说明一下,每个优化技术重要性并不是一样的,起的作用也不是一样的,这点一定要分清。对于性能来说,打个比方,我们要做一桌菜,最重要的是主菜,那么对于优化来说,分配资源,并行度设置,RDD架构和持久化就是三道主菜;shuffle调优、groupByKey用ruduceByKey改写、数据本地化,就是配菜,虽然没有主菜那么重要,但是也是必不可少的;至于broadcast、kryo、fastutil,就是一些特殊的调料,有的话更好吃,但是没有的话,也不会影响特别大。所以优化的过程中,一定要把握到优化的技术点,不要在不重要的点上花费太多的时间。
直观一点,假如我们的spark作业,在没有经过调优的时候,大概30分钟运行完成,
现在如果我们能申请到更多的资源(资源更大的Yarn队列),可能3分钟就可以完成;
如果我们shuffle调优,groupByKey用redueceBykey改写,数据本地聚合,可能15分钟可以完成;
如果我们broadcast、kryo、fastutil、jvm调优,可能25分钟可以完成;
从这里就可以看出,我们优化时的顺序。
5、诊断内存消耗
-
首先,设置RDD的并行度,有两种方法,第一种,在parallelize()、textFile()等方法中传入第二个参数,设置RDD的task/partition;第二种,用SparkConf.set()方法,设置一个参数(spark.default.parallelism),可以统一设置这个application所有RDD的partition数据
-
其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可
-
最后,观察Driver的log,找到类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息,这显示了每个partition占用了多少内存
-
将这个内存信息乘以partition数量,即可得出RDD的内存占用量