Flink State、CheckPoint与Savepoint

2021-03-23  本文已影响0人  小驴小驴

State

State简述

State分类:

State两种形态:

推荐使用托管状态,因为如果使用托管状态,当并行度发生改变时,Flink 可以自动的帮你重分配 state,同时还可以更好的管理内存。

分配策略:

下面代码片对CheckpointedFunction使用案例:

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                //将数据发到外部系统
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

ListCheckpointed
是一种受限的 CheckpointedFunction,只支持 List 风格的状态和 even-spit 的重分配策略。

State状态存储

第一次 Checkpoint 时生成的状态快照信息包含了两个 sstable 文件:sstable1 和 sstable2 及 Checkpoint1 的元数据文件 MANIFEST-chk1,所以第一次 Checkpoint 时需要将 sstable1、sstable2 和 MANIFEST-chk1 上传到外部持久化存储中。第二次 Checkpoint 时生成的快照信息为 sstable1、sstable2、sstable3 及元数据文件 MANIFEST-chk2,由于 sstable 文件的不可变特性,所以状态快照信息的 sstable1、sstable2 这两个文件并没有发生变化,sstable1、sstable2 这两个文件不需要重复上传到外部持久化存储中,因此第二次 Checkpoint 时,只需要将 sstable3 和 MANIFEST-chk2 文件上传到外部持久化存储中即可。这里只将新增的文件上传到外部持久化存储,也就是所谓的增量 Checkpoint。

基于 LSM Tree 实现的数据库为了提高查询效率,都需要定期对磁盘上多个 sstable 文件进行合并操作,合并时会将删除的、过期的以及旧版本的数据进行清理,从而降低 sstable 文件的总大小。图中可以看到第三次 Checkpoint 时生成的快照信息为sstable3、sstable4、sstable5 及元数据文件 MANIFEST-chk3, 其中新增了 sstable4 文件且 sstable1 和 sstable2 文件合并成 sstable5 文件,因此第三次 Checkpoint 时只需要向外部持久化存储上传 sstable4、sstable5 及元数据文件 MANIFEST-chk3。

基于 RocksDB 的增量 Checkpoint 从本质上来讲每次 Checkpoint 时只将本次 Checkpoint 新增的快照信息上传到外部的持久化存储中,依靠的是 LSM Tree 中 sstable 文件不可变的特性。对 LSM Tree 感兴趣的同学可以深入研究 RocksDB 或 HBase 相关原理及实现。

CheckPoint

CheckPoint需要的先决条件:

CheckPoint如何配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
env.enableCheckpointing(1000);

// Checkpoint 语义设置为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// CheckPoint 的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间,只允许 有 1 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 两次 Checkpoint 之间的最小时间间隔为 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

// Checkpoint 失败后,整个 Flink 任务也会失败(flink 1.9 之前)
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(true)

Checkpoint与SavePoint的区别

image.png

Operate UID的重要性
如何为算子指定UID

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

CheckPoint流程

  1. JobManager中CheckPointCoordinator 会定期向所有 SourceTask 发送 CheckPointTrigger,Source Task 会在数据流中安插 Checkpoint barrier


    image.png

2.当 task 收到上游所有实例的 barrier 后,向自己的下游继续传递 barrier,然后自身同步进行快照,并将自己的状态异步写入到持久化存储中
   - 如果是增量 Checkpoint,则只是把最新的一部分更新写入到外部持久化存储中;
   - 为了下游尽快进行 Checkpoint,所以 task 会先发送 barrier 到下游,自身再同步进行快照;


image.png
  1. 当 task 将状态信息完成备份后,会将备份数据的地址(state handle)通知给 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持续时长超过了 Checkpoint 设定的超时时间CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator 就会认为本次 Checkpoint 失败,会把这次 Checkpoint 产生的所有状态数据全部删除

  2. 如果 CheckPointCoordinator 收集完所有算子的 State Handle,CheckPointCoordinator 会把整个 StateHandle 封装成 completed Checkpoint Meta,写入到外部存储中,Checkpoint 结束


    image.png

Flink Exactly Once

在Flink中的Exactly Once有两种层面,一种是Flink CheckPoint处,还有一处是研究端到端的,因此读者有必要在学习之前先熟悉它的两个层面。

CheckPoint处的Exactly Once
在上述文章中,我们知道了CheckPoint的流程,但此时需要了解Barrier机制。当Exactly Once中Task在收到来自Source的Barrier时,如果停下手中的数据处理任务并等待所有Source的Barrier,与此同时先把数据保存在缓存中,待所有Barrier收集完毕之后再去做CheckPoint,那么这种方式就称呼为Barrier对齐。如果不停下手中的任务(不去等待收集到所有Source实例的Barrier),等到收集到所有的Source Barrier再去做CheckPoint,那么这种方式就被称呼为Barrier不对齐,就会产生整个系统的At least once。在配置上来看是否配置的ExactlyOnce就是配置了Barrier是否对齐。

端到端的ExactlyOnce
仔细思考CheckPoint的机制就会发现,CheckPoint的ExactlyOnce并不能保证数据端到端的ExactlyOnce,比如说Sink在CHK100 ~ CHK101之间挂了,但是最近的一次成功CheckPoint为CHK100,在这期间以及写入的Sink数据无法被删除,那么端到端的就会退化成At least once。
那么如何解决端到端的Exactly Once呢,有两种方案:

补充:
在FlinkKafkaConsumer中分区分配原则是使用assign的方式实现;

在assign中使用的分区分配其实是使用的是round-robin的策略;

当并行度改变时,FlinkKafka使用的是UnionSplit的策略方式,因为使用event-split可能获取到的State和分区得到的Partition并不匹配;

在恢复时,若发现Partition不在内存中的TopPartition --> Offset映射关系中,则让当前的Partition从EARLIEST处开始消费。

上一篇 下一篇

猜你喜欢

热点阅读