FlinkFlink

Flink Checkpoint变迁与实践

2018-12-19  本文已影响39人  远o_O

一、历史变迁

提供了RocksDB的支持,这个版本之前所有的状态都只能存在进程的内存里面,这个内存总有存不下的一天,如果存不下则会发生OOM。如果想要存更多数据、更大量State就要用到RocksDB。RocksDB是一款基于文件的嵌入式数据库,它会把数据存到磁盘,但是同时它又提供高效读写能力。所以使用RocksDB不会发生OOM这种事情。

在Flink1.1.0里面,提供了纯异步化的RocksDB的snapshot。以前版本在做RocksDB的snapshot时它会同步阻塞主数据流的处理,很影响吞吐量,即每当checkpoint时主数据流就会卡住。纯异步化处理之后不会卡住数据流,于是吞吐量也得到了提升。

引入了增量的checkpoint这个比较重要的功能。只有基于增量的checkpoint才能更好地支持含有超大State的Job。如果每一次都把全量上TB的State都刷到远程的HDFS上那么这个效率是很低下的。而增量checkpoint只是把checkpoint间隔新增的那些状态发到远程做存储,每一次checkpoint发的数据就少了很多,效率得到提高。在这个版本里面还引入了一个细粒度的recovery,细粒度的recovery在做恢复的时候,有时不需要对整个Job做恢复,可能只需要恢复这个Job中的某一个子图,这样便能够提高恢复效率。

引入了Task local 的State的recovery。因为基于checkpoint机制,会把State持久化地存储到某一个远程存储,比如HDFS,当发生Failover的时候需要重新把这个数据从远程HDFS再download下来,如果这个状态特别大那么该download操作的过程就会很漫长,导致Failover恢复所花的时间会很长。Task local state recovery提供的机制是当Job发生Failover之后,能够保证该Job状态在本地不会丢失,进行恢复时只需在本地直接恢复,不需从远程HDFS重新把状态download下来,于是就提升了Failover recovery的效率。

二、Asynchronous State Snapshots

我们注意到上面描述的机制意味着当 operator 向后端存储快照时,会停止处理输入的数据。这种同步操作会在每次快照创建时引入延迟。

我们完全可以在存储快照时,让 operator 继续处理数据,让快照存储在后台异步运行。为了做到这一点,operator 必须能够生成一个后续修改不影响之前状态的状态对象。例如 RocksDB 中使用的写时复制( copy-on-write )类型的数据结构

接收到输入的 barrier 时,operator异步快照复制出的状态(注:checkpoint的同步部分,复制状态可能会花费较多的时间,这也是为什么checkpoint同步部分时间很长的原因)。然后立即发射 barrier 到输出流,继续正常的流处理。一旦后台异步快照完成,它就会向 checkpoint coordinator(JobManager)确认 checkpoint 完成。现在 checkpoint 完成的充分条件是:所有 sink 接收到了 barrier,所有有状态 operator 都确认完成了状态备份(可能会比 sink 接收到 barrier 晚)。

RocksDBStateBackend 模式对于较大的 Key 进行更新操作时序列化和反序列化耗时很多。可以考虑使用 FsStateBackend 模式替代。

三、理解Checkpoint

image.png image.png

heavy alignments

image.png image.png

如何选用合适的状态后端

image.png
上一篇下一篇

猜你喜欢

热点阅读