FlinkFlinkflink

Flink源码阅读(六)--- Flink 内存模型

2021-03-12  本文已影响0人  sj_91d7

本篇文章主要介绍下FLink的内存模型,在介绍Flink内存模型之前,我们首先学习下JVM内存结构

1. JVM内存结构

Java7 升级为 Java8的时候,JVM内存结构发生了改变,咱们看下区别是什么。这部分内容原文 Java8 JVM内存结构

1.1 Java7 对应的 JVM 内存结构

Java7内存结构.png

很多人愿意将方法区称作永久代。

本质上来讲两者并不等价,仅因为Hotspot将GC分代扩展至方法区,或者说使用永久代来实现方法区。在其他虚拟机上是没有永久代的概念的。也就是说方法区是规范,永久代是Hotspot针对该规范进行的实现。

1.2 Java8 对应的 JVM 内存结构

Java8内存结构.png

元空间MetaSpace存在于本地内存,意味着只要本地内存足够,它不会出现像Java7永久代中“java.lang.OutOfMemoryError: PermGen space”这种错误。

那为什么用MetaSpace代替了方法区呢?是因为通常使用PermSize和MaxPermSize设置永久代的大小就决定了永久代的上限,但是不知道应该设置多大合适, 如果使用默认值很容易遇到OOM错误。
当使用元空间时,可以加载多少类的元数据就不再由MaxPermSize控制, 而由系统的实际可用空间来控制。

1.3 JVM 堆内堆外内存什么含义?

这部分内容原文 堆内堆外内存

堆内内存

堆外内存

2. Flink 内存模型

Flink1.10 对Flink的内存模型进行了改造,咱们分开来介绍Flink1.10之前版本,已经Flink1.10之后版本

2.1 Flink1.10前的Flink内存模型

首先看下内存模型图


Flink1.10前内存模型.png

看下flink源码,来分析下内存各个分区大小是怎么设置的,入口 ContaineredTaskManagerParameters#create 方法

    /**
     * Computes the parameters to be used to start a TaskManager Java process.
     *
     * @param config The Flink configuration.
     * @param containerMemoryMB The size of the complete container, in megabytes.
     * @return The parameters to start the TaskManager processes with.
     */
    public static ContaineredTaskManagerParameters create(
            Configuration config,
            long containerMemoryMB,
            int numSlots) {
        // (1) try to compute how much memory used by container
        final long cutoffMB = calculateCutoffMB(config, containerMemoryMB);

        // (2) split the remaining Java memory between heap and off-heap
        final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
        // use the cut-off memory for off-heap (that was its intention)
        final long offHeapSizeMB = containerMemoryMB - heapSizeMB;

        // (3) obtain the additional environment variables from the configuration
        final HashMap<String, String> envVars = new HashMap<>();
        final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;

        for (String key : config.keySet()) {
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                // remove prefix
                String envVarKey = key.substring(prefix.length());
                envVars.put(envVarKey, config.getString(key, null));
            }
        }

        // done
        return new ContaineredTaskManagerParameters(
            containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
    }
}
  1. container cut-off 区域
       - check cutoff ratio,memoryCutoffRatio 默认是0.25
       - check min cutoff value, 默认最小cutoff区域是600MB
       - cutoff区域大小:containerMemoryMB * memoryCutoffRatio
       - 这部分区域是预留内存,RocksDB使用的native内存,或者 JVM overhead都是使用这部分区域。

  2. Network buffers 区域(也就是 Off-heap 区域)
       - 用于网络传输(比如 shuffle、broadcast)的内存 Buffer 池,属于 Direct Memory 并由 Flink 管理。
       - taskmanager.memory.segment-size 默认是 32kb
       - taskmanager.network.memory.fraction 默认是 0.1
       - Network buffers 区域大小:( containerMemoryMB - cutoffMB ) * taskmanager.network.memory.fraction,并且介于 64MB ~ 1GB之间

  3. Heap 区域
       - Heap 区域大小: containerMemoryMB - cutoffMB - networkReservedMemory
       - 可以使用JVM参数 -Xms 和 -Xmx 来设置上下限

2.2 Flink1.10及之后版本的Flink内存模型

首先看下内存模型图


Flink1.10及之后版本内存模型.png

看下flink源码,来分析下内存各个分区大小是怎么设置的,入口 AbstractContainerizedClusterClientFactory#getClusterSpecification 方法,这个方法是在集群提交作业的时候被调度。

    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) {
        checkNotNull(configuration);
        
        // JM 内存模型
        final int jobManagerMemoryMB =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                                configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        // TM 内存模型
        final int taskManagerMemoryMB =
                TaskExecutorProcessUtils.processSpecFromConfig(
                                TaskExecutorProcessUtils
                                        .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                                configuration,
                                                TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setSlotsPerTaskManager(slotsPerTaskManager)
                .createClusterSpecification();
    }

咱们主要是分析TM的内存模型,也就是直接看下TaskExecutorProcessUtils.processSpecFromConfig方法,该方法主要做了下面几件事情

2.2.1. 首先初始化TM进程内存选项

对应TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS对象,根据configure文件读取,涉及到下面几个参数:
1.1 task 和 managed 内存相关
   - taskmanager.memory.task.heap.size: task heap大小,没有默认值。
   - taskmanager.memory.managed.size:flink框架manage 内存大小,没有默认值
1.2 Total Flink Memory 内存
   - taskmanager.memory.flink.size: Total Flink Memory大小,没有默认值。
1.3 Total Process Memory 内存
   - taskmanager.memory.process.size: Total Process Memory大小,没有默认值。
1.4 Jvm Metaspace和 Overhead 内存
   - taskmanager.memory.jvm-metaspace.size: jvm-metaspace大小,主要存储类的元数据信息,默认值 256MB
   - taskmanager.memory.jvm-overhead.min:jvm-overhead区域的最小值,默认值是 192 MB
   - taskmanager.memory.jvm-overhead.max:jvm-overhead区域的最大值,默认值是 1GB
   - taskmanager.memory.jvm-overhead.fraction:jvm-overhead占内存比例,默认值是0.1

    static final ProcessMemoryOptions TM_PROCESS_MEMORY_OPTIONS =
            new ProcessMemoryOptions(
                    Arrays.asList(
                            TaskManagerOptions.TASK_HEAP_MEMORY,
                            TaskManagerOptions.MANAGED_MEMORY_SIZE),
                    TaskManagerOptions.TOTAL_FLINK_MEMORY,
                    TaskManagerOptions.TOTAL_PROCESS_MEMORY,
                    new JvmMetaspaceAndOverheadOptions(
                            TaskManagerOptions.JVM_METASPACE,
                            TaskManagerOptions.JVM_OVERHEAD_MIN,
                            TaskManagerOptions.JVM_OVERHEAD_MAX,
                            TaskManagerOptions.JVM_OVERHEAD_FRACTION));

2.2.2. 接下来看下FLink内存模型,内存大小怎么计算的

如果指定了Total Process Memory大小,可以接下来看 ProcessMemoryUtils#deriveProcessSpecWithTotalProcessMemory方法
根据设置的Total Process Memory大小,来计算其他区域内存大小

(1) 内存第一部分: Jvm Metaspace大小

   - 如果在conf文件中设置了taskmanager.memory.jvm-metaspace.size,就按照设置的来,否则走默认值 256MB

(2) 内存第二部分: Jvm Overhead 大小

   - 根据 taskmanager.memory.jvm-overhead.fraction比例来计算,OverheadMemorySize = totalProcessMemorySize * taskmanager.memory.jvm-overhead.fraction
如果,OverheadMemorySize正好介于taskmanager.memory.jvm-overhead.min(192MB)和 taskmanager.memory.jvm-overhead.max(1GB)之间,那取值就是OverheadMemorySize
否则,if OverheadMemorySize > taskmanager.memory.jvm-overhead.max ,那取值就是taskmanager.memory.jvm-overhead.max。
if OverheadMemorySize < taskmanager.memory.jvm-overhead.min ,那取值就是taskmanager.memory.jvm-overhead.min

(3) 内存第三部分: Total Flink Memory 大小

   - totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceSize - jvmOverheadSize

2.2.3. Total Flink Memory 内,各个分区大小计算

Total Flink Memory中又涉及到好几块区域,分别来看下计算规则,都是基于totalFlinkMemorySize来计算的。对应TaskExecutorFlinkMemoryUtils#deriveFromTotalFlinkMemory方法,该方法主要做了下面几件事情:

  1. 获取frameworkHeap大小,可以通过taskmanager.memory.framework.heap.size参数修改,默认值是128MB
  2. 获取frameworkOffHeap大小,可以通过taskmanager.memory.framework.off-heap.size参数修改,默认值是128MB
  3. 获取taskOffHeap大小,可以通过taskmanager.memory.task.off-heap.size参数修改,默认值是0

接下来还有taskHeapMemory,networkMemory,managedMemory需要设置,这里会有if-else逻辑,第一种情况:如果明确指定了taskHeapMemory大小, else 是第二种情况,接下来咱们按照else这个分支分析下 taskHeapMemory,networkMemory,managedMemory这三块区域的内存大小。

  1. managedMemorySize 大小
  1. networkMemorySize 大小
  1. taskHeapMemorySize 大小

至此,Flink内存模型已经介绍完成。

3. 指定TM内存模型的方式

整个TM内存模型可以通过三种方式来指定

  1. 通过指定 taskmanager.memory.task.heap.sizetaskmanager.memory.managed.size来确定
  2. 通过指定 taskmanager.memory.flink.size 也就是 Total Flink Memory大小
  3. 通过指定 * taskmanager.memory.process.size* 也就是 Total Process Memory大小

对应源码ProcessMemoryUtils#memoryProcessSpecFromConfig方法

    public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
        if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
            // all internal memory options are configured, use these to derive total Flink and
            // process memory
            return deriveProcessSpecWithExplicitInternalMemory(config);
        } else if (config.contains(options.getTotalFlinkMemoryOption())) {
            // internal memory options are not configured, total Flink memory is configured,
            // derive from total flink memory
            return deriveProcessSpecWithTotalFlinkMemory(config);
        } else if (config.contains(options.getTotalProcessMemoryOption())) {
            // total Flink memory is not configured, total process memory is configured,
            // derive from total process memory
            return deriveProcessSpecWithTotalProcessMemory(config);
        }
        return failBecauseRequiredOptionsNotConfigured();
    }

4. 总结

本文介绍了Flink1.9 和 Flink1.12的内存模型以及各个区域的计算方法。
简单总结下Flink1.10之后的内存模型:

  1. 本质上 Java 应用使用的内存(不包括 JVM 自身的开销)可以分为三类:
       - JVM 堆内存:Heap
       - 不在 JVM 堆上但受到 JVM 管理的内存:Direct
       - 完全不受 JVM 管理的内存:Native

  2. Direct 内存是直接映射到 JVM 虚拟机外部的内存空间,但是其用量又受到 JVM 的管理和限制,从这个角度来讲,认为它是 JVM 内存或者非 JVM 内存都是讲得通的。

  3. Flink UI中 metric 加在一起为什么不是 TM 的总内存?
       一方面是因为 Native 内存没有被算进去(也就是 Cut-off 的主要部分),因为 Native 是不受 JVM 管理的,MXBean 完全不知道它的使用情况。另一方面,JVM 自身的开销也并不是都被覆盖到了,比如对于栈空间,JVM 只能限制每个线程的栈空间有多大,但是不能限制线程的数量,因此总的栈空间大小也是不受控制的,也没有通过 Metric 来体现。

  4. Window相关的算子会将窗口内的数据作为状态保存在内存里,等待窗口触发再进行计算。想问一下这里的状态是存在哪种类型的内存里面?
      这个应该是存在 state 里的,具体用哪种类型的内存取决于你的 State Backend 类型。MemoryStateBackend/FsStateBackend 用的是 Heap 内存,RocksDBStateBackend 用的是 Native 内存,也就是 1.10 中的 Manage Memory。

  5. flink 1.10里将RocksDBStateBackend改为使用managed memory,统一使用 offheap 内存,您的解答里说的是native内存,不知道是不是您说的“完全不受 JVM 管理的内存:Native”这个?
       是的

  6. 不是太清楚offheap和direct以及native的关系是怎样的?
       Flink 配置项中的 task/framework offheap,是包括了 direct 和 native 内存算在一起的,也就是说用户不需要关心具体使用的是 direct 还是 native。

  7. Overhead 主要涉及到哪部分信息存储?
       使用的native内存,主要存储线程栈,code cache, garbage collection space等。

  8. 为什么本地起的 Standalone Flink,为啥 UI 上展示的 Heap 会超过设置的 taskmanager.memory.process.size 的值?
       这主要是因为,我们只针对 Metaspace 设置了 JVM 的参数,对于其他 Overhead 并没有设置 JVM 的参数,也并不是所有的Overhead 都有参数可以控制(比如栈空间)。

Non-Heap Max 是 JVM 自己决定的,所以通常会比 Flink 配置的 Metaspace + Overhead 要大。
可以这样理解,Flink 将整个 TM 的内存预算划分给了不同的用途,但是并不能严格保证各部分的内存都不超用,只能是 Best Effort。
其中,Managed、Network、Metaspace 是严格限制的,Off-Heap、Overhead 是不能完全严格限制的,Heap 整体是严格限制的但是 Task/Framework 之间是非严格的。

5. 参考

  1. https://juejin.cn/post/6844903975419019277
  2. https://blog.csdn.net/khxu666/article/details/80775635
上一篇下一篇

猜你喜欢

热点阅读