Flink 2.2CheckPoint
CheckPoint
CheckPoint用于flink的故障恢复/容错机制,保存任务的状态(所有任务(source和每个transform)恰好处理完一个相同输入数据的时间点快照),source要支持重放。
CheckPoint实现:
spark:stop the world,暂停应用保存状态到检查点。
flink:分布式快照,异步Checkpoint,将检查点的保存和数据分离开,每个任务分别进行状态保存。
检查点分界线(Checkpoint Barrier):
类似watermark,是一种特殊的数据形式。可以将流数据按Barrier分隔开,当task遇到Barrier触发保存状态,当 Barrier 流动到 Sink,所有的状态都保 存完整了之后,它就形成一个全局的快照。
Checkpoint的传递:
JobManager周期性自动负责发送保存检查点命令到全部并行的子任务;上游task会将Barrier向下游每个子任务广播;
下游的task接收到所有上游多个Barrier都到齐后再保存:对于上游已经到达Barrier的分区,后续到达的数据会缓存,等其上游的全部Barrier都到齐。
Savepoints:
Savepoints即为手动的Checkpoint,用于手动备份,更新程序,版本迁移重启应用等操作。
检查点开启&配置&重启策略:
//默认500ms触发一次
//EXACTLY_ONCE:精确一次
//AT_LEAST_ONCE:至少一次
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
//checkpoint任务超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
//最大同时进行的checkpoint任务,即多个Barrier进入处理流程
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//两次checkpoint之间的空闲时间最小值,设置后上一个配置则为1
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
//设置为0表示不容忍checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
//重启策略:每隔10s尝试三次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L));
//重启策略:每隔1m尝试重启一次共三次,超时十分钟则失败
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10),Time.minutes(1)));
状态一致性分级:
- AT_MOST_ONCE:最多一次
- AT_LEAST_ONCE:至少一次
- EXACTLY_ONCE:精确一次
端到端状态一致性:
Source端:可重设数据读取位置。
内部算子:checkpoint。
Sink端:
- 幂等写入(一个操作多次执行结果一样,只能保证最终结果一致,中间会有短暂数据重放)
- 事务写入(原子性操作):checkpoint完成才将对应结果写入sink端,预写日志 or 两阶段提交
Flink + Kafka 端到端一致性:
Source:consumer提交偏移量
内部算子:checkpoint
Sink:producer实现了TwoPhaseCommitSinkFunction两阶段提交
如何从checkpoint、savepoint恢复task
cp比sp成本低,优先使用cp解决。
其他组件需要重启时,flink任务也需要重启,但是flink代码没有修改,可以使用cp恢复,前提是配置取消flink任务时保留外部存储的cp状态信息,配置如下:
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
从cp恢复任务命令如下:
./bin/flink run -s :checkpointPath xxx.jar [:runArgs]
#yarn模式:
./bin/flink run -s :checkpointPath -yid :yarnAppId xxx.jar [:runArgs]
需要在取消任务前获取记录cp的目录:
flink web ui上查看 or http请求(Flink JobManager 的 overview ⻚⾯+URL)获取:
http://XXXX:35524/jobs/a1c70b36d19b3a9fc2713ba98cfc4a4f/metrics?get=lastCheckpointExternalPath
因为开启了保留cp到外部存储,需要定期清理cp目录。
当修改了flink的代码,删除或者新加算子后,需要通过sp进行恢复任务,命令和cp恢复相同。
删除算子:通过--allowNonRestoredState(-n)跳过无法映射到新程序的状态。
新加算子:相当于无状态的算子。
算子重排序:提前分配了算子ID则可以正常恢复。
注意也需要定期清理废弃sp。