流式计算

Spark Streaming(九):性能调优

2018-11-07  本文已影响0人  雪飘千里

1、数据接收并行度调优

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams 
= new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream 
= streamingContext.union(kafkaStreams.get(0),
kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

2、任务启动调优

如果每秒钟启动的task过于多,比如每秒钟启动50个,那么发送这些task去Worker节点上的Executor的性能开销,会比较大,而且此时基本就很难达到毫秒级的延迟了。

使用下述操作可以减少这方面的性能开销:

1、Task序列化:使用Kryo序列化机制来序列化task,可以减小task的大小,从而减少发送这些task到各个Worker节点上的Executor的时间;
2、执行模式:在Standalone模式下运行Spark,可以达到更少的task启动时间。

上述方式,也许可以将每个batch的处理时间减少100毫秒,从而从秒级降到毫秒级。

3、 数据处理并行度调优

如果在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是无法被充分利用的。举例来说,对于分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行task的数量是由spark.default.parallelism参数决定的。你可以在reduceByKey等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的spark.default.parallelism参数;

4、数据序列化调优

数据序列化造成的系统开销可以由序列化格式的优化来减小。

在流式计算的场景下,有两种类型的数据需要序列化。

在上述的场景中,使用Kryo序列化类库可以减小CPU和内存的性能开销。使用Kryo时,一定要考虑注册自定义的类,并且禁用对应引用的tracking(spark.kryo.referenceTracking)。

在一些特殊的场景中,比如需要为流式应用保持的数据总量并不是很多,也许可以将数据以非序列化的方式进行持久化,从而减少序列化和反序列化的CPU开销,而且又不会有太昂贵的GC开销。举例来说,如果你数秒的batch interval,并且没有使用window操作,那么你可以考虑通过显式地设置持久化级别,来禁止持久化时对数据进行序列化。这样就可以减少用于序列化和反序列化的CPU性能开销,并且不用承担太多的GC开销。

5、batch interval调优(最重要)

如果想让一个运行在集群上的Spark Streaming应用程序可以稳定,它就必须尽可能快地处理接收到的数据。换句话说,batch应该在生成之后,就尽可能快地处理掉。
对于一个应用来说,这个是不是一个问题,可以通过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间。

基于流式计算的本质,batch interval对于,在固定集群资源条件下的应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务可以保证每2秒打印一次单词计数,而不是每500ms。
因此batch interval需要被设置为,让预期的数据接收速率可以在生产环境中保持住。

计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的。否则,如果batch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速度,或者增加batch interval。
记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。

6、内存调优

首要就是优化Spark应用的内存使用和GC行为,这个在之前的Spark调优中已经写过了。
这里主要是与Spark Streaming应用相关的调优参数。

Spark Streaming应用需要的集群内存资源,是由使用的transformation操作类型决定的。
举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。
如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。
反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。

通常来说,通过Receiver接收到的数据,会使用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上,而溢写到磁盘上,是会降低应用的性能的。
因此,通常是建议为应用提供它需要的足够的内存资源。
建议先在一个小规模的场景下测试内存的使用量,并进行评估。

下面展示 一些基于内存和GC策略调优的参数。

1、DStream的持久化:正如在前面第四点 “数据序列化调优” 中提到的,输入数据和某些操作生产的中间RDD,默认持久化时都会序列化为字节,与非序列化的方式相比,这会降低内存和GC开销。
使用Kryo序列化机制可以进一步减少内存使用和GC开销,为了进一步降低内存使用率,还可以对数据进行压缩,由spark.rdd.compress参数控制(默认false)。

2、清理旧数据:默认情况下,所有输入数据和通过DStream transformation操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数据,取决于transformation操作类型。
例如,你在使用窗口长度为10分钟内的window操作,Spark会保持10分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特殊场景下,比如Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就需要让Spark保存更长时间的数据,直到Spark SQL查询结束。
当然我们可以使用streamingContext.remember参数来让数据保留更长的时间。

3、CMS垃圾回收器:推荐使用并行的mark-sweep垃圾回收机制,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的处理时间(降低处理过程中的gc开销)。
如果要使用,那么要在driver端和executor端都开启。
在spark-submit中使用--driver-java-options设置;
也可以使用spark.executor.extraJavaOptions参数设置。
-XX:+UseConcMarkSweepGC。

上一篇 下一篇

猜你喜欢

热点阅读