Flink 之 State

2020-10-26  本文已影响0人  xiaoc024

1. Overview

State 是 Flink 中一个非常基本且重要的概念,本文将介绍什么是 State,如何使用 State,State 的存储和原理。以及 State 衍生的一些概念和应用。

2. Basic

2.1 什么是 State

一种为了满足算子计算时需要历史数据需求的,使用 checkpoint 机制进行容错,存储在 state backend 的数据结构。

首先 state 其实就是一种数据结构。然后上面的定义中隐含了三个基本知识点:

  1. 什么时候需要历史数据
  2. 为什么要容错,以及 checkpoint 如何进行容错的
  3. state backend 又是什么。

后两点会在后面小节具体展开,这里先列举一些常见的需要历史数据的场景:

2.2 有哪些常见的 State

最常见的是 Keyed State,应用于 keyedStream 上,必须在 KeyBy 操作之后使用。它的特点是同一个 sub task 上的同一个 key 共享一个 state。另外还有 operator state,顾名思义每一个 operator state 都只与一个 operator 的实例绑定。常见的 operator state 是 source state,例如记录当前 source 的 offset。它的特点是同一个 subtask 共享一个 state。另外还有一种特殊的 operator state 称为 broadcast state,它的特点是同一个算子的多个 sub task 共享一个 state

2.3 State 使用

这里以常用的 Keyed State 进行举例。前面说了 State 本质上就是一种用来存储数据的数据结构,那么作为 Keyed State,都支持哪些数据结构呢?以下列举了常见的几种数据结构:

这是官网给出的 ValueState 的使用案例:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _
  
  /** 
  也可以使用 lazy 的方式对 state 进行初始化
  lazy private val sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    ) 
  **/

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}

以上代码的功能是对输入的流数据进行平均数计算,当输入的数据大于等于 2 个时,触发计算。这里有几点需要注意:

使用 state 除了继承 RichXXFunction 外还可以直接使用系统提供的函数。如 keyBy 之后直接使用 flatMapWithState ,这里不进行具体举例。

2.4 State backend

前面介绍了 State 的类型和常见的数据结构,那么这些 state 存储的介质有哪些呢? Flink 提供了三种存储 State 的介质:

2.4.1 MemoryStateBackend:

2.4.2 FsStatebackend:

2.4.3 RocksDBStateBackend

2.5 Checkpoint

前面对 State 的使用中没有考虑容错的问题,当集群出现故障时进行恢复时,State 的值肯定不会从头开始计算,这就需要进行容错。State 使用 Checkpoint 机制进行容错。简单来说就是定时制作分布式快照,当出现故障需要进行恢复时,将所有 Task 恢复到最近一次成功的 Checkpoint 状态中,然后从那个点开始继续处理。Checkpoint 通过 Barries 对齐机制保证了恰好一次的一致性语义,关于 Barries 的原理后面将进行详细说明。

3. Deep

3.1 Checkpoint Barries

checkpoint 是 jobmanager 从 source 触发到下游所有节点完成的一次全局操作。checkpoint barriers 和 watermark 类似,都是一种特殊的事件。对某一 subtask 而言,checkpoint 表示所有 subtask 恰好处理完(不能多处理,也不能少处理。为了状态恢复时保持一致性)某个相同数据。watermark 表示这之前的数据已经接收完毕。
watermark在多 subtask 上游向下游传递时,是广播 + 取上游最小 watermark 作为当前 task 的watermark,不取最小 watermark 会丢数据。
checkpoint barriers 在多 subtask 上游向下游传递时,是广播 + checkpoint barriers 对齐(alignment)。所谓对齐就是下游 subtask 会等待他上游所有分区的 subtask 的 checkpoint barriers 都到达才进行 checkpoint。上游已经到达 checkpoint barriers 的 substask 后续数据会缓存,没有到达 checkpoint barriers 的 subtask 数据会继续处理直到 checkpoint barriers 到达。
如图是一条流的两个并行任务,逻辑是分别对奇偶数进行求和计算。

initial .png
首先由 JobManager 触发 checkpoint,向 Source 发送带有 checkpoint Id 的指令。
JobManager 触发 checkpoint
随后 Source 触发 checkpoint 将 State 保存在 State Backend 后发送 ack 给 JobManager。并且向下游发送 checkpoint barriers。此时 Source 后面算子的计算不会受到任何打断。
Source checkpoint
以 even 流的 Sum 算子为例,从图中可以看到先接收到 Source1 的 barrier 后接收到 Source2 的 barrier( 分别对应蓝色和黄色的三角 )。所以在接收到 Source1 的 barrier 后,对后面值为 4 的蓝流数据进行了缓存没有进行下一步计算,因为这个数据属于下一个 checkpoint。而在接收到 Source2 的 barrier 之前,对值为 4 的黄流数据照常进行计算直到接收到 Source2 的 barrier 为止。这就是所谓的 barriers alignment。
barriers alignment
当下游收到上游所有的 barriers 时,进行 checkpoint,并将 barrier 向下游转发。
sum checkpoint
sink 任务向 JobManager 确认状态保存到完毕后,整个 checkpoint 过程结束
sink ack

3.3 Checkpoint VS Savepoint

Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this.
Savepoints are similar to checkpoints except that they are triggered by the user and don’t automatically expire when newer checkpoints are completed.

Savepoint 底层依赖于 checkpoint,但是是由用户手动触发的。

上一篇下一篇

猜你喜欢

热点阅读