spark性能优化四:Java虚拟机调优
1、Java虚拟机垃圾回收调优的背景
Spark虽然是scala开发的,但是其中也调用了很多的java api,而且scala也是运行在Java虚拟机中,所以spark是运行在java虚拟机中的。
如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不再使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。
垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,如果array和string;其次就在持久化RDD时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个partition就只是一个对象——一个字节数组
2、监测垃圾回收
我们可以对垃圾回收进行监测,包括多久进行一个垃圾回收,以及每次垃圾回收耗费的时间;只要在spark-submit脚本中,增加一个配置既可,
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。
注:这里打印出Java虚拟机的垃圾回收的相关信息,是输出到worker上的日志中了
只是其中一种监测方法,还可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。
3、垃圾回收过程
Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象,年轻代又被划分为三块空间,Eden、Survivor1、Survivor2;
-
首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC(小型垃圾回收),进行年轻代的垃圾回收——Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中,然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。
-
其次,如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,默认eden、survior1、survior2的内存比例是8:1:1,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2已经满了,那么就会直接放入老年代,这时就会出现,短时间存活的对象,进入老年代的问题。如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。
如果你的JVM内存不够大的话,可能导致频繁的年轻代内存满溢,频繁的进行minor gc。频繁的minor gc会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉。会导致这种短声明周期(其实不一定是要长期使用的)对象,年龄过大,垃圾回收次数太多还没有回收到,跑到老年代。
老年代中,可能会因为内存不足,囤积一大堆,短生命周期的,本来应该在年轻代中的,可能马上就要被回收掉的对象。此时,可能导致老年代频繁满溢。频繁进行full gc(全局/全面垃圾回收)。full gc就会去回收老年代中的对象。full gc由于这个算法的设计,是针对的是,老年代中的对象数量很少,满溢进行full gc的频率应该很少,因此采取了不太复杂,但是耗费性能和时间的垃圾回收算法。full gc很慢。
full gc / minor gc,无论是快,还是慢,都会导致jvm的工作线程停止工作,就是说,gc的时候,spark停止工作了。
总结,内存不充足的时候:
- 1、频繁minor gc,也会导致频繁spark停止工作
- 2、老年代囤积大量活跃对象(短生命周期的对象),导致频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时。可能导致spark长时间停止工作。
- 3、严重影响咱们的spark的性能和运行的速度。
4、Spark垃圾回收调优之executor内存
Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代,不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代,从而造成短时间存活的对象,长期呆在老年代中占据了空间,从而导致频繁Full GC。
Full GC时,要回收大量的短时间存活的对象,导致Full GC速度缓慢。
如果在task执行期间,发生了大量Full GC,那么说明,年轻代的Eden区域给的空间不够大,此时可执行一些操作来优化垃圾回收行为:
-
1、优化executor内存比例,降低cache操作的内存占比
对于Spark应用的垃圾回收来说,最重要是就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。
在这种情况下,很有可能因为你的内存空间的不足,task创建的对象过多,那么一旦发现40%的内存空间不够用了,就会触发java虚拟机的垃圾回收操作,因此在极端情况下,垃圾回收操作可能会被频繁触发,导致spark频繁的停止工作,性能影响会很大。
在上述情况下,如果发现垃圾回收频繁发生,(可以通过spark UI来查看,可以看到stage的运行情况,包括stage每个task的运行时间,gc时间等)那么就需要会那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFraction", "0.5"),——spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
——可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。
因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗;给task提供更多的内存,从而避免task的执行频繁触发垃圾回收。 -
2、给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/3;
-
3、如果使用的是HDFS文件,那么很好估计Eden区域的大小,如果每个executor有4个task,然后每个hdfs压缩块加压缩后大小是3倍,此外每个HDFS块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 3/4
5、Spark垃圾回收调优之executor堆外内存
有时候,如果spark作业处理的数据量特别特别大,几亿数据量;然后spark作业一运行,时不时的报错,shuffle file cannot find,executor、task lost,out of memory(内存溢出);
可能是说executor的堆外内存不太够用,导致executor在运行的过程中,可能会内存溢出;然后可能导致后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,但是executor可能已经挂掉了,关联的block manager也没有了;所以可能会报shuffle output file not found;resubmitting task;executor lost;spark作业彻底崩溃。
上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。
比如在一个spark作业中,有两个stage,在作业运行的时候,stage0的executor挂了,block manager也没有了;此时,stage1的executor的task,虽然通过Driver的MapOutputTrakcer获取到了自己数据的地址;但是实际上去找对方的block manager获取数据的时候,是获取不到的,此时会在本机就会打印出log shuffle output file not found。。。(使用
client—standalone client、yarn client—模式模式)
DAGScheduler resubmitting task时,会一直会挂掉,反复挂掉几次,反复报错几次,这会导致整个spark作业崩溃了
调节方式:--conf spark.yarn.executor.memoryOverhead=2048
注意,这个针对的是基于yarn的提交模式;
默认情况下,这个堆外内存上限大概是300多M;后来我们通常项目中,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至是2G、4G
通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,同时呢,会让整体spark作业的性能,得到较大的提升。
6、Executor gc引起的连接超时
Executor的task在执行算子操作时,优先从本地关联的BlockManager中获取数据,如果本地blockManger中没有,那么会通过TransferService去远程连接其他节点上Executor的BlockManager去获取。
如果此时,正好碰到那个executor的JVM在垃圾回收,此时就会没有响应,无法建立网络连接,会卡住,Spark默认的的网络连接超时时间是60s,如果60s都无法建立连接的话,就宣告失败。
之前碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。
这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。
报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。
这时候可以考虑调节连接的超时时长。
--conf spark.core.connection.ack.wait.timeout=300
调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,某某文件lost掉了。。。
在真正处理大数据(不是几千万数据量、几百万数据量),几亿,几十亿,几百亿的时候。很容易碰到executor堆外内存,以及gc引起的连接超时的问题。file not found,executor lost,task lost。
调节上面两个参数,还是很有帮助的。
/usr/local/spark/bin/spark-submit \
--class com.ibeifeng.sparkstudy.WordCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar \
7、总结
根据经验来看,对于垃圾回收的调优,尽量是调节executor内存的比例就可以了,因为jvm的调优是非常复杂和敏感的,除非到了万不得已的地方,同时,自己对jvm相关的技术很了解,然后才进行Eden区域的调节
注:一些高级的参数
-XX:SurvivorRatio=4:如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代比例是1/6,所以,也可以尝试调大Survivor区域的大小;
-XX:NewRatio=4:调节新生代和老年代的比例