flinkflink

Checkpointing

2018-01-20  本文已影响15人  小C菜鸟

原文链接


Flink中的每个函数和操作符都可以是有状态的(详细信息请参阅使用状态)。有状态的函数在单个元素/事件的处理过程中存储数据,使状态成为任何类型的复杂操作的关键部分。
为了使状态容错,Flink需要checkpoint状态。checkpoint允许Flink恢复流中的状态和位置,以使应用获得和无故障运行相同的语义。
文档《Data Streaming Fault Tolerance》描述了Flink的流容错机制背后的技术。

先决条件

Flink的checkpoint机制与流和状态的持久化存储交互。一般来说,它要求:

启用和配置checkpoint

默认情况下,checkpoint是禁用的。调用StreamExecutionEnvironment的enableCheckpointing(n)方法开启checkpoint,参数n是相邻checkpoint之间间隔的毫秒数。
checkpoint的其它参数包括:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

相关配置选项

更多的参数和默认值可以通过conf/flink-conf.yaml(请参阅完整配置)设置。

选择一个状态后端

Flink的checkpoint机制存储计时器和有状态的操作符中的所有状态(包括连接器,窗口和任何用户定义的状态)的一致性快照。checkpoint存储在何处(例如,JobManager memory, file system, database)取决于配置的后端状态
默认情况下,状态保存在TaskManager的内存中,checkpoint存储在JobManager的内存中。为了适配大状态的持久化,Flink支持在其它后端状态中存储和checkpoint状态。可以通过StreamExecutionEnvironment.setStateBackend(…)方法配置后端状态。
更多关于可用后端状态和作业范围和集群范围的配置选项请参阅后端状态

迭代作业中的checkpoint状态

Flink当前仅为没有迭代的作业提供处理保证。在迭代作业中启用checkpoint会导致异常。为了在迭代程序上强制checkpoint,当开启checkpoint时用户需要设置一个特殊的标识env.enableCheckpointing(interval, force = true)。
请注意,当失败时,已经通过循环的记录(和跟它们相关的状态修改)会丢失。

重启策略

Flink支持不同的重启策略,该策略控制在失败时如何重新启动作业。更多的信息,见重启策略

上一篇 下一篇

猜你喜欢

热点阅读