Flink Checkpoint 和 Large State 调
Overview
为了使 Flink 应用程序能够可靠地大规模运行,必须满足两个条件:
-
应用程序需要能够可靠地获取 Checkpoint
-
在发生故障后,需要足够的资源追上(catch up)输入数据流
监控 State 和 Checkpoint
监控 Checkpoint 行为的最简单方法是通过 WebUI 界面。有两个 Checkpoint Metric 最值得关注的是:
- 当触发 checkpoint 的时间一直很高时,Operator 收到第一个 checkpoint barrier 的时间一直很高,这意味着 checkpoint barriers 需要很长时间才能从 Source 到 Operator。这通常表明系统在恒定背压(backpressure)下工作。
- 对齐持续时间。在 Exactly-once 语义下,有多个输入的 Operator,已经接收到 barrier 的通道将被阻止接收进一步的数据,直到所有剩余的通道赶上并接收到它们的 barrier 的持续时间。
理想情况下,这两个值都应该是低值,持续出现较高的值意味着 checkpoint barrier 在 job graph 中缓慢移动,通常是由于 backpressure 存在(没有足够的资源来处理记录)。也可以通过增加处理记录的端到端延迟来观察。
调整 Checkpoint
应用程序可以配置固定时间间隔触发 checkpoint。当一个 checkpoint 的完成时间长于固定间隔时,在进行中的 checkpoint 完成之前不会触发下一个(默认情况下,下一个 checkpoint 将在正在进行的 checkpoint 完成后立即触发)。
当 checkpoint 结束的时间经常超过固定间隔时,系统会不断地触发 checkpoint(完成后立即启动新)。这可能意味着在两个 checkpoint 之间,Operator 处理进展过少,并且 checkpoint 占用了过多的资源。此行为对使用异步 checkpoint 的流应用程序的影响较小,但仍可能对整体应用程序性能产生影响。
为了防止这种情况,应用程序可以定义一个 checkpoint 的最小间隔(在最新 checkpoint 结束和下一个 checkpoint 开始前必须经过的最小时间间隔。):
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
下图说明了这是如何影响 checkpoint 的,避免了 checkpoint 持续不断的进行。
可以配置应用程序允许同时进行多个 checkpoint。当手动触发 savepoint 时,可能与正在进行的 checkpoint 同时进行。
调整 RocksDB
许多大规模 Flink 流计算应用程序的 State 存储使用的是 RocksDB state Backend。扩展性远远超过主内存,并可靠地存储大的 keyed state。
RocksDB 的性能会因配置而异,下面介绍一些使用 RocksDB state Backend 的最佳实践。
增量 Checkpoint
在减少 checkpoint 所需时间方面,开启增量 checkpoint 应该是首要考虑因素之一。与完全 checkpoint 相比,增量 checkpoint 可以显著减少时间,因为只记录与前一次完成的 checkpoint 相比所做的更改。
Timer 存储选择
定时器(Timer)默人存储在 RocksDB 中,当 Job 只有很少的 Timer 时,放在堆上存储可以提高性能。
请小心使用此功能,因为基于堆的 Timer 可能会增加 checkpoint 时间,并且无法在内存之外扩展。
调整 RocksDB 内存
RocksDB State Backend 的性能在很大程度上取决于其可用的内存量。为了提高性能,增加内存会有很大帮助,或者调整内存使用。
默认情况,RocksDB State Backend 使用 Flink 托管内存用于 RocksDBs buffer 和 cache(state.backend.rocksdb.memory.managed: true
)。 要调整与内存相关的性能问题,以下步骤可能会有所帮助:
-
增加托管内存的大小,这通常会改善很多情况,并且不会增加调优 RocksDB 底层配置的复杂性。
特别是对于大 Container/进程大小,除非应用程序逻辑本身需要大量 JVM 堆内存,否则总内存中的大部分通常都可以放到 RocksDB 使用(默认的托管内存比例 0.4 是保守的)。
-
RocksDB 中 write buffer 的数量取决于应用程序中的 State 数量。每个 State 对应一个 ColumnFamily(需要独立的 write buffer)。因此,具有大量 State 的应用程序通常需要更多内存才能获得相同的性能。
-
通过设置
state.backend.RocksDB.memory.managed:false
,可以尝试比较 RocksDB with managed memory 和 RocksDB with per column family memory 的性能。不使用托管内存意味着 RocksDB 按照应用程序中的 State 数量按比例分配内存。
-
如果应用程序有大量状态,并且频繁的 MemTable 刷新(写入端瓶颈),如果不能提供更多内存,那么可以增加进入写入缓冲区的内存比率(
state.backend.rocksdb.memory.write buffer ratio
)。 -
一个高级选项(面向 RocksDB 专家)可以减少具有许多状态的设置中的 MemTable 刷新次数,是通过 RocksDBOptionsFactory 调整 RocksDB 的 Columnfamily 设置
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// 当一个 Operator 中有多个状态时,增加后台最大刷新线程数
// 这意味着在一个 RocksDB 实例中会有多个 Columnfamily
return currentOptions.setMaxBackgroundFlushes(4);
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// 将 arena 块大小从默认的8MB减少到1MB。
return currentOptions.setArenaBlockSize(1024 * 1024);
}
@Override
public OptionsFactory configure(Configuration configuration) {
return this;
}
}
容量规划
本节讨论如何决定一个 Flink 作业应该使用多少资源才能可靠地运行。容量规划的基本经验法则是:
- 正常操作应具有足够的容量,以避免在恒定背压下操作。
- 在常规无背压运行程序所需的资源之上提供一些额外的资源。用来在应用程序恢复时快速处理恢复期间积累的输入数据,这取决于恢复操作通常需要多长时间(取决于故障转移时需要加载到新 TaskManager 中的状态的大小)以及要求故障恢复的速度。
- 暂时的背压通常是可以接受的,在负载峰值期间、Catchup 阶段或外部系统出现临时响应慢时。
- 某些操作(如大型窗口)会导致其下游操作符的负载存在毛刺(spiky):在构建窗口时,下游 Operator 可能是空闲的,在发出窗口数据时,下游才开始工作。下游并行性的规划需要考虑窗口发出的量以及处理这种峰值的速度。
压缩
Flink 为所有 checkpoint 和 savepoint 提供可选的压缩(默认值:off)。目前,压缩总是使用 snappy compression algorithm(version 1.1.4) 但计划在未来支持自定义压缩算法。压缩的粒度是 keyed state 的 key-group,每个 key-group 可以单独压缩,这对于缩放程序非常重要。
压缩可以通过 ExecutionConfig
开启
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
压缩选项对增量快照(RocksDB)没有影响。
任务本地恢复
Motivation
在 Flink 的 checkpoint 中,每个 Task 都会生成一个 State snapshot,然后将其写入分布式存储。每个 Task 通过发送一个描述 State 在分布式存储中的位置的句柄来确认 State 成功写入 JobManager。JobManager 依次从所有 Task 收集句柄,并将绑定到到 checkpoint 对象中。
在恢复的情况下,JobManager 打开最新的 checkpoint 对象并将句柄发送回相应的 Task,然后这些 Task 可以从分布式存储中恢复 State。使用分布式存储来存储 State 有两个重要的优点。首先,存储是容错的,其次,分布式存储中的所有 State 对所有节点都是可访问的,并且可以很容易地重新分配(例如,用于重新缩放)。
然而,使用远程分布式存储也有一个很大的缺点:所有 Task 都必须通过网络从远程位置读取其状态。在一些情况下,恢复可以将 Task 重新安排到与上一次运行相同的 TaskManager 中,但仍然要读取远程状态。这可能会导致大状态的恢复时间长。
Approach
任务本地 State 恢复是针对这一类问题,主要思想如下:对于每个 checkpoint,每个 Task 不仅将 State snapshot 写入分布式存储,而且还将 state snapshot 的辅助副本保存在该 Task 所在的本地存储中(例如,本地磁盘或内存中)。State 的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供重新分发 State 的访问。
对于每个可以重新安排到上一个位置进行恢复的 Task,可以从本地辅助副本恢复 State,并避免远程读取的开销。考虑到许多故障不是节点故障,节点故障通常一次只影响一个或极少数节点,在恢复过程中,大多数 Task 很可能返回到其以前的位置,并发现其本地 State 完好无损,可以有效地缩短恢复时间。
需要注意的是,根据所选的 state backend 和 checkpoint 策略,在创建和存储本地辅助副本时,每个 checkpoint 可能需要一些额外的成本。在大多数情况下,实现只需将对分布式存储的写入复制到本地文件。
image.png主副本和辅助副本的关系
-
对于 checkpoint,主副本必须成功并且生成辅助本地副本失败不会使 checkpoint 失败。如果无法创建主副本,即使已成功创建辅助副本,checkpoint 也被认为失败。
-
只有主副本由 JobManager 确认和管理。辅助副本由 TaskManager 拥有,生命周期可以独立于主副本。例如,可以将 3 个最新 checkpoint 的历史记录保留为主副本,并且只保留最新 checkpoint 的本地副本。
-
对于恢复,如果有匹配的辅助副本可用,Flink 将始终尝试从任务本地 State 先还原。如果在从辅助副本恢复期间出现任何问题,Flink 将透明地重试,从主副本恢复。仅当主副本和(可选)辅助副本都恢复失败时,恢复才会失败。
-
任务本地副本可能只包含完整 State 的一部分(例如,写入本地文件时出现异常)。在这种情况下,Flink 将首先尝试在本地恢复本地部分,无法恢复的 State 是从主副本恢复的。
-
任务本地副本可以具有与主副本不同的格式。
-
如果 TaskManager 丢失,则其所有任务的本地副本都将丢失。
配置任务本地恢复
任务本地恢复在默认情况下是停用的,可以通过 Flink 的配置开启(state.backend.local-recovery
指定为 false 或 true,还可以在 Job 上设置 CheckpointingOptions.LOCAL_RECOVERY
)。
Allocation-preserving scheduling
任务本地恢复假设在失败情况下保持分配的 Task 调度,其原理如下:每个 Task 都会记住之前分配的 Slot,在恢复过程中会请求完全相同的 Slot 进行重启。如果 Slot 不可用,任务将从 Resource Manager 请求一个全新的 Slot。
如果一个 TaskManager 不再可用,则之前分配该 TaskManager 上的 Task 必须在其他的 TaskManager 上运行,但是不会让其他可以在原 Slot 上恢复的 Task 改变位置。在这种策略下,会让尽可能多的 Task 在原 Slot 上启动,并从本地恢复 State。