Spark调优
spark调优方面
数据序列化
序列化在任何分布式应用程序的性能中都起着重要的作用。将对象序列化为大量字节的格式将大大降低计算速度。因此,选择合适的序列化方式将是优化程序的第一件事。spark
旨在在便利性(允许在操作中使用任何的java
类型)和性能之间取得平衡,提供了两种序列化库。
Java
序列化:默认情况下,spark
允许使用java
的ObjectOutputStream
框架序列化对象,并且可以与程序中创建的任何实现java.io.Serializable
的类一起使用。还可以通过扩展java.io.Externalizable
来更紧密地控制序列化的性能。java
序列化是灵活的,但通常很慢,并导致许多大型序列化格式的类。
kryo
序列化:spark
还可以使用kryo
库更快地序列化对象。kryo
比java
序列化(通常高达10倍)明显更快,更紧凑,但不支持所有Serializable
类型,并且要求提前注册在程序中使用的类以获得最佳性能。
通过使用sparkConf
初始化作业并调用conf.set("spark.serializer", "org.apache.spark.serializer.kryoSerializer")
来使用kryo
序列化方式。该设置的序列化方式,不仅用于在work
节点之间数据shuffle
,还用于将RDD
序列化到磁盘。kryo
不是默认的唯一原因是因为需要自定义注册要求。从spark 2.0.0
开始,在使用简单类型,简单类型数组或字符串类型对RDD
进行数据shuffle
,内部使用kryo
进行序列化。
spark
自动包含kryo
序列化器,用于Twitter chill
库中AllscalaRegistrar
涵盖的许多常用核心scala
库。
要使用kryo
注册自己的自定义类,需要使用registerKryoClasses
方法
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classof[MyClass1], classof[Myclass2]))
val sc = new SparkContext(conf)
如果需要注册的对象很大的时候,可以通过增加spark.kryoserializer.buffer
的大小来容纳允许的序列化对象的大小。
如果没有注册自定义类,kryo
仍会工作,但它必须存储每个对象的完整类名,这是一种浪费!
内存调整
内存优化有三个方面的考虑:对象所占的内存,访问对象的成本以及垃圾回收所占的开销。
默认情况下,java
对象访问速度很快,但与其“原始”数据相比,可以轻松占用2-5倍的空间。这是由于一下几个原因:
- 每个不同的
java
对象都有一个”对象头“,大约16个字节,并包含诸如指向该类的指针之类的信息。对于其中包含非常少数据的对象(比如一个int
字段),这可能比数据大。 -
java String
比原始字符串数据上有大约40个字节的开销(因为它们将它存储在一个char
数组中并保留额外的数据,如长度),并且由于String
内部使用UTF-16
编码,一个字符需要占用两个字节,因此一个长度为10字节的字符串需要60个字节。 - 公共集合类,例如
HashMap
和LinkedList
,使用链表数据结构,其中每个节点都有一个“包装”对象(例如Map.Entry
)。此对象不仅包含“对象头”,还具有指向列表中下一个对象的指针(通常每个指针占8个字节)。 - 原始类型的集合通常将它们存储为“装箱”对象,例如
java.lang.Integer
。
内存管理概述
spark
中的内存使用大致分为两类:execution
和storage
。execution
内存是用于shuffle
、join
、sort
、aggregation
计算的内存,而storage
内存是用于在集群中cache
和broadcast
内部数据的内存。在spark
中,execution
和storage
共享一个统一的区域(M
)。当没有使用execution
内存时,storage
可以获取所有可用的内存,反之亦然。如有必要,execution
可以驱逐storage
,但仅限于总存储内存使用量低于某个阈值(R
)。换句话说,R
描述了M
从不驱逐缓存块的子区域。由于实现的复杂性,storage
不会驱逐execution
。
这种设计保证了几个显著的特征。首先,不使用缓冲的应用程序可以使用整个空间执行,从而避免不必要的磁盘溢出。其次,使用缓冲的应用程序可以保留最小存储空间(R
),其中数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户内部划分内存的专业知识。
虽然有两种相关配置,但大部分用户不需要调整他们,因为默认值适用于大多数工作负载:
-
spark.memory.fraction
表示大小M
为(JVM
堆空间 -300MB
)的一小部分(默认值为0.6)。剩余的空间(0.4)保留用于数据结构,spark
中内部元数据,防止在异常大而稀疏的记录下发生OOM
错误。 -
spark.memory.storageFraction
表示R
为M
(默认为0.5)的一小部分。R
是M
缓冲块不受执行驱逐的存储空间。 -
spark.memory.fraction
值的配置不仅仅调试JVM
堆空间或trunred
设置。还有一些GC
优化
确定内存消耗
调整数据集所需内存消耗量的最佳方式就是创建RDD
,将其放入缓冲中,然后查看web UI
中的“storage”页面。来确定RDD
占用多少内存。
为了估计特定对象的内存消耗,使用SizeEstimator
的estimate
方法。这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆上占用的空间量非常有用。
数据结构优化
减少内存消耗的第一种方法是避免增加开销的java
功能,例如基于指针的数据结构和包装器对象。有很多中方法:
- 使用的数据结构优先选择对象数组和基本类型,而不是标准的
java
或scala
集合类(例如HashMap
)。fastutil
库提供了原始数据类型非常方便的集合类,同时兼容java
标准类库。 - 尽可能避免使用包含大量小对象和指针的嵌套结构。
- 考虑使用数据
ID
或枚举对象而不是String
类型的主键。 - 如果内存少于
32GB
,设置Jvm
参数-XX:+UseCompressedOops
来将8字节指针修改为4字节。
序列化RDD存储
当上面的优化都尝试了,但时当对象仍然太大而无法有效存储时,减少内存使用的一种更简单的方法是使用RDD
持久化API
中的序列化StorageLevels
以序列化形式存储他们。spark
将每个RDD
分区存储为一个大字节数据。由于必须动态地反序列化每个对象,因此以序列化形式存储数据的唯一缺点是访问时间较慢。如果以序列化的形式缓冲数据,建议使用kryo
,因为它占用的空间比java
序列化小得多。
垃圾收集调整
当程序存储的RDD
进行大量的“失效”时,JVM
垃圾回收可能会出现问题。(在读取RDD
一次然后在其上运行许多操作的程序中通常不会出现问题。)当java
需要清理旧对象以便为新对象腾出空间时,它需要遍历所有java
对象并查找未使用的。垃圾收集的成本与java
对象的数量成正比,因此使用具有较少对象的数据结构(例如,使用int
数组而不是LinkedList
)大大降低了这种成本。一种更好的方法是如上述以序列化形式持久化对象:现在每个RDD
分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC
是一个问题,首先要使用序列化缓冲。
由于任务的execution
内存(运行任务所需的空间量)与节点上缓冲的RDD
之间的干扰,GC
也可能成为问题。下面讨论如何控制分配给RDD
缓存的空间来缓解这种情况。
测量GC的影响
GC
调整的第一步是收集关于垃圾收集发生频率和GC
花费的时间的统计信息。这可以通过添加 -Verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
到java
选项来完成。下次运行spark
作业时,每当发生垃圾回收时,都会看到工作日志中打印的消息。这些日志将在集群的工作节点(stdout
在其工作目录的文件中),而不是驱动程序。
高级GC调整
为了进一步调整垃圾收集,首先介绍一下JVM
中有关内存管理的一些基本信息:
-
java
堆空间分为Young
和Old
两个区域。Young
意味这持有新生的对象,而Old
则持有生命周期较长的对象。 -
Young
进一步分为三个区域[Eden
,Survivor1
,Survivor2
]。 - 垃圾收集过程的简化描述:当
Eden
已满时,在Eden
上运行小级别GC
,并将从Eden
和Survivor1
中存活的对象复制到Survivor2
。Survivor
区被交换。如果一个对象足够大或Survivor2
已满,则将其移到Old区。最后,当Old
区接近满时,将调用大级别的GC
。
Spark
中GC
调整的目标是确保只有长期存在的RDD
存储在Old
代中,并且Young
代的大小足以存储短期对象。这将有助于避免大级别的GC
收集在任务执行期间创建的临时对象。一些可能有用的步骤:
- 通过收集
GC
统计数据来检查是否有太多的垃圾收集。如果在任务完成之前多次调用完整的GC
,则意味这没有足够的内存可用于执行任务。- 如果有太多次要
GC
而不是主要GC
,那么Eden
分配更多的内存会有所帮助。可以将Eden
的大小设置为偏大每个任务所需的内存量。如果Eden
的大小E
,则可以使用该选项设置Young
代的大小-Xmn=4/3*E
。(按比例增加4/3是为了考虑Survivor
使用的空间)。
- 如果有太多次要
- 在打印的
GC
统计信息中,如果Old
接近满时,则通过降低来减少用于缓冲的内存量spark.memory.fraction
;缓存更少的对象比减慢任务执行速度更好。考虑减小Young
的大小。-Xmn
如果没有,尝试更改JVM NewRatio
参数的值。许多JVM
将次默认为2,这意味着Old
占堆的2/3。它应该足够大,使得该值超过spark.memory.fraction
。 - 尝试使用
G1GC
垃圾收集器-XX:+UseG1GC
。在垃圾收集成为瓶颈的某些情况下,它可以提高性能。可能重要的时增加G1
的region
大小-XX:G1HeapRegionSize
。 - 例如,如果任务时从
hdfs
读取数据,则可以使用从hdfs
读取的数据块的大小来估计任务使用的内存量。解压块的大小通常时块大小的2或3倍。因此,如果有3或4个任务的工作空间,并且hdfs
块大小为128M
,可以估计Eden的带下4*3*128
。 - 监控垃圾收集所用频率和时间如何随新设置而变化。
可以通过设置spark.executor.extraJavaOptions
作业的配置来指定执行程序的GC
调整标志。
其他考虑因素
并行程度
为每个操作设置足够高的并行度,否则将无法充分利用群集。Spark
会根据其大小自动设置要在每个文件上运行的"map"任务的数量(尽管可以通过可选参数来控制它SparkContext.textFile
等),并且对于分布式"reduce"操作,例如groupByKey
和reduceByKey
,它使用最大的父级RDD
的分区数量。可以将并行级别作为第二个参数传递,或者设置config
属性spark.default.parallelism
以更改默认值。通常,建议群集中每个CPU
核心有2-3个任务。
减少任务的内存使用情况
有时候,你会得到一个OutOfMemoryError
,不是因为RDD
不适合内存,而是因为一个任务的数据集太大了,例如Spark
的Shuffle
操作一个reduce
任务groupByKey
,reduceByKey
,join
等。建立每个任务中的哈希表来进行分组,而这往往是大的。最简单的解决方法是增加并行度,以便每个任务的输入集更小。Spark
可以有效地支持短至200毫秒的任务,因为它在许多任务中重用了一个执行程序JVM
,并且它具有较低的任务启动成本,因此可以安全地将并行度提高到超过群集中的核心数。
广播大变量
使用可用的广播变量可以大大减少每个序列化任务的大小,以及在群集上启动作业的成本。如果任务使用一个小的数据集进行类似与join
操作,可以转化为map
端join
,建议转换为广播变量。Spark
会在主服务器上打印每个任务的序列化大小,因此可以查看它以确定任务是否过大; 一般来说,大于约20 KB的任务可能值得优化。
数据位置
数据位置可能会对Spark
作业的性能产生重大影响。如果数据和在其上运行的代码在一起,那么计算往往很快。但是如果代码和数据是分开的,那么必须进行移动。通常,将序列化代码从一个地方移动到另一个地方比数据移动更快,因为代码大小比数据小得多。Spark
围绕数据局部性的一般原则构建其调度。
数据位置是数据与处理它的代码的接近程度。根据数据的当前位置,有多个级别的位置。从最近到最远的顺序:
-
PROCESS_LOCAL
数据与正在运行的代码位于同一JVM
中。这是最好的地方 -
NODE_LOCAL
数据在同一节点上。可能位于同一节点上的HDFS
中,也可能位于同一节点上的另一个执行程序中。这比PROCESS_LOCAL
因为数据必须在进程之间传输要慢一些 -
NO_PREF
可以从任何地方快速访问数据,并且没有位置偏好 -
RACK_LOCAL
数据位于同一机架服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机 -
ANY
数据在网络上的其他位置,而不在同一个机架中
Spark
更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行程序上没有未处理数据的情况下,Spark
会切换到较低的位置级别。有两个选项:a
)等待忙碌的CPU
释放以启动同一服务器上的数据任务,或b
)立即在需要移动数据的更远的地方启动新任务。
Spark
通常会做的是等待忙碌的CPU
释放。一旦超时到期,它就开始将数据从远处移动到空闲CPU
。每个级别之间的回退等待超时可以单独配置,也可以在一个参数中一起配置(spark.locality
上的 参数)。默认情况下通常效果很好。