玩转大数据Flink学习指南Yarn

NUMA 和 Yarn NUMA 感知

2022-03-16  本文已影响0人  AlienPaul

NUMA概念和带来的问题

NUMA即Non-uniform memory access(非一致内存访问)。简单来说是一个机器中的CPU和内存资源分为多个分组,这些组称之为numa节点,组内的CPU内核之间通信,和访问本组所属的内存性能很高,但是跨组CPU内核交互和内存交互性能较差。不同CPU内核和物理位置的内存是不对等的。因此,在NUMA架构的机器上存在一种优化方式是将应用锁定在指定的一个或多个numa节点上,尽量减少跨numa节点调用。目前操作系统和软件调度认为所有CPU内核和内存空间是相同的,并没有感知到NUMA架构的存在,资源调度比较“随机”。这样会引起性能损耗。

在Linux中可以使用numactl命令绑定进程运行使用的CPU和内存资源。例如:

numactl --cpunodebind=node_id --membind=node_id ./path/to/app

常用参数解释:

numactl命令还支持查看本机的NUMA拓扑。可使用numactl --hardware命令输出当前机器numa node的详细情况。包含:

示例输出如下:

[root@localhost ~]# numactl --hardware
available: 2 nodes (0,8)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
node 0 size: 61332 MB
node 0 free: 60332 MB
node 8 cpus: 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
node 8 size: 65341 MB
node 8 free: 60231 MB
node distances:
node   0   8
  0:  10  40
  8:  40  10

问题似乎得到了解决,但是在Yarn上运行的程序怎么绑定NUMA节点呢?

社区在Yarn 3.1.0版本中带来了NUMA感知(NUMA aware)功能,链接:[YARN-5764] NUMA awareness support for launching containers - ASF JIRA (apache.org)

NUMA感知配置方法如下:

在NodeManager中启用NUMA Awareness特性,默认配置为false不启动NUMA感知。启用NUMA感知的配置如下:

<property>
    <name>yarn.nodemanager.numa-awareness.enabled</name>
    <value>true</value>
</property>

使用NUMA awareness的Linux系统需要安装numactl命令。通过yarn.nodemanager.numa-awareness.numactl.cmd参数配置numactl命令在系统中路径。

<property>
    <name>yarn.nodemanager.numa-awareness.numactl.cmd</name>
    <value>/usr/bin/numactl</value>
</property>

yarn.nodemanager.numa-awareness.read-topology参数决定NodeManager是否自动读取系统NUMA信息。默认配置为false,程序不会主动获取本机NUMA信息。

<property>
    <name>yarn.nodemanager.numa-awareness.read-topology</name>
    <value>true</value>
</property>

如果不启用自动读取NUMA信息,可以手工指定。方法在源码分析中讲解。

NumaResourceAllocator

Yarn的Numa节点分配逻辑位于NumaResourceAllocator。它的init方法负责读取Numa topology。Yarn支持从numactl命令和配置项中读取numa topology。从numactl命令读取的方式为通过解析numactl --hardware命令输出,将Numa资源封装为NumaNodeResource
。从配置中读取的方式为从yarn-site.xml中读取如下配置:

最后将解析之后的资源封装为NumaNodeResource,添加在numaNodesListnumaNodeIdVsResource集合中。

public void init(Configuration conf) throws YarnException {
    // 读取yarn.nodemanager.numa-awareness.read-topology配置
    if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY,
                        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) {
        LOG.info("Reading NUMA topology using 'numactl --hardware' command.");
        // 从yarn.nodemanager.numa-awareness.numactl.cmd读取numactl命令位置
        // 默认为/usr/bin/numactl
        String cmdOutput = executeNGetCmdOutput(conf);
        // 按行切割
        String[] outputLines = cmdOutput.split("\\n");
        // 获取available node这一行括号中的内容(可用numa node)
        Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX);
        String nodeIdsStr = null;
        for (String line : outputLines) {
            Matcher matcher = pattern.matcher(line);
            if (matcher.find()) {
                nodeIdsStr = matcher.group(1);
                break;
            }
        }
        if (nodeIdsStr == null) {
            throw new YarnException("Failed to get numa nodes from"
                                    + " 'numactl --hardware' output and output is:\n" + cmdOutput);
        }
        // 可用numa node
        String[] nodeIdCommaSplits = nodeIdsStr.split("[,\\s]");
        // 处理range的情况,node id有可能是0-8, 9这种表示方法
        for (String nodeIdOrRange : nodeIdCommaSplits) {
            if (nodeIdOrRange.contains("-")) {
                // 是范围表示方法
                String[] beginNEnd = nodeIdOrRange.split("-");
                int endNode = Integer.parseInt(beginNEnd[1]);
                for (int nodeId = Integer
                     .parseInt(beginNEnd[0]); nodeId <= endNode; nodeId++) {
                    // 从范围begin到范围end逐个解析所属的内存和cpu核心资源
                    // node x free这一行
                    long memory = parseMemory(outputLines, String.valueOf(nodeId));
                    // node x cpus这一行
                    int cpus = parseCpus(outputLines, String.valueOf(nodeId));
                    // 包装成NumaNodeResource,加入numaNodesList和numaNodeIdVsResource集合管理
                    addToCollection(String.valueOf(nodeId), memory, cpus);
                }
            } else {
                // 不是范围表示方法,直接解析
                long memory = parseMemory(outputLines, nodeIdOrRange);
                int cpus = parseCpus(outputLines, nodeIdOrRange);
                addToCollection(nodeIdOrRange, memory, cpus);
            }
        }
    } else {
        // 如果不需要从命令读取numa topology,则从配置中读取
        LOG.info("Reading NUMA topology using configurations.");
        // 读取yarn.nodemanager.numa-awareness.node-ids配置
        Collection<String> nodeIds = conf
            .getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS);
        // 分别从如下配置项中读取各个numa node对应的内存和cpu核心配置
        for (String nodeId : nodeIds) {
            long mem = conf.getLong(
                "yarn.nodemanager.numa-awareness." + nodeId + ".memory",
                DEFAULT_NUMA_NODE_MEMORY);
            int cpus = conf.getInt(
                "yarn.nodemanager.numa-awareness." + nodeId + ".cpus",
                DEFAULT_NUMA_NODE_CPUS);
            addToCollection(nodeId, mem, cpus);
        }
    }
    if (numaNodesList.isEmpty()) {
        throw new YarnException("There are no available NUMA nodes"
                                + " for making containers NUMA aware.");
    }
    LOG.info("Available numa nodes with capacities : " + numaNodesList.size());
}

分配Numa node的逻辑位于allocate方法中:

private NumaResourceAllocation allocate(ContainerId containerId,
                                        Resource resource) {
    // 遍历所有numa node
    for (int index = 0; index < numaNodesList.size(); index++) {
        // 从上次分配的node开始向后遍历,这样可以确保numa node分配任务尽量的均衡
        NumaNodeResource numaNode = numaNodesList
            .get((currentAssignNode + index) % numaNodesList.size());
        // 如果可用内存和CPU vcore资源都比resource要求的多
        if (numaNode.isResourcesAvailable(resource)) {
            // 资源分配给这个container
            // numaNode里记录该container使用的资源,同时扣减可用资源数量
            numaNode.assignResources(resource, containerId);
            LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, "
                     + numaNode.getNodeId() + " for cpus for the " + containerId);
            // 设置当前分配的numa node
            currentAssignNode = (currentAssignNode + index + 1)
                % numaNodesList.size();
            // 返回NumaResourceAllocation
            return new NumaResourceAllocation(numaNode.getNodeId(),
                                              resource.getMemorySize(), numaNode.getNodeId(),
                                              resource.getVirtualCores());
        }
    }
}

该方法从当前分配的node往后逐个遍历NumaNode,如果numa node的节点资源能够满足要求,将这个numa node分配给该container。返回的NumaResourceAllocation含有这个numa node的id。

分配numa资源的调用位于NumaResourceHandlerImpl::preStart方法。preStart方法在获取到numa节点对应的资源后,为container增加numa相关的启动参数。

@Override
public List<PrivilegedOperation> preStart(Container container)
    throws ResourceHandlerException {
    List<PrivilegedOperation> ret = null;
    // 调用前面所述的allocateNumaNodes方法
    NumaResourceAllocation numaAllocation = numaResourceAllocator
        .allocateNumaNodes(container);
    // container启动命令前加numactl --interleave=xxx --cpunodebind=xxx
    if (numaAllocation != null) {
        ret = new ArrayList<>();
        ArrayList<String> args = new ArrayList<>();
        args.add(numaCtlCmd);
        args.add(
            "--interleave=" + String.join(",", numaAllocation.getMemNodes()));
        args.add(
            "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()));
        ret.add(new PrivilegedOperation(OperationType.ADD_NUMA_PARAMS, args));
    }
    return ret;
}

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

上一篇 下一篇

猜你喜欢

热点阅读