Spark & FlinkFlink专题

Flink DataStream 状态和容错 二:Checkpo

2019-01-11  本文已影响9人  Alex90

Checkpoint

Flink 中的 State 在上一篇中介绍过,为了使 State 容错,需要有 State checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。Checkpoint 机制在 Flink 容错机制 中有更详细介绍。

Checkpoint 使用的先决条件:

  1. 一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...
  2. State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...

启用和配置

Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

Checkpoint 的配置项包括:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 启用 checkpoint 间隔 1000 ms
env.enableCheckpointing(1000)

// 高级选项:

// 设置 exactly-once 模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 设置 checkpoint 最小间隔 500 ms 
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// 设置 checkpoint 必须在1分钟内完成,否则会被丢弃
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 设置 checkpoint 失败时,任务不会 fail,该 checkpoint 会被丢弃
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// 设置 checkpoint 的并发度为 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

相关配置

更多相关参数可以通过 conf/flink-conf.yaml 全局配置

配置项 默认值 描述
state.backend (none) 选择 state backend 实现
state.backend.async true state backend 使用异步方法。有些不支持异步,或者仅支持异步的可并忽略此选项
state.backend.fs.memory-threshold 1024 存储 state 数据文件的最小规模,如果小于该值则会存储在 root checkpoint metadata file
state.backend.incremental false 是否采用增量 checkpoint,有些不支持增量的可并忽略此选项
state.backend.local-recovery false
state.checkpoints.dir (none) 用于指定 checkpoint 数据存储目录,目录必须对所有参与的 TaskManagers 和 JobManagers 可见
state.checkpoints.num-retained 1 指定保留已完成的 checkpoint 数量
state.savepoints.dir (none) 用于指定 savepoint 数据存储目录
taskmanager.state.local.root-dirs (none)

选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…),下面有更详细的介绍。

迭代任务中使用

Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)

在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。

State backend

Flink 提供了不同的 State backend,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml 指定的选项,也可以在每个作业中设置来覆盖默认选项:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

Flink 自带了以下几种开箱即用的 state backend:

在没有配置的情况下,系统默认使用 MemoryStateBackend

三种 State backend 介绍

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

MemoryStateBackend 的限制:

MemoryStateBackend 适用以下场景:

FsStateBackend

FsStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置:

new FsStateBackend(path, false);

FsStateBackend 适用以下场景:

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

RocksDBStateBackend 通常也是异步的。

RocksDBStateBackend 的限制:
RocksDB JNI API 是基于 byte[],因此 key 和 value 最大支持大小为2^31 个字节。RocksDB 自身在支持较大 value 时候有一些问题。

RocksDBStateBackendFsStateBackend 同样适用以下场景:

与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

性能比较

Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark

Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。

不同 State backend 吞吐量对比

Throughput

不同 State backend 延迟对比

Latency

State backend 的选择

StateBackend in-flight checkpoint 吞吐 推荐使用场景
MemoryStateBackend TM Memory JM Memory 调试、无状态或对数据丢失或重复无要求
FsStateBackend TM Memory FS/HDFS 普通状态、窗口、KV 结构
RocksDBStateBackend RocksDB on TM FS/HDFS 超大状态、超长窗口、大型 KV 结构

Reference:
https://flink.xskoo.com/dev/stream/state/checkpointing.html
https://tech.meituan.com/Flink_Benchmark.html

上一篇下一篇

猜你喜欢

热点阅读