flinkflink

Flink源码阅读(二)--- JobGraph 的生成

2021-02-07  本文已影响0人  sj_91d7

本文内容是基于Flink 1.9来讲解。在执行Flink任务的时候,会涉及到三个Graph,分别是StreamGraph,JobGraph,ExecutionGraph。其中StreamGraph和JobGraph是在client端生成的,ExecutionGraph是在JobMaster中执行的。

本篇文章在Flink源码阅读(一)--- StreamGraph 的生成 基础上,介绍下JobGraph的生成

1. JobVertex

在StreamGraph中,每个operator对应一个StreamNode。在JobGraph中,JobVertex对应的是可chain起来的operator list,把一些operator chain起来,可以较少网络以及序列化和反序列化的开销,大部分情况下可以提高作业性能。

2. JobEdge

在StreamGraph中,StreamNode之间的连接关系使用StreamEdge表示。在JobGraph中,JobVertex之间的连接关系使用JobEdge表示。

3. JobGraph生成入口StreamingJobGraphGenerator#createJobGraph()方法

    private JobGraph createJobGraph() {

        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

        setChaining(hashes, legacyHashes, chainedOperatorHashes);

        setPhysicalEdges();

        setSlotSharingAndCoLocation();

        configureCheckpointing();

        JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

主要做的工作如下
1.1 为streamGraph的StreamNode生成hash值(如果用户为operator指定了uid,就使用用户自定义的,否则自动生成。这个uid不管是用户指定还是自动生成,必须保证Job全局唯一性),这个值以后作为JobVertexID来唯一标识节点
1.2 设置task chain关系,并且把可chain的operator创建一个新的JobVertex添加到JobGraph中
    1.2.1 是否可chain,代码逻辑在StreamingJobGraphGenerator.isChainable方法中
        - 下游算子的输入只有一个
        - 下游算子不为空
        - 上游算子不为空
        - 上游和下游算子的slotSharingGroup相同
        - 下游算子的ChainingStrategy为ALWAYS
        - 上游算子的ChainingStrategy为HEAD或者ALWAYS
        - 上下游算子中间的edge的Partitioner是ForwardPartitioner
        - 上下游算子的并发相同
        - streamGraph配置是可以chain的
1.3 设置PhysicalEdges
1.4 设置SlotSharingAndCoLocation,就是把streamGraph StreamNode的SlotSharingGroup属性设置到JobVertex中
1.5 配置checkpoint。主要就是设置triggerVertices(所有的输入节点),commitVertices(所有的节点),ackVertices(所有的节点)
1.6 设置执行配置信息,比如默认并发度,失败时retry次数,retry delay等

通过createJobGraph方法,就完成了StreamGraph到JobGraph的转换。

4. 小结

JobGraph主要是把StreamGraph中,可以chain起来的operator进行合并,这样可以减小网络以及序列化和反序列化的开销。

上一篇下一篇

猜你喜欢

热点阅读