Spark & FlinkFlink1.13spark||flink||scala

Flink Checkpoint 和 Large State 调

2021-06-21  本文已影响0人  Alex90

Overview

为了使 Flink 应用程序能够可靠地大规模运行,必须满足两个条件:

监控 State 和 Checkpoint

监控 Checkpoint 行为的最简单方法是通过 WebUI 界面。有两个 Checkpoint Metric 最值得关注的是:

理想情况下,这两个值都应该是低值,持续出现较高的值意味着 checkpoint barrier 在 job graph 中缓慢移动,通常是由于 backpressure 存在(没有足够的资源来处理记录)。也可以通过增加处理记录的端到端延迟来观察。

调整 Checkpoint

应用程序可以配置固定时间间隔触发 checkpoint。当一个 checkpoint 的完成时间长于固定间隔时,在进行中的 checkpoint 完成之前不会触发下一个(默认情况下,下一个 checkpoint 将在正在进行的 checkpoint 完成后立即触发)。

当 checkpoint 结束的时间经常超过固定间隔时,系统会不断地触发 checkpoint(完成后立即启动新)。这可能意味着在两个 checkpoint 之间,Operator 处理进展过少,并且 checkpoint 占用了过多的资源。此行为对使用异步 checkpoint 的流应用程序的影响较小,但仍可能对整体应用程序性能产生影响。

为了防止这种情况,应用程序可以定义一个 checkpoint 的最小间隔(在最新 checkpoint 结束和下一个 checkpoint 开始前必须经过的最小时间间隔。):

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

下图说明了这是如何影响 checkpoint 的,避免了 checkpoint 持续不断的进行。

可以配置应用程序允许同时进行多个 checkpoint。当手动触发 savepoint 时,可能与正在进行的 checkpoint 同时进行。

调整 RocksDB

许多大规模 Flink 流计算应用程序的 State 存储使用的是 RocksDB state Backend。扩展性远远超过主内存,并可靠地存储大的 keyed state

RocksDB 的性能会因配置而异,下面介绍一些使用 RocksDB state Backend 的最佳实践。

增量 Checkpoint

在减少 checkpoint 所需时间方面,开启增量 checkpoint 应该是首要考虑因素之一。与完全 checkpoint 相比,增量 checkpoint 可以显著减少时间,因为只记录与前一次完成的 checkpoint 相比所做的更改。

Timer 存储选择

定时器(Timer)默人存储在 RocksDB 中,当 Job 只有很少的 Timer 时,放在堆上存储可以提高性能。

请小心使用此功能,因为基于堆的 Timer 可能会增加 checkpoint 时间,并且无法在内存之外扩展。

调整 RocksDB 内存

RocksDB State Backend 的性能在很大程度上取决于其可用的内存量。为了提高性能,增加内存会有很大帮助,或者调整内存使用。

默认情况,RocksDB State Backend 使用 Flink 托管内存用于 RocksDBs buffer 和 cache(state.backend.rocksdb.memory.managed: true)。 要调整与内存相关的性能问题,以下步骤可能会有所帮助:

public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // 当一个 Operator 中有多个状态时,增加后台最大刷新线程数
        // 这意味着在一个 RocksDB 实例中会有多个 Columnfamily
        return currentOptions.setMaxBackgroundFlushes(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // 将 arena 块大小从默认的8MB减少到1MB。
        return currentOptions.setArenaBlockSize(1024 * 1024);
    }

    @Override
    public OptionsFactory configure(Configuration configuration) {
        return this;
    }
}

容量规划

本节讨论如何决定一个 Flink 作业应该使用多少资源才能可靠地运行。容量规划的基本经验法则是:

压缩

Flink 为所有 checkpoint 和 savepoint 提供可选的压缩(默认值:off)。目前,压缩总是使用 snappy compression algorithm(version 1.1.4) 但计划在未来支持自定义压缩算法。压缩的粒度是 keyed state 的 key-group,每个 key-group 可以单独压缩,这对于缩放程序非常重要。

压缩可以通过 ExecutionConfig 开启

ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);

压缩选项对增量快照(RocksDB)没有影响。

任务本地恢复

Motivation

在 Flink 的 checkpoint 中,每个 Task 都会生成一个 State snapshot,然后将其写入分布式存储。每个 Task 通过发送一个描述 State 在分布式存储中的位置的句柄来确认 State 成功写入 JobManager。JobManager 依次从所有 Task 收集句柄,并将绑定到到 checkpoint 对象中。

在恢复的情况下,JobManager 打开最新的 checkpoint 对象并将句柄发送回相应的 Task,然后这些 Task 可以从分布式存储中恢复 State。使用分布式存储来存储 State 有两个重要的优点。首先,存储是容错的,其次,分布式存储中的所有 State 对所有节点都是可访问的,并且可以很容易地重新分配(例如,用于重新缩放)。

然而,使用远程分布式存储也有一个很大的缺点:所有 Task 都必须通过网络从远程位置读取其状态。在一些情况下,恢复可以将 Task 重新安排到与上一次运行相同的 TaskManager 中,但仍然要读取远程状态。这可能会导致大状态的恢复时间长。

Approach

任务本地 State 恢复是针对这一类问题,主要思想如下:对于每个 checkpoint,每个 Task 不仅将 State snapshot 写入分布式存储,而且还将 state snapshot 的辅助副本保存在该 Task 所在的本地存储中(例如,本地磁盘或内存中)。State 的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供重新分发 State 的访问。

对于每个可以重新安排到上一个位置进行恢复的 Task,可以从本地辅助副本恢复 State,并避免远程读取的开销。考虑到许多故障不是节点故障,节点故障通常一次只影响一个或极少数节点,在恢复过程中,大多数 Task 很可能返回到其以前的位置,并发现其本地 State 完好无损,可以有效地缩短恢复时间。

需要注意的是,根据所选的 state backend 和 checkpoint 策略,在创建和存储本地辅助副本时,每个 checkpoint 可能需要一些额外的成本。在大多数情况下,实现只需将对分布式存储的写入复制到本地文件。

image.png

主副本和辅助副本的关系

配置任务本地恢复

任务本地恢复在默认情况下是停用的,可以通过 Flink 的配置开启(state.backend.local-recovery 指定为 false 或 true,还可以在 Job 上设置 CheckpointingOptions.LOCAL_RECOVERY)。

Allocation-preserving scheduling

任务本地恢复假设在失败情况下保持分配的 Task 调度,其原理如下:每个 Task 都会记住之前分配的 Slot,在恢复过程中会请求完全相同的 Slot 进行重启。如果 Slot 不可用,任务将从 Resource Manager 请求一个全新的 Slot。

如果一个 TaskManager 不再可用,则之前分配该 TaskManager 上的 Task 必须在其他的 TaskManager 上运行,但是不会让其他可以在原 Slot 上恢复的 Task 改变位置。在这种策略下,会让尽可能多的 Task 在原 Slot 上启动,并从本地恢复 State。


上一篇下一篇

猜你喜欢

热点阅读