Demo 示例:如何原生的在 K8s 上运行 Flink?
Kubernetes 简介
什么是 Kubernetes?
Kubernetes 相信大家都比较熟悉,近两年大家都在讨论云原生的话题,讨论 Kubernetes。那么什么是 Kubernetes 呢?
- K8s 是一个资源管理系统。
如果大家对 Yarn、 Mesos 熟悉,假设给定一批裸的物理机,将资源管理系统部署上去之后,可以在此之上基于它的 API 或者 SDK 开发一些分布式软件或者应用程序。例如可以在 Yarn 上开发传统的 MapReduce,在 K8s 上可以开发一些分布式的 Web Server,或者是大数据计算任务等等。
- K8s 是一个容器编排系统。
不同于传统的 Yarn,K8s 在所有的进程运行过程中,是全部基于容器化的,但这里的容器并不只是单纯的 Docker 容器,它也包括 Rocket 等其他相关的隔离措施。如果在生产环境中要求比较高的话,可能会有一些安全容器,比如 Kata Containers 等等。K8s 在 Slave 上部署的应用程序,都是用容器化的方式去做分发和管理,同时用容器化的技术做隔离。
- K8s 是一个自动化运维系统。
它是一个声明式的 API,我们只需要告诉 K8s 集群需要创建一个 Deployment,设置的副本数量,需要达到一个什么样的状态,调度系统也就是 K8s 就会帮助我们维持状态,直到达到设置的状态为止。如果中间发生了一些 failover 或者发生了一些失败,它会自动地将任务迁移到其他的机器上,来满足当前的调度。
- 云原生。
目前几乎所有的云厂商都已经提供了 K8s 服务支持,包括国内的阿里、国际上的 Amazon、Google 等等,包括传统的微软都已经提供了对于 K8s 的 Managed 服务或者是 Unmanaged 服务。随着目前 Lambda 表达式或者 Function 计算的应用, Serverless 方式也变得更加流行。除了传统的部署小集群以外,通过云产生一个 manager,构建一个大的 Serverless 集群,然后用户按需进行计算资源付费,这也是一种新的模式。
Kubernetes 的架构

上图是 K8s 基本的架构,这是一个非常典型的 Master-Slave 的架构。
- 在 Master 上,是由 Controller,API Server,Scheduler 以及包括做存储的 Etcd 等构成。Etcd 可以算成 Master,也可以作为独立于 Master 之外的存储来对待。Master 的 Controller、API Server、Scheduler 都是单独的进程模式。这和 Yarn 有一些不同,Yarn 的整个 Master 是一个单进程的模式。K8s 的 Master 还可以在多个 Master 之间完成自发的选举,然后由 active 状态的 Master 对外提供服务。
- 在 Slave 上,它主要是包括 Kube proxy、Kubelet,以及 Docker 等相关的组件,每个 Node 上部署的相关组件都是类似的,通过它来管理上面运行的多个 Pod。
- 根据不同用户的习惯,可以通过 UI 或者 CLI 的方式向 K8s 提交任务。用户可以通过 K8s 提供的 Dashboard Web UI 的方式将任务进行提交,也可以通过 Kubectl 命令行的方式进行提交。
Kubernetes 的一些概念
- ConfigMap
ConfigMap 是一个 K-V 数据结构。通常的用法是将 ConfigMap 挂载到 Pod ,作为配置文件提供 Pod 里新的进程使用。在 Flink 中可以将 Log4j 文件或者是 flink-conf 文件写到 ConfigMap 里面,在 JobManager 或者 TaskManger 起来之前将它挂载到 Pod 里,然后 JobManager 去读取相应的 conf 文件,加载其配置,进而再正确地拉起 JobManager 一些相应的组件。
- Service(简称 SVC )
一种对外暴露服务的方式。如果现在内部有一个服务,需要在 K8s 外部进行访问,此时可以通过 Service,然后用 LoadBalancer 或者 NodePort 的方式将其暴露出去。
如果有一个 Service,不希望或不需要将其对外暴露,可以把它设置为 Cluster IP 或者是 None 这种 Headless 的模式。这个时候,它可以用于服务之间相互连接,例如传统的前端去联后端服务,或者是在 Flink 中非 HA 的情况下,TaskManager 去连 JobManager 等等。
- Pod
Pod 是 K8s 里最小的调度单元。K8s 都是以 Pod 进行调度的。每个 Pod 可以包含一个或者多个 Container。每个 Container 都会有自己的资源,相互之间资源也是已经隔离的,但是所有 Container 共享同一个网络,这就意味着所有的 Container 可以通过 localhost 直接进行通信。
同时,Container 之间可以通过 Volume 共享一些文件。比如 JobManager 或 TaskManager 的 Pod 里产生了一些日志,在同一个 Pod 里再去起另外一个进程收集不符合 K8s 的原生语义。可以通过 SideCar 的方式去起另外一个 Container,把 JobManager 产生的日志收走。这就是一个 Pod 多个 Container 的具体用途。
- Deployment
因为 Pod 是可以随时被终止的,所以当 Pod 终止之后,就无法再拉起来去做 failover 等其他相关操作。Deployment 是在 Pod 之上提供了更高一层的抽象。Deployment 可以设置 Pod 的状态,比如需要起 5 个 TaskManager,Deployment 会维持当前状态。当有 TaskManager 挂了以后,它会起新的 TaskManager,来补上。这样可以避免自己汇报 Pod 的状态,可以去做一些更复杂的管理 failover 等等。这也是最基础的概念——运维自动化。

目前都有什么样的任务在 K8s 上运行?
除了传统的 Web 以及移动端一些无状态的如 MySQL、Kafka 等存储相关的任务外,有状态的服务也不断地在 K8s 上做适配和运行。除此之外,深度学习框架 Tensorflow 原生即可在 K8s 上运行,包括 Spark、Flink 等等,一些大数据相关的框架也在不断地去兼容,不断地去适配,以便让更多的大数据服务可以更好地在 K8s 上运行。
从这一点我们可以看出, K8s 相比于 Yarn 或传统的 Hadoop 具有更好的包容性,它可以把存储、深度学习、大数据包括 OLAP 分析等多种计算框架、引擎都运行在 K8s 之上。这样就会带来一个很大的好处,整个公司只需要去管理一个调度架构,就可以把所有的存储,实时计算,批量计算,包括深度学习,OLAP 分析等等,都在一个集群里面运行。除了管理更方便以外,也可以达到更好的集群利用率。
Flink On Kubernetes 的部署演进
Flink 在 K8s 上最简单的方式是以 Standalone 方式进行部署。这种方式部署的好处在于不需要对 Flink 做任何改动,同时 Flink 对 K8s 集群是无感知的,通过外部手段即可让 Flink 运行起来。
Standalone Session On K8s
Standalone方式在k8s运行步骤:

如图所示:
- 步骤1, 使用 Kubectl 或者 K8s 的 Dashboard 提交请求到 K8s Master。
- 步骤2, K8s Master 将创建 Flink Master Deployment、TaskManager Deployment、ConfigMap、SVC 的请求分发给 Slave 去创建这四个角色,创建完成后,这时 Flink Master、TaskManager 启动了。
- 步骤3, TaskManager 注册到 JobManager。在非 HA 的情况下,是通过内部 Service 注册到 JobManager。
- 至此,Flink 的 Sesion Cluster 已经创建起来。此时就可以提交任务了。
- 步骤4,在 Flink Cluster 上提交 Flink run 的命令,通过指定 Flink Master 的地址,将相应任务提交上来,用户的 Jar 和 JobGrapth 会在 Flink Client 生成,通过 SVC 传给 Dispatcher。
- 步骤5,Dispatcher 会发现有一个新的 Job 提交上来,这时会起一个新的 JobMaster,去运行这个 Job。
- 步骤6,JobMaster 会向 ResourceManager 申请资源,因为 Standalone 方式并不具备主动申请资源的能力,所以这个时候会直接返回,而且我们已经提前把 TaskManager 起好,并且已经注册回来了。
- 步骤7-8,这时 JobMaster 会把 Task 部署到相应的 TaskManager 上,整个任务运行的过程就完成了。
Standalone perjob on K8s
现在我们看一下 Perjob 的部署,因为 Session Cluster 和 Perjob 分别都有不同的适用场景,一个 Session 里面可以跑多个任务,但是每个任务之间没有办法达到更好的隔离性。而 Perjob 的方式,每个job都会有一个自己独立的 Flink Cluster 去运行,它们之间相互独立。
■ Perjob 的特点:
- 用户的 Jar 和依赖都是在镜像里提前编译好,或者通过 Init Container 方式,在真正 Container 启动之前进行初始化。
- 每个 Job 都会启动一个新的 Cluster。
- 一步提交,不需要像 Session Cluster 一样先启动集群再提交任务。
- 用户的 main 方法是在 Cluster 里运行。在特殊网络环境情况下,main 方法需要在 Cluster 里运行的话,Session 方式是无法做到的,而 Perjob 方式是可以执行的。

■ 执行步骤:
由 Standalone JobCluster EntryPoint 执行,从 classpath 找到用户 Jar,执行它的 main 方法得到 JobGrapth 。再提交到 Dispathcher,这时候走 Recover Job 的逻辑,提交到 JobMaster。JobMaster 向 ResourceManager 申请资源,请求 slot,执行 Job。
Helm Chart 方式
Helm 类似于 Linux 上的 Yum。
K8s 里的 Helm 是一个包管理工具,可以很方便的安装一个包。部署一个 Flink 集群等操作,只需要 helm install 就可以将之前很多步的安装操作,一步去完成。本质上没有什么差别,只是它用 Helm 重新组织,包括一些模板等等,用起来会更加方便。
Flink Kubernetes Operator

- 任务生命周期管理
使用 Operator 的方式来管理 Flink,主要是来管理多个 Cluster 的情况,可起到任务生命周期管理的作用。它和 Standalone、Native 的方式,本质上不是在一个层次上,它类似于一个更上层的做任务管理的工具。
- 基于 K8s Operator,方便创建 Flink Cluster。
之前去创建一个 Perjob Cluster,可能需要部署多次,如果任务要做升级,甚至可能需要把之前的删掉,然后修改配置,再重新部署。
引入 K8s Operator 就只需要做一些简单操作。比如 Operator 中有自己的一套 yaml 描述方式,修改其中某一个字段,如修改 image 的 version 字段,此时后台会自动触发一些重启,包括对目前正在执行的任务做 savepoint,然后把 Cluster 销毁掉,再进行新的定向就可以将集群拉起,等一系列自动化的操作。对 Flink 的配置做修改等也都可以在后台自动化完成。
目前 Operater 有 Lyft 和 Google 两个开源的 operator,他们在功能上类似,而且都是已经经过生产检验,与目前的 Standalone Cluster 结合的比较好的,已经达到生产可用的标准。
参考:
1.lyft/flinkk8soperator
https://github.com/lyft/flinkk8soperator
2.GoogleCloudPlatform/flink-on-k8s-operator
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
总结
当然,Flink on K8s 当前也存在一些不足:
- 无论 Operator、Helm Chart 或者是直接使用 Kubectl Yaml 的方式,Flink 都感知不到 K8s 的存在。
- 目前主要使用静态的资源分配。需要提前确认好需要多少个 TaskManager,如果 Job 的并发需要做一些调整,TaskManager 的资源情况必须相应的跟上,否则任务无法正常执行。
- 用户需要对一些 Container、Operator 或者 K8s 有一些最基本的认识,这样才能保证顺利将 Flink 运行到 K8s 之上。
- 对于批处理任务,或者想在一个 Session 里提交多个任务不太友好。无法实时申请资源和释放资源。因为 TaskManager 的资源是固定的,批处理任务可能会分多个阶段去运行,需要去实时地申请资源、释放资源,当前也无法实现。如果需要在一个 Session 里跑多个 Job 并且陆续运行结束当前也无法实现。这时如果维持一个比较大的 Session Cluster,可能会资源浪费。但如果维持的 Session Cluster 比较小,可能会导致 Job 跑得慢或者是跑不起来。
基于这几点,我们在社区推进了一个 Native 的集成方案,这个 Native 类似于 Yarn 这种原生的集成,就是让 Flink 原生的感知到下层 Cluster 的存在。
Navtive Integration 的技术细节
为什么叫 Native 方式?包括如下几个含义。
- 资源申请方式:Flink 的 Client 内置了一个 K8s Client,可以借助 K8s Client 去创建 JobManager,当 Job 提交之后,如果对资源有需求,JobManager 会向 Flink 自己的 ResourceManager 去申请资源。这个时候 Flink 的 ResourceManager 会直接跟 K8s 的 API Server 通信,将这些请求资源直接下发给 K8s Cluster,告诉它需要多少个 TaskManger,每个 TaskManager 多大。当任务运行完之后,它也会告诉 K8s Cluster释放没有使用的资源。相当于 Flink 用很原生的方式了解到 K8s Cluster 的存在,并知晓何时申请资源,何时释放资源。
- Native 是相对于 Flink 而言的,借助 Flink 的命令就可以达到自治的一个状态,不需要引入外部工具就可以通过 Flink 完成任务在 K8s 上的运行。
具体如何工作?主要分 Session 和 Perjob 两个方面来给大家介绍。
Native Kubernetes Session 方式

首先 Session 的方式。
- 第一个阶段:启动 Session Cluster。Flink Client 内置了 K8s Client,告诉 K8s Master 创建 Flink Master Deployment,ConfigMap,SVC。创建完成后,Master 就拉起来了。这时,Session 就部署完成了,并没有维护任何 TaskManager。
- 第二个阶段:当用户提交 Job 时,可以通过 Flink Client 或者 Dashboard 的方式,然后通过 Service 到 Dispatcher,Dispatcher 会产生一个 JobMaster。JobMaster 会向 K8sResourceManager 申请资源。ResourceManager 会发现现在没有任何可用的资源,它就会继续向 K8s 的 Master 去请求资源,请求资源之后将其发送回去,起新的 Taskmanager。Taskmanager 起来之后,再注册回来,此时的 ResourceManager 再向它去申请 slot 提供给 JobMaster,最后由 JobMaster 将相应的 Task 部署到 TaskManager 上。这样整个从 Session 的拉起到用户提交都完成了。
- 需注意的是,图中 SVC 是一个 External Service。必须要保证 Client 通过 Service 可以访问到 Master。在很多 K8s 集群里,K8s 和 Flink Client 是不在同一个网络环境的,这时候可以通过 LoadBalancer 的方式或者 NodePort 的方式,使 Flink Client 可以访问到 Jobmanager Dispatcher,否则 Jar 包是无法提交的。
Native Kubernetes Perjob 方式

我们再来看一下 Perjob 的方式,如图所示,Perjob 方式其实和之前是有一些类似,差别在于不需要先去起一个 Session Cluster,再提交任务,而是一步的。
- 首先创建出了 Service、Master 和 ConfigMap 这几个资源以后,Flink Master Deployment 里面已经带了一个用户 Jar,这个时候 entrypoint 就会从用户 Jar 里面去提取出或者运行用户的 main,然后产生 JobGraph。之后再提交到 Dispatcher,由 Dispatcher 去产生 Master,然后再向 ResourceManager 申请资源,后面的逻辑的就和 Session 的方式是一样的。
- 它和 Session 最大的差异就在于它是一步提交的。因为没有了两步提交的需求,如果不需要在任务起来以后访问外部 UI,就可以不用外部的 Service。可直接通过一步提交使任务运行。通过本地的 port-forward 或者是用 K8s ApiServer 的一些 proxy 可以访问 Flink 的 Web UI。此时,External Service 就不需要了,意味着不需要再占用一个 LoadBalancer 或者占用 NodePort。这就是 perjob 方式。
Session 与 Perjob 方式的不同
我们来看一下 Session 和 Perjob 方式有哪些不同?

Demo 演示
Session
- 启动 Session
注意:image 需要替换为自己的镜像或者使用 docker hub 上的 Flink 官方镜像库。
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
- 提交 job 到 Session
./bin/flink run -d -p 10 -e kubernetes-session -Dkubernetes.cluster-id=k8s-session-1 examples/streaming/WindowJoin.jar
- 停止 Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=k8s-session-1 -Dexecution.attached=true
Application
Application 模式是 FLIP-85 引入的一种新的模式,长远来看是用于替换社区目前的 perjob 模式的。目前 application 模式和 perjob 模式最大的区别是用户代码在 client 端还是 jobmanager 端运行。在 K8s 部署上,由于用户的 jar 和依赖都可以提前打在镜像里面,所以支持 application 模式就变得非常容易。
注意:Application 模式是在 Flink 1.11 中才支持的功能,需要使用对应的 Flink client 和镜像。可以参考社区文档来构建自己的镜像。
./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-app1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
local:///opt/flink/examples/streaming/WindowJoin.jar
目前功能的状态
- Native Kubernetes Session模式
FLINK-9953:已经在 Flink 1.10发布:
https://issues.apache.org/jira/browse/FLINK-9953
- Native Kubernetes Application 模式
FLINK-10934:计划在 Flink 1.11发布
https://issues.apache.org/jira/browse/FLINK-10934
- Native模式下高可用
FLINK-12884:目前高可用方式是基于 zk 实现的,未来是希望不依赖于外部组件,基于 K8s 的 ConfigMap 去做 Meta 存储和 Leader 选举。目前已经有内部实现,未来将会贡献给社区。
https://issues.apache.org/jira/browse/FLINK-12884
- 其他功能支持:
FLINK-14460:计划陆续在 Flink 1.11/1.12两个版本中发布和完善
https://issues.apache.org/jira/browse/FLINK-14460
包括内容如下:
- Label,annotation,node-selector:希望将任务调度到某个集群特定的机器上的应用场景可能需要 node-selector,希望给集群打特定的 Label 并且外部能访问到,可以使用 Label 的功能等等。
- Sidecar container:帮助完成日志收集等
- Init container:可以帮助在 JobManager,TaskManager 启动之前,把 Jar 下载好,这样我们可以使用统一的景象,不需要把用户 Jar 打到镜像里
- 存储优化
- Pod 模板完成一些不常用的功能等等
目前社区的 Flink on K8s 的 native 方案还在快速发展和完善,希望大家多多试用并且提出反馈意见,如果有兴趣也非常欢迎一起参与进来开发。