Spark流式计算

Spark之官方调优

2016-09-04  本文已影响0人  wisfern

由于大部分Spark计算都是在内存中完成的,所以Spark程序的瓶颈可能由集群中任意一种资源导致,如:CPU、网络带宽、或者内存等。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存。 本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。同时,我们也会提及其他几个比较小的主题。

1 数据序列化

序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。 通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库:

要切换使用 Kryo,你可以在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。 目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo

Spark对一些常用的Scala核心类型,如在Twitter chill 库的AllScalaRegistrar中,自动使用Kryo序列化格式。

如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo的文档中有详细描述了更多的高级选项,如:自定义序列化代码等。

如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项。其值至少需要大于最大对象的序列化长度。

最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。

2 内存调优

内存占用调优主要需要考虑3点:数据占用的总内存(你会希望整个数据集都能装进内存);访问数据集中每个对象的开销;垃圾回收的开销(如果你的数据集中对象周转速度很快的话)。

一般情况下,Java对象的访问时很快的,但同时Java对象会比原始数据(仅包含各个字段值)占用的空间多2~5倍。主要原因有:

本节只是Spark内存管理的一个概要,下面我们会更详细地讨论各种Spark内存调优的具体策略。特别地,我们会讨论如何评估数据的内存使用量,以及如何改进 – 要么改变你的数据结构,要么以某种序列化格式存储数据。 最后,我们还会讨论如何调整Spark的缓存大小,以及如何调优Java的垃圾回收器。

2.1 内存管理概览

Spark中内存主要用于两类目的:执行计算和数据存储。执行计算的内存主要用于Shuffle、关联(join)、排序(sort)以及聚合(aggregation),而数据存储的内存主要用于缓存和集群内部数据传播。Spark中执行计算和数据存储都是共享同一个内存区域(M)。 如果执行计算没有占用内存,那么数据存储可以申请占用所有可用的内存,反之亦然。执行计算可能会抢占数据存储使用的内存,并将存储于内存的数据逐出内存,直到数据存储占用的内存比例降低到一个指定的比例(R)。 换句话说,RM基础上的一个子区域,这个区域的内存数据永远不会被逐出内存。然而,数据存储不会抢占执行计算的内存。

这样设计主要有这么几个需要考虑的点。首先,不需要缓存数据的应用可以把整个空间用来执行计算,从而避免频繁地把数据吐到磁盘上。其次,需要缓存数据的应用能够有一个数据存储比例(R)的最低保证,也避免这部分缓存数据被全部逐出内存。最后,这个实现方式能够在默认情况下,为大多数使用场景提供合理的性能,而不需要专家级用户来设置内存使用如何划分。

虽然有两个内存划分相关的配置参数,但一般来说,用户不需要设置,因为默认值已经能够适用于绝大部分的使用场景:

2.2 评估内存消耗

确定一个数据集占用内存总量最好的办法就是,创建一个RDD,并缓存到内存中,然后再到web UI上”Storage”页面查看。页面上会展示这个RDD总共占用了多少内存。

要评估一个特定对象的内存占用量,可以用 SizeEstimator.estimate 方法。这个方法对试验哪种数据结构能够裁剪内存占用量比较有用,同时,也可以帮助用户了解广播变量在每个执行器堆上占用的内存量。

2.3 数据结构调优

减少内存消耗的首要方法就是避免过多的Java封装(减少对象头和额外辅助字段),比如基于指针的数据结构和包装对象等。以下有几条建议:

2.4 序列化RDD存储

如果经过上面的调整后,存储的数据对象还是太大,那么你可以试试将这些对象以序列化格式存储,所需要做的只是通过 RDD persistence API 设置好存储级别,如:MEMORY_ONLY_SERSpark会将RDD的每个分区以一个巨大的字节数组形式存储起来。以序列化格式存储的唯一缺点就是访问数据会变慢一点,因为Spark需要反序列化每个被访问的对象。 如果你需要序列化缓存数据,我们强烈建议你使用Kryo,和Java序列化相比,Kryo能大大减少序列化对象占用的空间(当然也比原始Java对象小很多)。

2.5 垃圾回收调优

JVM的垃圾回收在某些情况下可能会造成瓶颈,比如,你的RDD存储经常需要“换入换出”(新RDD抢占了老RDD内存,不过如果你的程序没有这种情况的话那JVM垃圾回收一般不是问题,比如,你的RDD只是载入一次,后续只是在这一个RDD上做操作)。当Java需要把老对象逐出内存的时候,JVM需要跟踪所有的Java对象,并找出哪些对象已经没有用了。 概括起来就是,垃圾回收的开销和对象个数成正比,所以减少对象的个数(比如用Int数组取代LinkedList),就能大大减少垃圾回收的开销。 当然,一个更好的方法就如前面所说的,以序列化形式存储数据,这时每个RDD分区都只包含有一个对象了(一个巨大的字节数组)。在尝试其他技术方案前,首先可以试试用序列化RDD的方式(serialized caching)评估一下GC是不是一个瓶颈。

如果你的作业中各个任务需要的工作内存和节点上存储的RDD缓存占用的内存产生冲突,那么GC很可能会出现问题。下面我们将讨论一下如何控制好RDD缓存使用的内存空间,以减少这种冲突。

衡量GC的影响

GC调优的第一步是统计一下,垃圾回收启动的频率以及GC所使用的总时间。给JVM设置一下这几个参数(参考Spark配置指南,查看Spark作业中的Java选项参数):-verbose:gc -XX:+PrintGCDetails,就可以在后续Spark作业的worker日志中看到每次GC花费的时间。 注意,这些日志是在集群worker节点上(在各节点的工作目录下stdout文件中),而不是你的驱动器所在节点。

高级GC调优

为了进一步调优GC,我们就需要对JVM内存管理有一个基本的了解:

Spark GC调优的目标就是确保老年代(Old generation )只保存长生命周期RDD,而同时新生代(Young generation)的空间又能足够保存短生命周期的对象。这样就能在任务执行期间,避免启动full GC。以下是GC调优的主要步骤:

我们的很多经验表明,GC调优的效果和你的程序代码以及可用的总内存相关。网上还有不少调优的选择,但总体来说,就是控制好full GC的启动频率,就能有效减少垃圾回收开销。

3 其他事项

3.1 并行度

一般来说集群并不会满负荷运转,除非你把每个操作的并行度都设得足够大。Spark会自动根据对应的输入文件大小来设置“map”类算子的并行度(当然你可以通过一个SparkContext.textFile等函数的可选参数来控制并行度),而对于想 groupByKeyreduceByKey这类 “reduce” 算子,会使用其各父RDD分区数的最大值。你可以将并行度作为构建RDD第二个参数(参考spark.PairRDDFunctions),或者设置spark.default.parallelism 这个默认值。一般来说,评估并行度的时候,我们建议2~3个任务共享一个CPU

3.2 Reduce任务的内存占用

如果RDD比内存要大,有时候你可能收到一个OutOfMemoryError错误,其实这是因为你的任务集中的某个任务太大了,如reduce任务groupByKeySparkShuffle算子(sortByKey,groupByKey,reduceByKey,join等)会在每个任务中构建一个哈希表,以便在任务中对数据分组,这个哈希表有时会很大。最简单的修复办法就是增大并行度,以减小单个任务的输入集。Spark对于200ms以内的短任务支持非常好,因为Spark可以跨任务复用执行器JVM,任务的启动开销很小,因此把并行度增加到比集群中总CPU核数没有任何问题。

3.3 广播大变量

使用SparkContext中的广播变量相关功能(broadcast functionality)能大大减少每个任务本身序列化的大小,以及集群中启动作业的开销。如果你的Spark任务正在使用驱动程序中定义的巨大对象(比如:静态查询表),请考虑使用广播变量替代。Spark会在master上将各个任务的序列化后大小打印出来,所以你可以检查一下各个任务是否过大;通常来说,大于20KB的任务就值得优化一下。

3.4 数据本地性

数据本地性对Spark作业往往会有较大的影响。如果代码和其所操作的数据在同一节点上,那么计算速度肯定会更快一些。但如果二者不在一起,那必然需要移动其中之一。一般来说,移动序列化好的代码肯定比挪动一大堆数据要快。Spark就是基于这个一般性原则来构建数据本地性的调度。

数据本地性是指代码和其所处理的数据的距离。基于数据当前的位置,数据本地性可以划分成以下几个层次(按从近到远排序):

Spark倾向于让所有任务都具有最佳的数据本地性,但这并非总是可行的。某些情况下,可能会出现一些空闲的执行器(executor)没有待处理的数据,那么Spark可能就会牺牲一些数据本地性。有两种可能的选项:a)等待已经有任务的CPU,待其释放后立即在同一台机器上启动一个任务;b)立即在其他节点上启动新任务,并把所需要的数据复制过去。

通常,Spark会等待一会,看看是否有CPU会被释放出来。一旦等待超时,则立即在其他节点上启动并将所需的数据复制过去。数据本地性各个级别之间的回落超时可以单独配置,也可以在统一参数内一起设定;详细请参考 configuration page 中的 spark.locality 相关参数。如果你的任务执行时间比较长并且数据本地性很差,你就应该试试调大这几个参数,不过默认值一般都能适用于大多数场景了。

参考资料

Spark调优(官方)

上一篇 下一篇

猜你喜欢

热点阅读