大数据

Apache Flink 进阶(六):Flink 作业执行深度解

2020-09-19  本文已影响0人  bigdata张凯翔

作者:岳猛
整理:平凡的世界-zkx

本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink Contributor、网易云音乐实时计算平台研发工程师岳猛分享。主要分享内容为 Flink Job 执行作业的流程,文章将从两个方面进行分享:一是如何从 Program 到物理执行计划,二是生成物理执行计划后该如何调度和执行。

Flink 四层转化流程

Flink 有四层转换流程,第一层为 Program 到 StreamGraph;第二层为 StreamGraph 到 JobGraph;第三层为 JobGraph 到 ExecutionGraph;第四层为 ExecutionGraph 到物理执行计划。通过对 Program 的执行,能够生成一个 DAG 执行图,即逻辑执行图。如下:


image.png

第一部分将先讲解四层转化的流程,然后将以详细案例讲解四层的具体转化。

  • 第一层 StreamGraph 从 Source 节点开始,每一次 transform 生成一个 StreamNode,两个 StreamNode 通过 StreamEdge 连接在一起,形成 StreamNode 和 StreamEdge 构成的DAG。
  • 第二层 JobGraph,依旧从 Source 节点开始,然后去遍历寻找能够嵌到一起的 operator,如果能够嵌到一起则嵌到一起,不能嵌到一起的单独生成 jobVertex,通过 JobEdge 链接上下游 JobVertex,最终形成 JobVertex 层面的 DAG。
  • JobVertex DAG 提交到任务以后,从 Source 节点开始排序,根据 JobVertex 生成ExecutionJobVertex,根据 jobVertex的IntermediateDataSet 构建IntermediateResult,然后 IntermediateResult 构建上下游的依赖关系,形成 ExecutionJobVertex 层面的 DAG 即 ExecutionGraph。
  • 最后通过 ExecutionGraph 层到物理执行层。

Program 到 StreamGraph 的转化

Program 转换成 StreamGraph 具体分为三步:

  • 从 StreamExecutionEnvironment.execute 开始执行程序,将 transform 添加到 StreamExecutionEnvironment 的 transformations。
  • 调用 StreamGraphGenerator 的 generateInternal 方法,遍历 transformations 构建 StreamNode 及 StreamEage。
  • 通过 StreamEdge 连接 StreamNode。


    image.png

    通过 WindowWordCount 来看代码到 StreamGraph 的转化,在 flatMap transform 设置 slot 共享组为 flatMap_sg,并发设置为 4,在聚合的操作中设置 slot 共享组为 sum_sg, sum() 和 counts() 并发设置为 3,这样设置主要是为了演示后面如何嵌到一起的,跟上下游节点的并发以及上游的共享组有关。

WindowWordCount 代码中可以看到,在 readTextFile() 中会生成一个 transform,且 transform 的 ID 是 1;然后到 flatMap() 会生成一个 transform, transform 的 ID 是 2;接着到 keyBy() 生成一个 transform 的 ID 是 3;再到 sum() 生成一个 transform 的 ID 是 4;最后到 counts()生成 transform 的 ID 是 5。


image.png

transform 的结构如图所示,第一个是 flatMap 的 transform,第二个是 window 的 transform,第三个是 SinkTransform 的 transform。除此之外,还能在 transform 的结构中看到每个 transform 的 input 是什么。

接下来介绍一下 StreamNode 和 StreamEdge。

  • StreamNode 是用来描述 operator 的逻辑节点,其关键成员变量有 slotSharingGroup、jobVertexClass、inEdges、outEdges以及transformationUID;
  • StreamEdge 是用来描述两个 operator 逻辑的链接边,其关键变量有 sourceVertex、targetVertex。


    image.png

    WindowWordCount transform 到 StreamGraph 转化如图所示,StreamExecutionEnvironment 的 transformations 存在 3 个 transform,分别是 Flat Map(Id 2)、Window(Id 4)、Sink(Id 5)。

transform 的时候首先递归处理 transform 的 input,生成 StreamNode,然后通过 StreamEdge 链接上下游 StreamNode。需要注意的是,有些 transform 操作并不会生成StreamNode 如 PartitionTransformtion,而是生成个虚拟节点。


image.png

在转换完成后可以看到,streamNodes 有四种 transform 形式,分别为 Source、Flat Map、Window、Sink。


image.png
每个 streamNode 对象都携带并发个数、slotSharingGroup、执行类等运行信息。
StreamGraph 到 JobGraph 的转化

Flink Job 执行流程

Flink On Yarn 模式
image.png

基于 Yarn 层面的架构类似 Spark on Yarn 模式,都是由 Client 提交 App 到 RM 上面去运行,然后 RM 分配第一个 container 去运行 AM,然后由 AM 去负责资源的监督和管理。需要说明的是,Flink 的 Yarn 模式更加类似 Spark on Yarn 的 cluster 模式,在 cluster 模式中,dirver 将作为 AM 中的一个线程去运行。Flink on Yarn 模式也是会将 JobManager 启动在 container 里面,去做个 driver 类似的任务调度和分配,Yarn AM 与 Flink JobManager 在同一个 Container 中,这样 AM 可以知道 Flink JobManager 的地址,从而 AM 可以申请 Container 去启动 Flink TaskManager。待 Flink 成功运行在 Yarn 集群上,Flink Yarn Client 就可以提交 Flink Job 到 Flink JobManager,并进行后续的映射、调度和计算处理。
Fink on Yarn 的缺陷
资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
On-Yarn 模式下,所有的 container 都是固定大小的,导致无法根据作业需求来调整 container 的结构。譬如 CPU 密集的作业或许需要更多的核,但不需要太多内存,固定结构的 container 会导致内存被浪费。
与容器管理基础设施的交互比较笨拙,需要两个步骤来启动 Flink 作业: 1.启动 Flink 守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
On-Yarn 模式下,作业管理页面会在作业完成后消失不可访问。
Flink 推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的 session 模式,令人疑惑。
在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新设计里引入的一个新概念。Dispatcher 会从 Client 端接受作业提交请求并代表它在集群管理器上启动作业。

上一篇下一篇

猜你喜欢

热点阅读