Flink算子的生命周期

2018-11-19  本文已影响239人  丹之

Flink算子的生命周期

前言

前面已经介绍了 flink 的逻辑计划、物理计划等相关信息,本文将重点介绍 flink 的 operator 以及运行时的 task,后续会介绍 flink task 的调度算法

算子

什么是一个算子

flink 中的一个 operator 代表一个最顶级的 api 接口,拿 streaming 来说就是,在 DataStream 上做诸如 map/reduce/keyBy 等操作均会生成一个算子

算子的生成

先来看 operator 的继承关系:


对于 Streaming 来说所有的算子都继承自 StreamOperator,StreamOperator 中定义了一系列的生命周期方法,同时也定义了 snapshort 的接口,AbstractStreamOperator 定义了基本的设置和声明周期方法,AbstractUdfStreamOperator 定义了用户自定义函数的生命周期和快照策略,这些接口的调用时机会在下面一一阐述😄。

算子的生成触发于对 DataStream 的操作上,比如 map addSink等。

算子 chain

flink 基本组件和逻辑计划生成一节 我们介绍了 JobGraph 的生成过程,其中 JobGraph 的生成最大的意义在于做了一些算子的 chain 优化,那么什么样的节点可以被 chain 呢?如下图:

一些必须要经过 shuffle 的节点是 chain 或者 节点可达 的边界,非常类似于 Spark Streaming 中对于 Stage 的划分,上图中 keyBy 这样的 groupBy 操作就是划分是否可被 chain 的边界

在 StreamingJobGraphGenerator 的 createChain 方法中为每个 StreamNode 生成了一个 StreamConfig,并且对于可以生成 JobVertex 的节点[ chain 的起始节点 ]设置了如下属性:

//StreamingJobGraphGenerator line212
if (currentNodeId.equals(startNodeId)) {

   config.setChainStart();
   config.setChainIndex(0);
   config.setOutEdgesInOrder(transitiveOutEdges);
   config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

   for (StreamEdge edge : transitiveOutEdges) {
      connect(startNodeId, edge);
   }

   config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

}

上面的逻辑概括如下:

连接的逻辑如下:

//StreamingJobGraphGenerator line357
private void connect(Integer headOfChain, StreamEdge edge) {

   physicalEdgesInOrder.add(edge);

   Integer downStreamvertexID = edge.getTargetId();

   JobVertex headVertex = jobVertices.get(headOfChain);
   JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

   StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

   downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

   StreamPartitioner<?> partitioner = edge.getPartitioner();
   if (partitioner instanceof ForwardPartitioner) {
      downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED,
         true);
   } else if (partitioner instanceof RescalePartitioner){
      downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED,
         true);
   } else {
      downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED,
            true);
   }

   if (LOG.isDebugEnabled()) {
      LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
            headOfChain, downStreamvertexID);
   }
}

概括下逻辑:

以上只是客户端生成逻辑计划时的算子 chain,在运行时算子的的 chain 被封装成了一个单独的对象 OperatorChain,里面在原有的基础上将 operators 的操作封装起来并且确定了下游的的输出入口

来看 OperatorChain 的核心实现

首先总结下构造器的功能:

这里的关键是算子 chain 的创建过程,见下图创建过程:

上图中 S 节点的下游 A/B/C 是可以与 S Chain 在一起的,D/E 是必须经过网络传输的节点,一个 OperatorChain 封装了图中的节点 S/A/B/C,也就是说上图可以被看做如下所示:

OperatorChain 中有两个关键的方法:createOutputCollectorcreateChainedOperator,前者负责获取一个 StreamNode 的输出Output,后者负责创建 StreamNode 对应的 chain 算子,两者相互调用形成递归,如上面的创建过程图,具体的流程如下:

那么 S 发射一条消息后的处理流程是如何呢?

S 在调用 processElement 方法时会调用 output.collect,这里的 output 为 A 对应的 ChainingOutput,ChainingOutput 的 collect 调用了对应的算子 StreamOperator AprocessElement 方法,这里又会调用 B 的 ChainingOutput 的 collect 方法,以此类推。这样便实现了可 chain 算子的本地处理,最终经由网络输出 RecordWriterOutput 发送到下游节点

算子的运行

flink 算子的运行牵涉到两个关键类 Task.javaStreamTask.java,Task 是直接受 TaskManager 管理和调度的,而 Task 又会调用 StreamTask,StreamTask 中封装了算子的处理逻辑

我们先来看 StreamTask

StreamTask 的 JavaDoc 上描述了其生命周期:

*  -- restoreState() -> restores state of all operators in the chain
*  
*  -- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> open-operators()
*        +----> run()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()

StreamTask 运行之初会尝试恢复算子的 State 快照,然后由 Task 调用其 invoke 方法

下面重点分析一下其 invoke 方法的实现

StreamTask 中还有关于 Checkpoint 和 StateBackup 的核心逻辑,这里先不介绍,会另开一篇😄

我们来看 StreamTask 的实现类之一 OneInputStreamTask ,便可以知道 init()run() 分别都做了什么:

init方法

StreamInputProcessor 是 StreamTask 内部用来处理 Record 的组件,里面封装了外部 IO 逻辑【内存不够时将 buffer 吐到磁盘上】以及 时间对齐逻辑【Watermark】,这两个将会合并一节在下一章介绍_

run方法:

真正的运行时类 Task

这里我们会详细的介绍下 Task 的核心逻辑

Task 代表一个 TaskManager 中所起的并行 子任务,执行封装的 flink 算子并运行,提供以下服务:消费输入data、生产 IntermediateResultPartition [ flink关于中间结果的抽象 ]、与 JobManager 交互

JobManager 分发 Task 时最初是抽象成了一个描述类 TaskDeploymentDescriptor,TaskManager 在抽到对应的 RPC 请求后会将 Task 初始化后将 线程 拉起,TaskDeploymentDescriptor 是提供 task 信息的核心抽象:

构造器的一些组件我们会在介绍 TaskManager 的时候再详述

其核心的运行方法 run()逻辑总结如下:

line408: run

关于网络管理[ 输入和输出 ] NetworkEnvironment,内存管理 MemoryManager 会分别开章节介绍

那么 StreamTask 是如何在 Task 中被实例化,又是如何被调用的呢?

//line 418
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
//一系列初始化操作 ...
//line 584
invokable.invoke();

上面的 invokable 就是 StreamTask,StreamTask 的继承关系:

那么具体是什么时候被 set 进去作为属性的呢?

在 StreamNode 生成的时候有这样一段逻辑:

public <IN, OUT> void addOperator(
      Integer vertexID,
      String slotSharingGroup,
      StreamOperator<OUT> operatorObject,
      TypeInformation<IN> inTypeInfo,
      TypeInformation<OUT> outTypeInfo,
      String operatorName) {

   if (operatorObject instanceof StoppableStreamSource) {
      addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
   } else if (operatorObject instanceof StreamSource) {
      addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
   } else {
      addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
   }

将 OneInputStreamTask 等 StreamTask 设置到 StreamNode 的节点属性中,同时在 JobVertex 的节点构造时也会做一次初始化:

jobVertex.setInvokableClass(streamNode.getJobVertexClass());

在 TaskDeploymentDescriptor 实例化的时候会获取 jobVertex 中的属性,见ExecutionVertex line673

算子初始化

那么算子是什么时候被初始化的呢?这就需要梳理下 StreamTask 的 init() 方法的处理时机,上面已经分析过 init() 方法会在 StreamTask 的 invoke() 方法中被调用,那么 invoke() 方法又是何时被调用的呢?这就涉及到另外一个重要的类 Task.java,Task 才是运行时真正直接被 TaskManager 实例化和调用的类,上面已经分析过 Task 的 run 方法,是 TaskManager 收到 rpc 命令后起起来的 具体的细节会另起一章 flink 任务分发

算子销毁

StreamTask 下执行完 invoke 方法之后[意味着流程正常结束或者有异常打断],会执行下面这段逻辑:

/**
 * Execute the operator-specific {@link StreamOperator#dispose()} method in each
 * of the operators in the chain of this {@link StreamTask}. </b> Disposing happens
 * from <b>tail to head</b> operator in the chain.
 */
private void tryDisposeAllOperators() throws Exception {
   for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
      if (operator != null) {
         operator.dispose();
      }
   }
}

所以,算子中有任何 hook 函数或者必须执行的销毁工作可以写在 dispose 方法里,这段逻辑是 flink 保证一定可以执行到的

上一篇下一篇

猜你喜欢

热点阅读