flinkflink

Tuning Checkpoints and Large Sta

2018-03-10  本文已影响174人  小C菜鸟

原文链接


本页面提供了如何配置和优化使用大型状态的应用程序的指南。

概述

为了让Flink应用程序在大规模集群上可靠的运行,必须满足两个条件:

第一部分讨论如何更好地执行checkpoint。最后一部分展示了一些关于计划分配多少资源的最佳实践。

监控状态和检查点

监控checkpoint行为的最简单方法是通过UI的checkpoint部分。checkpoint监控文档展示了如何访问可用的checkpoint指标。

当增加checkpoint时,特别感兴趣的两个数字是:

`checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration`

 如果触发checkpoint的延迟时间总是非常高时,表明**checkpoint barriers**需要很长时间才能从Source到达操作符。这通常表明系统在一个恒定的反压力下运行。

注意:当出现瞬时的反压力,数据倾斜,或网络问题,这些数字会偶尔增高。然而,如果这些数字一直很高,这就意味着Flink将许多资源放入了checkpoint。

优化Checkpoint

Checkpoint是根据应用配置的间隔定期触发的。当执行Checkpoint需要的时间比Checkpoint间隔更长时,下一个Checkpoint在当前执行中的Checkpoint完成之前不会触发。默认情况下,当前正在执行的Checkpoint完成之后会立即触发下一个Checkpoint。

当Checkpoint完成的时间通常比间隔时间还要长(例如,因为状态比预期的要大,或者存储Checkpoint的速度突然变慢),系统就会不断的产生新的Checkpoint(新的Checkpoint在完成之后立即产生)。这意味着太多的资源被绑定在Checkpoint上,而操作符很少进行处理。这种行为对使用异步Checkpoint状态的流应用可能影响较小,但仍然可能会影响总体应用程序性能。

为了防止这种情况,应用可以定义一个Checkpoint最小间隔时间

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

这个间隔是Checkpoint完成之后和新的Checkpoint开始之间的最小间隔时间。下图演示了它对检查点的影响。

checkpoint_tuning.jpg

注意:可以配置应用程序(通过CheckpointConfig),允许在进程中同时存在多个Checkpoint。对于具有较大状态的Flink应用程序,这通常会将过多的资源绑定到Checkpoint。当一个savepoint被手动触发时,它可能与正在进行的Checkpoint同时进行。

优化网络缓冲

在Flink 1.3 之前,网络缓冲数量的增加也导致了Checkpoint时间的增加,因为保留更多的in-flight数据意味着Checkpoint barrier被延迟了。从Flink 1.3 开始,每个输出/输入通道使用的网络缓冲数量是受限的,因此网络缓冲可以配置为不影响Checkpoint时间(请参见网络缓冲配置)。

尽可能进行异步Checkpoint

当状态进行异步快照时,Checkpoint的性能要比同步快照好。特别是在具有多个join, Co-functions或者窗口的越复杂的流应用中,这可能会产生深远的影响。

为了使状态被异步快照,应用程序必须做两件事:

  1. 使用由Flink管理的状态: 状态托管意味着Flink提供状态存储的数据结构。当前,keyed state是这样的,它是接口ValueState, ListState, ReducingState背后的抽象。

  2. 使用支持异步快照的状态后端。在Flink 1.2中,只有RocksDB状态后端使用完全异步快照。从Flink 1.3 开始,基于堆的状态后端也支持异步快照。

上述两点意味着,大状态通常应该保存为keyed状态,而不是operator状态。

优化RocksDB

许多大型FLink流式应用的状态存储使用的RocksDB State Backend。它的规模远远超出主内存,并且能够可靠的存储大keyed状态

不幸的是,RocksDB的性能可能因配置而异,而且几乎没有关于如何正确地调优RocksDB的文档。例如,默认的配置是针对SSD的,在机械磁盘上执行次级的优化。

增量式的Checkpoint

相对于全量式的Checkpoint,增量式的Checkpoint能够显著的减少Checkpoint的时间,而代价(潜在的)是更长的恢复时间。核心思想是,增量式的Checkpoint只记录对之前已完成的Checkpoint的变更,而不是一个完整的,自包含的状态后端备份。就像这样,增量式的Checkpoint基于前一个Checkpoint。Flink利用了RocksDB内部的备份机制,该机制随着时间的推移而自我整合。因此,Flink中的增量式Checkpoint不会无限期的增长,旧的Checkpoint最终会被自动的整合和删除。

虽然我们强烈建议在大型状态下使用增量式Checkpoint,但是请注意,这是一个新特性,并且当前默认是不开启的。要开启这个特征,用户可以在初始化 RocksDBStateBackend 时将构造函数中相应的布尔标识设为 true,例如:

    RocksDBStateBackend backend =
        new RocksDBStateBackend(filebackend, true);

向RocksDB传递选项

RocksDBStateBackend.setOptions(new MyOptions());

public class MyOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions() {
        return new DBOptions()
            .setIncreaseParallelism(4)
            .setUseFsync(false)
            .setDisableDataSync(true);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions() {

        return new ColumnFamilyOptions()
            .setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB
                    .setBlockSize(128 * 1024));            // 128 KB
    }
}

预定义选项

Flink对于不同的设置,为RocksDB提供了一些预定义的选项集合,它们可以通过如下示例进行设置RocksDBStateBacked.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM).

我们希望随着时间的推移可以积累更多的文档。当您发现一组可以很好的工作并且似乎代表某些工作负载的选项时,可以自由地提供这些预定义的选项概要文件。

重要: RocksDB是一个本地库,它的内存不是来自JVM,而是处理器的本地内存。分配给RocksDB的任何内存都必须考虑在内,通常是通过减少TaskManager的相同数量的JVM内存。不这样做的结果是,可能会导致YARN/Mesos/等终止JVM进程来分配比配置更多的内存。

容量评估

本节讨论为了使Flink作业可靠的运行,应该如何决定为作业分配多少资源。容量评估的基本原则是:

重要: 为了便于以后添加资源,请确保将数据流程序的最大并行度设置为一个合理的数字。最大的并行度定义了在重新缩放程序(通过保存点)时,可以设置程序并行度的程度。

Flink内部的bookkeeping跟踪在max-parallelism-many key groups粒度下的并行状态。Flink的设计努力使它的效率高到最大的并行度,即使是执行低并行度的程序。

压缩

Flink为所有的Checkpoint和savepoint提供了压缩选项(默认:关闭)。当前,压缩总是使用snappy压缩算法(version 1.1.4),但是我们计划在未来支持自定义的压缩算法。在keyed状态下,压缩工作在key-groups的粒度下,即,每个key-group能被单独的压缩,这对于弹性部署是很重要的。

可以通过ExecutionConfig激活压缩:

        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(true);

注意: 压缩选项对增量快照没有影响,因为它们使用的是RocksDB的内部格式,它总是使用box的snapp压缩。

上一篇下一篇

猜你喜欢

热点阅读