【Flink on k8s】Flink on Kubernete
本文基于 Flink-1.12 介绍 Flink on Kubernetes 的部署模式,重点讲述 Session 和 Native Session 模式。
1.Flink on Kubernetes 的背景
Kubernetes 简介:
Kubernetes 项目来源于 Google 内部 Borg 项目,于 2014 年发布到开源社区。Kubernetes 已经成长为容器管理领域的事实标准。在大数据相关领域,包括 Spark、Hive、Kafka 等项目正在迁移到 Kubernetes。
Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的长稳性。
① Flink 特性:提供的实时服务是需要长时间、稳定地运行,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
② Kubernetes 优势:为应用提供了部署、管理能力,同时保证其稳定运行。Kubernetes 具有很好的生态,可以集成各种运维工具,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的扩缩容机制,可以大大提高资源利用率。
2.Flink Session 和 Application 模式
2.1 Session 模式
Session 模式简介
预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager。
image.png优点:
Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即作业启动速度快。
缺点:
① 资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足。
② 作业隔离性差,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启。
部署指导
参考:Flink on Standalone Kubernetes Reference
① 集群配置
集群配置通过 configmap 挂载到容器中
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
② Deployment 文件
把 Flink 镜像上传到私有镜像仓。编辑 jobmanager-service.yaml、jobmanager-deployment.yaml、taskmanager-deployment.yaml
jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
while :;
do
if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*jobmanager*.log;
fi;
done"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
while :;
do
if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*taskmanager*.log;
fi;
done"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
selector:
app: flink
component: jobmanager
③ 执行 yaml
通过 kubectl create -f 命令创建 Flink 集群
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
2.2 Application 模式
Application 模式简介
每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。
image.png注意:
① Flink 镜像需要包含作业即Application 依赖的 Class
② 启动作业的时候需要指定 Main 函数入口类
优点:
一个作业独占一个集群,作业的隔离性好。
缺点:
资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用。
3.Flink Native Session 模式
3.1 Native Session 模式简介
类似 Session 模式,需要预先构建 JobManager。不同点是用户通过 Flink Client 向 JobManager 提交作业后,根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源,最后把作业提交到 TaskManager 上。
image.png3.2 优缺点分析
优点:
TaskManager 的资源是实时的、按需进行的创建,对资源的利用率更高。
缺点:
作业真正运行起来的时间较长,因为需要等待 TaskManager 创建。
3.3 部署指导
参考:Native Kubernetes - Session Mode
① 集群配置
集群配置通过 configmap 挂载到容器中,如上 2.1 所示。
新增如下配置:
flink-configuration-configmap.yaml
kubernetes.cluster-id: my-first-flink-cluster
execution.attached: true
② 配置 jobmanager-deployment.yaml
如上 2.1 所示,需要把启动脚本修改为 ./bin/kubernetes-session.sh
jobmanager-deployment.yaml
// 忽略...
spec:
containers:
- name: jobmanager
image: flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/kubernetes-session.sh;\
while :;
do
if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*jobmanager*.log;
fi;
done"]
// 忽略...
③ 执行 yaml
通过 kubectl create -f 命令创建 Flink 集群
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
4.Flink Native Application 模式
4.1 Native Application 模式简介
类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性,即 Flink 直接与 Kubernetes 进行通信并按需申请资源,无需用户指定 TaskManager 资源的数量。
image.png4.2 优缺点分析
优点:
① 一个作业独占一个集群,作业的隔离性好。
② 资源利用率相对较高,按需申请 JobManager 和 TaskManager。
缺点:
① 一个作业独占一个集群,JobManager 不能复用。
② 作业启动较慢,在作业提交后,才开始创建 JobManager 和 TaskManager。
5.Flink 运行模式总结
模式 | 隔离性 | 作业启动时间 | 资源利用率 | 资源按需创建 |
---|---|---|---|---|
Session | 弱,作业共享集群 | 较短,立即启动 | 较低,集群长期存在 | 否 |
Application | 强,作业独享集群 | 最长,等待集群创建完成 | 一般,作业结束后释放资源 | 否 |
Native Session | 弱,作业共享集群 | 一般,等待 TaskManager 创建 | 较低,TaskManager 按需申请 | 是 |
Native Application | 强,作业独占集群 | 一般, 等待集群创建完成 | 最好,集群按需创建 | 是 |