spark executor的内存管理相关

2020-06-24  本文已影响0人  早点起床晒太阳

参考资料:https://zhuanlan.zhihu.com/p/115888408 (真的很不错的文章,受益匪浅)

前言

Executor 内存管理方面讲解的很多,今天第一次细细来学习一下
内存管理相关我觉得理解可以让我们更对于广播变量以及缓存包括shuffle有些更好的控制和理解
spark在yarn模式下运行的时候,其申请的资源是以container的形式存在的。最大资源申请受yarn.scheduler.maximum-allocation-mb这个值决定。

内存组成和分配

内存分为堆内内存和堆外内存。内存分配的总览图如下图所示(借用的图)


image.png

堆内内存

堆内内存onheap由spark.executor.memory指定,堆内内存executorMemory是spark使用的主要部分,其大小通过-Xmx参数传给jvm,内部有300M的保留资源不被executor使用。

300M的源代码以及相关解释如下

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

此外的可用内存usableMemory被分为spark管理的内存和用户管理的内存两部分,spark管理的内存通过spark.memory.fraction进行控制,默认0.6。这块内存在spark中被称为unified region(代号M)或统一内存或可用内存,其进一步被分为执行内存ExecutionMemory和StorageMemory,见上图。其中storage memory(代号R)是M的一个subregion,其的大小占比受spark.memory.storageFraction控制,默认为0.5,即默认占usableMemory的 0.6*0.5=0.3。我们用onHeapStorageRegionSize来表示storage这部分的大小

源码见下面 UnifiedMemoryManager

 def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

1、ExecutionMemory执行内存:主要存储Shuffle、Join、Sort、Aggregation等计算过程中的临时数据;
2、StorageMemory存储内存:主要存储spark的cache数据,如RDD.cache RDD.persist在调用时的数据存储,用户自定义变量及系统的广播变量等

这两块内存在当前默认的UnifiedMemoryManager(Spark1.6引入)下是可以互相动态侵占的,即Execution内存不足时可以占用Storage的内存,反之亦然,其详细规则如下:

1、Execution内存不足且onHeapStorageRegionSize有空闲时,可以向Storage Memory借用内存,但借用后storage不能将execution占用的部分驱逐evict出去,只能等着Execution自己释放。
2、Storage内存不足时可以借用Execution的内存,且当Execution又有内存资源需求时可以驱逐Storage占用的部分,但只能驱逐StorageMemory-onHeapStorageRegionSize的大小,原来划定的onHeapStorageRegionSize且在使用的不可被抢占。

这里我做了下一个小小的测试,执行了一个测试样例,提交参数如下所示

/opt/beh/core/spark/bin/spark-submit  --master yarn  --class com.example.sparklearn.test.Demo1  --num-executors 1 --executor-memory 8g --executor-cores 4  /home/hadoop/zgh/sparklearn-0.0.1-SNAPSHOT.jar

然后查看UI界面,发现单个executor的Storage Memory的内存只有4.4G,那么这4.4G怎么来的呢,我们一起来计算下,如下图所示


image.png

分配的executor-memory为8g,

依照java内存的划分,其堆内存分为eden、survivor*2和tenured部分,同时刻只有一个survivor可用,因而指定的堆内存实际可用的内存即Runtime.getRuntime.maxMemory查到的通常比Xmx指定的要小,一般为90-95%的样子,我们这里可以使用这种方式

scala>  val demo = spark.sql("select * from ceshi.xunlian");
demo: org.apache.spark.sql.DataFrame = [name: string]

scala> demo.map(x=>{Runtime.getRuntime.maxMemory}).collect
20/06/24 11:39:57 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 243924ms (threshold=30000ms); ack: seqno: 9 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0, targets: [DatanodeInfoWithStorage[192.168.1.113:50010,DS-1d2bffdf-ff9a-4f97-bbd0-734b97636128,DISK]]
res3: Array[Long] = Array(7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432, 7635730432)

来查看具体的应用中实际内存情况
这么一看是7635730432B,相当于7282M。 7282M0.6=4369.2 。实际这里显示时不是将字节除以102410241024,而是直接除以10001000*1000得到的。

上面说了占可用内存spark.memory.fraction(0.6)的spark 统一内存,另外0.4的用户内存用于存储用户代码生成的对象及RDD依赖等,用户在处理partition中的记录时,其遍历到的记录可以看做存储在Other区,当需要将RDD缓存时,将会序列化或不序列化的方式以Block的形式存储到Storage内存中。

堆外内存

前面说了,堆外内存有的是参数spark.yarn.executor.memoryOverhead控制,有的是参数spark.memory.offHeap.size控制,这个都算offheap内存,不过前者主要用于JVM自身,字符串, NIO Buffer等开销,而后者主要是供统一内存管理用作Execution Memory及Storage Memory的用途。

spark.yarn.executor.memoryOverhead设置的内存默认为executor.memory的0.1倍,最低384M,这个始终存在的,在采用yarn时,这块内存是包含在申请的容器内的,即申请容器大小大于spark.executor.memory+spark.yarn.executor.memoryOverhead。

而通过spark.memory.offHeap.enable/size申请的内存不在JVM内,spark.memory.offHeap.enable 默认为false,spark.memory.offHeap.size 默认值为0。Spark利用TungSten技术直接操作管理JVM外的原生内存。主要是为了解决Java对象开销大和GC的问题。

上一篇下一篇

猜你喜欢

热点阅读