程序员

Flink实战(11)-Exactly-Once语义之两阶段提交

2023-11-23  本文已影响0人  JavaEdge

0 大纲

[Apache Flink]2017年12月发布的1.4.0版本开始,为流计算引入里程碑特性:TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持:

包括Apache Kafka 0.11及更高版本。它提供抽象层,用户只需实现少数方法就能实现端到端Exactly-Once语义。

新功能及Flink实现逻辑:

1 Flink应用中的Exactly-Once语义

Exactly-Once,指每个输入的事件只影响最终结果一次。即使机器或软件故障,既没有重复数据,也不会丢数据。

Flink很久就提供Exactly-Once,checkpoint机制是Flink有能力提供Exactly-Once语义的核心。

一次checkpoint是以下内容的一致性快照:

Flink可配置一个固定时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,如S3或HDFS。将checkpoint数据写入持久存储是异步,即Flink应用程序在checkpoint过程中可以继续处理数据。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。

Flink 1.4.0前,Exactly-Once语义仅限Flink应用程序内部,没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员自己维护组件上下文保证Exactly-Once语义。

为提供端到端的Exactly-Once语义 – 即除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制协调。

分布式系统中,协调提交和回滚的常用方法是2pc协议。讨论Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once语义。

2 Flink应用程序端到端的Exactly-Once语义

Kafka经常与Flink使用。Kafka 0.11版本添加事务支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要支持。

Flink对端到端的Exactly-Once语义的支持不仅局限Kafka,可将它与任何一个提供必要的协调机制的源/输出端一起使用。如Pravega,来自DELL/EMC的开源流媒体存储系统,通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。

image-20231124142310942

示例程序有:

要使数据输出端提供Exactly-Once保证,须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写数据。这确保在故障时,能回滚写入的数据。但分布式系统中,通常有多个并发运行的写入任务,所有组件须在提交或回滚时“一致”才能确保一致结果。Flink使用2PC及预提交阶段解决这问题。

pre-commit

checkpoint开始时,即2PC的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。

brarrier在operator之间传递。对每个operator,它触发operator的状态快照写入state backend。

数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一operator。

这种方式仅适用于operator具有『内部』状态。

内部状态

指Flink state backend保存和管理的。如第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态时,除了在checkpoint前需将数据变更写入state backend,无需在pre-commit阶段执行其他操作。

Flink负责在checkpoint成功时正确提交这些写入或故障时中止这些写入。

image-20231124150402626

3 Flink应用启动pre-commit阶段

当进程具有『外部』状态,需额外处理。外部状态通常以写入外部系统(如Kafka)的形式出现。此时,为提供Exactly-Once保证,外部系统须【支持事务】,才能和两阶段提交协议集成。

示例数据需写入Kafka,因此数据输出端(Data Sink)有外部状态。此时,在预提交阶段:

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。若故障,可回滚到上次成功完成快照的时间点。

下一步是通知所有operator,checkpoint已经成功了。这是2PC的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。

数据源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

总结

因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意数据已提交,或提交被中止并回滚。

4 在Flink中实现两阶段提交Operator

完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。

接下来基于输出到文件的简单示例,说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数,就能为数据输出端实现Exactly-Once语义:

我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。

我们需要在预提交阶段保存足够多的信息到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些信息是临时文件和目标目录的路径。

TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时文件不在临时目录中,但已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文档了解更多信息。

FAQ

flink sink在如果过来一个checkpoint barrier,会去存储state,这个动作会和普通的write并行吗?还是串行?

在Flink的checkpoint机制中,当一个Checkpoint Barrier过来时,sink会触发对状态的snapshot,这个snapshot动作默认是和普通的write操作并行进行的。

具体来说:

但是也可以通过Sink的配置来设置snapshot和write的执行策略,主要有两种模式:

  1. 并行模式(默认):snapshot和write同时进行

  2. 串行模式:snapshot完成后再进行write

综上,Flink sink在默认的并行checkpoint模式下,状态snapshot和普通的write操作是并行执行的。可以通过配置来改变其行为。这样可以根据实际需要进行平衡。

总结

本文由博客一文多发平台 OpenWrite 发布!

上一篇 下一篇

猜你喜欢

热点阅读