Flink on K8s: A Simple Implementation

2021-09-30  一根很帅的藤藤菜

The approach is to bake the application JAR into the Flink image and run it with Flink on K8s in Standalone mode.

1. Write the Dockerfile



FROM flink:1.12-scala_2.11-java8
COPY ./target/xxx.jar /opt/flink/usrlib/xxx.jar
This pulls the official Flink base image and copies the locally built JAR into the image's /opt/flink/usrlib directory.
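
A minimal sketch of building and pushing the image; the tag below matches the image referenced in the manifests that follow, so substitute your own registry path:

docker build -t hub/online/xxx-flink:flink_on_k8s .
docker push hub/online/xxx-flink:flink_on_k8s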

2. Write the K8s Resource Manifests

With this architecture, running a Flink job on Kubernetes involves the following steps:

(1) After the resource manifests are submitted to the Kubernetes cluster, the Master and Worker containers are started.
(2) The Master container starts the Flink Master process, which contains the Flink-Container ResourceManager, the JobManager, and the Program Runner.
(3) The Worker containers start TaskManagers, which register with the ResourceManager; once registration is complete, the JobManager hands concrete tasks to the Worker containers, which then execute them.
(4) Note that the Master and Worker containers are started from the same image, only with different launch arguments; the two manifests below therefore reference the same image.

The full manifests, flink-jobmanager.yaml and flink-taskmanager.yaml, are shown below; update the image address and the job entry class to match your environment.

flink-jobmanager.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: xxx-flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: hub/online/xxx-flink:flink_on_k8s
          imagePullPolicy: IfNotPresent
          env:
          args: ["standalone-job", "--job-classname", "com.xxx.App"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

flink-taskmanager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: xxx-flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: hub/online/xxx-flink:flink_on_k8s
          imagePullPolicy: IfNotPresent
          env:
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

3. Define the ConfigMap

The Flink configuration and the log4j configuration are passed into the pods as a ConfigMap, defined in flink-config-configmap.yaml. If the default configuration is sufficient this step can be skipped, but note that the manifests above mount the flink-config ConfigMap, so the corresponding volume mounts would then have to be removed as well.
flink-config-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: xxx-flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 1
  log4j-console.properties: |+
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
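
The manifests above mount this ConfigMap, so apply it before starting the JobManager and TaskManager:

kubectl apply -f flink-config-configmap.yaml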

4. Start the JobManager

Starting the JobManager takes two steps:

First, the JobManager itself is described by flink-jobmanager.yaml. Because this is standalone application mode, it is declared as a Kubernetes Job (rather than a Deployment) that runs a single JobManager container, labeled with app: flink and component: jobmanager.

kubectl apply -f flink-jobmanager.yaml
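
To confirm that the JobManager pod started, its status and log can be checked, for example:

kubectl get pods -l component=jobmanager
kubectl logs -f job/xxx-flink-jobmanager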

Second, a Service for the JobManager is also needed; it exposes the JobManager through a service name and ports, and selects the matching pods via those labels. The Service name must match jobmanager.rpc.address in flink-conf.yaml (xxx-flink-jobmanager here), since the TaskManagers resolve the JobManager through this Service.

kubectl apply -f flink-jobmanager-service.yaml

flink-jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: xxx-flink-jobmanager
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
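
With the Service in place, a quick way to reach the Flink web UI from a local machine is a port-forward; the UI is then available at http://localhost:8081:

kubectl port-forward service/xxx-flink-jobmanager 8081:8081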

5. Start the TaskManager

The TaskManager is described by a Deployment that keeps n replica containers running the TaskManager, again with labels such as component: taskmanager.

kubectl apply -f flink-taskmanager.yaml
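
Once the TaskManager pods are running they should register with the JobManager, which can be verified in the JobManager log or the web UI. The Deployment can also be scaled to run more TaskManagers (the replica count below is only an example):

kubectl get pods -l app=flink
kubectl scale deployment xxx-flink-taskmanager --replicas=2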

6. Update the JAR

To update the JAR baked into the image, rebuild and push the image, update the image path in flink-jobmanager.yaml and flink-taskmanager.yaml, and then re-apply both manifests:

kubectl apply -f flink-jobmanager.yaml
kubectl apply -f flink-taskmanager.yaml
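
One caveat that comes from Kubernetes rather than Flink: the pod template of an existing Job is immutable, so re-applying flink-jobmanager.yaml with a new image typically requires deleting the old Job first, for example:

kubectl delete job xxx-flink-jobmanager
kubectl apply -f flink-jobmanager.yaml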