Spark & Flink大数据,机器学习,人工智能flink

Flink DataStream 状态和容错 一:状态的使用

2019-01-10  本文已影响4人  Alex90

有状态函数和算子在处理独立的数据和时间时存储数据,使得状态称为任何复杂算子操作中的关键部分,例如:

Flink需要了解状态,以便使用检查点(checkpoint)使状态容错,使流应用程序的保存点(savepoint)可用。

使用状态(State)

Keyed State 和 Operator State

Flink 有两种基本的状态:Keyed state 和 Operator state

Operator State

使用 Operator state(或非 Keyed state),Operator state 都绑定到一个并行的算子实例上。可以参考 Kafka connector,Kafka 使用者的每个并行实例都维护一个 topic 分区和偏移(offset)的映射作为其算子状态。Operator state 接口支持当并行性更改时在并行算子实力间重新分配状态。进行这种重新分配可能有不同的方案。

Operator states 的数据结构现目前只支持 List

Keyed State

Keyed state 与键(key)相关,只能在 KeyStream 上应用的函数和算子中使用。可以将 Keyed state 认为是分区的或分片的 Operator state,每个键(key)只有一个状态分区(state-partition)。逻辑上,每个 Keyed state 绑定唯一的 <parallel-operator-instatnce, key>,并且因为每个键属于算子的一个并行实例,我们可以将其简单地视为 <operator, key>

Keyed state 进一步被组织为 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子单位,Key groups 与定义的最大并行度相同。在程序执行期间,keyed 算子的每个并行实例都使用一个或多个 Key groups 的键。

Raw State 和 Managed State

Keyed state 和 Operator state 有两种形式存在:托管状态(managed)和原始状态(raw)。

托管状态(managed state)由 Flink 运行时控制的数据结构表示,例如内部哈希表或 RocksDB。例如 ValueState 和 ListState。Flink 运行时对状态进行编码并写入检查点。

原始状态(raw state)是算子保存在自己的数据结构中的状态。做检查点时,只会将一个字节序列写入检查点,状态的具体数据结构对 Flink 是透明的,只能看到一组原始字节。

所有数据流函数都可以使用托管状态(managed state),原始状态(raw state)接口只能在具体实现算子时使用。建议使用托管状态(而不是原始状态),因为在托管状态下,Flink 能够在并行性更改时自动重新分配状态,并且在内存管理方面可以做的更好。

如果要为托管状态自定义序列化逻辑,请参考的序列化自定义以确保将来的兼容性。默认序列化器不需要特殊处理。

使用托管的 Keyed State

托管 Keys state 接口提供对不同类型状态的访问,这些状态的作用域都是当前输入元素的键。也就是说,这种类型的状态只能用于 KeyedStream,可以通过调用 stream.keyBy(…) 创建。

首先看下可用的不同类型的状态,以及它们如何在程序中使用:

所有类型的状态都有 clear() 方法来清除当前活动键的状态。FoldingStateFoldingStateDescriptor 已在 Flink 1.4 中弃用,未来版本将被完全删除。可以使用 AggregatingStateAggregatingStateDescriptor 代替。

这些 state 对象仅用于状态接口。状态不一定存储在内部,但可能存储在磁盘或其他位置。从 state 获得的值取决于输入数据元的,如果接收的 Keys 不同,用户函数的调用结果会不同。

要获得状态句柄,必须创建一个 StateDescriptor,保存了状态的名称(可以创建多个状态,必须具有唯一的名称来引用状态),状态所持有的值的类型,用户指定的函数(例如 ReduceFunction)。根据要获取的状态类型,可以创建 ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor。只有使用 RuntimeContext 可以访问到状态,因此只能在 Rich function 中使用。

在 RichFunction 方法中 RuntimeContext 有以下方法来访问状态:

下面是在 FlatMapFunction 中使用示例:

// 实现了一个简单的计数窗口
// 通过输入元组的第一个参数分组
// 在分组的流中,每接收到两个元组,返回两那个元组的第二个参数的平均值
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // 访问 state value
    val tmpCurrentSum = sum.value

    // 记录 (count, value)
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // 更新 state value
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    sum.update(newSum)

    // 如果 state value 达到 2, 发送统计的平均值并清空 state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // 输出 (1,4) (1,5)

  env.execute("ExampleManagedState")
}
状态生存周期 (TTL)

生存期(TTL)可以被指定给任何类型的 keyed state,如果配置了TTL并且状态值已过期,状态存储的值会被清除。所有状态集合类型都支持每个条目设置TTL。为了使用状态TTL,必须首先构建 StateTtlConfig 对象,然后通过传递配置在任何状态描述符中启用TTL函数:

// build StateTtlConfig
// Time.seconds(1) 是生存时间
// UpdateType 更新类型
//   - UpdateType.OnCreateAndWrite - 创建和写入时(默认)
//   - UpdateType.OnReadAndWrite - 读取和写入时
// StateVisibility 状态可见性,读取访问时是否返回过期值
//   - StateVisibility.NeverReturnExpired - 永远不会返回过期的值(默认)
//   - StateVisibility.ReturnExpiredIfNotCleanedUp -如果仍然可用则返回
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// use StateTtlConfig to enable TTL
stateDescriptor.enableTimeToLive(ttlConfig)

补充:

清理过期状态

目前只有在显式访问值时才会删除过期值,例如调用 ValueState.value()如果未读取过期状态的数据,则不会将其删除,这可能会导致状态不断增长。将来的版本中可能会发生变化

此外,可以在获取完整的状态快照时激活清理方法,减小快照的大小。目前不会清除本地状态,但在当从上一个快照恢复时,不会包括已删除的过期状态。可以在 StateTtlConfig 配置(不适用于使用 RocksDB 的增量 checkpoint):

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build
Scala DataStream API 中使用 State

除了上述接口外,Scala API 可以在 KeyedStream 上的有状态 map()flatMap() 函数中使用单一 ValueState 的快捷方式。在函数获取 ValueState 当前值,并必须返回用于更新状态的更新值:

val stream: DataStream[(String, Int)] = ...

// mapWithState
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

使用托管的 Operator State

要使用托管 Operator State,有状态函数可以实现 CheckpointedFunction 接口(更通用的)或 ListCheckpointed<T extends Serializable>接口。

CheckpointedFunction
CheckpointedFunction 接口提供对具有不同重新分发方式的 non-keyed state 的访问。需要实现两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

当执行检查点时,snapshotState() 被调用。 每次初始化用户定义的函数时,都会调用 initializeState() 函数(首次初始化函数,或者从检查点恢复时)。因此,initializeState() 不仅要初始化不同类型的状态,还包括状态恢复的逻辑。

目前,支持列表(list)类型的托管的 Operator state。状态被认为是可序列化对象的列表,彼此独立,因此在可以用于重新分配。换句话说,这些对象是可以重新分配 non-keyed state 的最小单元。根据状态访问方法,有重新分发方案:

下面示例时有状态的 SinkFunction,使用 CheckpointedFunction 在将数据元发送之前进行缓存。表示了基本的Even-split redistribution 状态列表:

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

    @transient
    private var checkpointedState: ListState[(String, Int)] = _

    // elemenet buffer
    private val bufferedElements = ListBuffer[(String, Int)]()

    override def invoke(value: (String, Int)): Unit = {
        // put value in buffer
        bufferedElements += value
        
        // send elements in buffer when buffer size reach threshold
        if (bufferedElements.size == threshold) {
            for (element <- bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear()
        }
    }

    // state add element when snapshot
    // 清除前一个检查点包含的所有对象,添加当前 buffer 中的元素
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (element <- bufferedElements) {
            checkpointedState.add(element)
        }
    }

    // initializeState 用于首次初始化或从检查点恢复
    // 使用 FunctionInitializationContext 作为参数,用来初始化 non-keyed state
    // ListState 类型的,non-keyed state 将在检查点时存储在该容器中。
    override def initializeState(context: FunctionInitializationContext): Unit = {
        // 状态的初始化类似于 keyed state,StateDescriptor 包含状态名称和状态值的类型信息:
        val descriptor = new ListStateDescriptor[(String, Int)](
            "buffered-elements",
            TypeInformation.of(new TypeHint[(String, Int)]() {})
        )

        checkpointedState = context.getOperatorStateStore.getListState(descriptor)

        // 使用上下文对象的 isRestored() 方法来检查当前是否是失败后恢复的情况。
        // 如果 true,表示是恢复失败的情况,应用恢复数据的逻辑:
        //   恢复数据添加到 buffer 中
        if(context.isRestored) {
            for(element <- checkpointedState.get()) {
                bufferedElements += element
            }
        }
    }

}

状态访问方法的命名约定中包含其重新分发模式,其次时状态结构。例如,在 union redistribution 模式下,使用 getUnionListState(descriptor) 方法访问 list state。如果方法名称不包含重新分发模式(getListState(descriptor)),表示将使用基本的 even-split redistribution 模式。

Keyed state 也可以在 initializeState() 方法中初始化。可以使用 FunctionInitializationContext 完成。

ListCheckpointed
ListCheckpointedCheckpointedFunction 的一个有限变体,只支持 even-split redistribution 模式下 list 类型的状态恢复。同样需要实现两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
有状态的 Source Functions

与其他算子相比,有状态的 source functions 需要更多的关注。为了更新状态和输出是在一个原子操作下(在精确一次的语义下完成故障恢复),用户需要从源的上下文中获取一个锁。

class CounterSource extends RichParallelSourceFunction[Long]
       with ListCheckpointed[Long] {

    @volatile
    private var isRunning = true

    private var offset = 0L

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        val lock = ctx.getCheckpointLock

        while (isRunning) {
            // 输出和更新在一个原子操作中
            lock.synchronized({
                ctx.collect(offset)
                offset += 1
            })
        }
    }

    override def cancel(): Unit = isRunning = false

    override def restoreState(state: util.List[Long]): Unit =
        for (s <- state) {
            offset = s
        }

    override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
        Collections.singletonList(offset)

}

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/
https://flink.xskoo.com/dev/stream/state/state.html

上一篇 下一篇

猜你喜欢

热点阅读