Flink DataStream 状态和容错 二:Checkpo
Checkpoint
Flink 中的 State 在上一篇中介绍过,为了使 State 容错,需要有 State checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。Checkpoint 机制在 Flink 容错机制 中有更详细介绍。
Checkpoint 使用的先决条件:
- 一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...
- State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...
启用和配置
Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment
对象调用 enableCheckpointing(n)
启用 Checkpoint,其中n
是以毫秒为单位的 Checkpoint 间隔。
Checkpoint 的配置项包括:
-
恰好一次(exactly-once)或至少一次(at-least-once):Checkpoint 支持这两种模式。对于大多数应用来说,恰好一次是优选的。至少一次可能在某些要求超低延迟(几毫秒)的应用程序使用。
-
Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint。
-
Checkpoint 最小间隔时间(毫秒):如果设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint,不论 checkpoint 的持续时间和间隔是多少。即使 checkpoint 间隔永远不会小于此参数。是为了保证 checkpoint 之间能够完成一定量的数据处理工作。
配置 time between checkpoint 相比配置 checkpoint interval 通常更容易。因为 checkpoint 耗时有时会明显比平时更长,time between checkpoint 更不容易收到影响(例如,目标存储系统临时性的响应缓慢)
这个值还意味着并发 checkpoint 的数量是一个
-
Checkpoint 并发数:默认情况下,当一个 checkpoint 处于运行状态时,系统不会触发另一个 checkpoint。确保整个拓扑结构不会花费太多时间用于 checkpoint。该设置可以设置多个重叠的 checkpoint,特点的场景可能会需要。
当设置 time between checkpoint 时,不能使用此配置。
-
外部 checkpoint:可以配置在系统外部持久化 checkpoint。Checkpoint 信息写入外部持久存储,在作业失败时不会自动清除,因此作业失败时可以用来恢复。
-
Checkpoint 出错时,任务状态:决定了如果在 checkpoint 过程中发生错误,当前任务是否将失败或继续执行。默认会任务失败。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// 启用 checkpoint 间隔 1000 ms
env.enableCheckpointing(1000)
// 高级选项:
// 设置 exactly-once 模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置 checkpoint 最小间隔 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 设置 checkpoint 必须在1分钟内完成,否则会被丢弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 设置 checkpoint 失败时,任务不会 fail,该 checkpoint 会被丢弃
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
// 设置 checkpoint 的并发度为 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
相关配置
更多相关参数可以通过 conf/flink-conf.yaml
全局配置
配置项 | 默认值 | 描述 |
---|---|---|
state.backend | (none) | 选择 state backend 实现 |
state.backend.async | true | state backend 使用异步方法。有些不支持异步,或者仅支持异步的可并忽略此选项 |
state.backend.fs.memory-threshold | 1024 | 存储 state 数据文件的最小规模,如果小于该值则会存储在 root checkpoint metadata file |
state.backend.incremental | false | 是否采用增量 checkpoint,有些不支持增量的可并忽略此选项 |
state.backend.local-recovery | false | |
state.checkpoints.dir | (none) | 用于指定 checkpoint 数据存储目录,目录必须对所有参与的 TaskManagers 和 JobManagers 可见 |
state.checkpoints.num-retained | 1 | 指定保留已完成的 checkpoint 数量 |
state.savepoints.dir | (none) | 用于指定 savepoint 数据存储目录 |
taskmanager.state.local.root-dirs | (none) |
选择 State backend
Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。
默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…)
,下面有更详细的介绍。
迭代任务中使用
Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)
。
在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。
State backend
Flink 提供了不同的 State backend,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml
指定的选项,也可以在每个作业中设置来覆盖默认选项:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
Flink 自带了以下几种开箱即用的 state backend:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
在没有配置的情况下,系统默认使用 MemoryStateBackend
三种 State backend 介绍
MemoryStateBackend
使用 MemoryStateBackend
,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。
MemoryStateBackend
的限制:
- 单个 State 的大小默认限制为5MB,可以在
MemoryStateBackend
的构造函数中增加。 - 不论如何配置,State 大小都无法大于
akka.framesize
(JobManager 和 TaskManager 之间发送的最大消息的大小) - JobManager 必须有足够的内存大小
MemoryStateBackend
适用以下场景:
- 本地开发和调试
- 只持有很小的状态,如方法:Map、FlatMap、Filter... 或 Kafka Consumer
FsStateBackend
FsStateBackend
需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
FsStateBackend
在 TaskManager 的内存中持有正在处理的数据。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。
FsStateBackend
默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend
构造函数中设置:
new FsStateBackend(path, false);
FsStateBackend
适用以下场景:
- State 较大,窗口时间较长和 key/value 较大的 State
- 所有高可用性的情况
RocksDBStateBackend
RocksDBStateBackend
需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。
RocksDBStateBackend
在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。
RocksDBStateBackend
通常也是异步的。
RocksDBStateBackend
的限制:
RocksDB JNI API 是基于 byte[],因此 key 和 value 最大支持大小为2^31 个字节。RocksDB 自身在支持较大 value 时候有一些问题。
RocksDBStateBackend
与 FsStateBackend
同样适用以下场景:
- State 较大,窗口时间较长和 key/value 较大的 State
- 所有高可用性的情况
- 目前唯一支持增量 checkpoint。
与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。
性能比较
Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark)
Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。
不同 State backend 吞吐量对比
Throughput- 使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
- Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。
不同 State backend 延迟对比
Latency- 使用 FileSystem 和 Memory 时延迟基本一致且较低。
- 使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。
State backend 的选择
StateBackend | in-flight | checkpoint | 吞吐 | 推荐使用场景 |
---|---|---|---|---|
MemoryStateBackend | TM Memory | JM Memory | 高 | 调试、无状态或对数据丢失或重复无要求 |
FsStateBackend | TM Memory | FS/HDFS | 高 | 普通状态、窗口、KV 结构 |
RocksDBStateBackend | RocksDB on TM | FS/HDFS | 低 | 超大状态、超长窗口、大型 KV 结构 |
Reference:
https://flink.xskoo.com/dev/stream/state/checkpointing.html
https://tech.meituan.com/Flink_Benchmark.html