flinkFlink实践

关于checkpoint在flink生产的应用

2019-05-28  本文已影响0人  神奇的考拉

一.简述

Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的checkpoint机制。但是在实际应用中由于对checkpoint的使用不当会带来不恰当的影响:比如两次checkpoint的间隔太短,导致应用一直处于checkpoint的状态下,甚至会导致整个应用变得不可用。接下来会讨论下checkpoint相关内容以及优化参数参考

二.checkpoint是否合理参考参数

对checkpoint进行优化,我们需要参考对应的metrics:

2.1 Checkpoint间隔时间

在实际应用情况下,面对超大数据集规模,每次checkpoint的时间都超过我们设定的或系统的时间,结果会如何?
那就是应用会一直处于checkpoint,甚至导致整个应用都变得不可用了。面对该情况我们提供的方案比如:
1.设置并行checkpoint数 ???
2.增量checkpoint:每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。然后恢复的时候做状态改动的重放???
这里我们来说下第三种方案:强制设置两次checkpoint的空闲间隔


checkpoint的间隔

通过flink提供的config参数来控制,通过该方法我们就可以控制前后checkpoint的间隔不会导致应用一直处于checkpoint。

getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

该参数并未没有彻底解决大规模状态集下checkpoint慢的问题,只是降低慢带来的风险和影响,接下来看看如果解决大规模数据集下的“慢”问题本质方案

2.2 外部state的存储

一般来说checkpoint之所以慢 还是因为数据规模大,那如果我们能找到一种更快的存储状态的介质(或者策略),来使得这个过程变快。比如可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不仅限于存储于有限的内存空间里,甚至完全落地到磁盘上。

2.2.1 资源设置

由于checkpoint是在每个task上先做数据checkpoint,然后在外部存储中做checkpoint持久化。在总状态数据相对固定的情况下,若是减少每个task平均所checkpoint的数据,那么相应地checkpoint的总时间也会变短。所以为每个task设置更多的并行度来加速checkpoint的执行过程。
例如2000W的数据设定100个parallelism,平均=2000W/100;若是将parallelism增大变成200,则平均=2000W/200,相对每份需要处理的数据比较小些,处理的时长就会变少

2.2.2 task恢复

由于checkpoint是分散在每个task上执行,再做汇总持久化。这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时能够重新打散分布。
那么每个task会为了支持快速恢复,会同时写checkpoint数据到本地磁盘和远程分布式存储,只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会优先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。


checkpoint task数据存储
2.2.3 常见的配置参数
// checkpoint周期
env.enableCheckpointing(1000);
// checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
​
// checkpoint执行有效期:要么1min完成 要么1min放弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
​
// 确保checkpoint时间空闲间隔500ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
​
// 允许同一时间只存在一个checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
​
// job cancellation启用保留的外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
​
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
上一篇 下一篇

猜你喜欢

热点阅读