flink状态和容错
flink是有状态的计算,可以存储一些中间过程和结果在内部存储里。
状态有三种存储方案MemoryStateBackend、FsStateBackend、 RocksDBStateBackend。
其中MemoryStateBackend等价于新版本的HashMapStateBackend 和 JobManagerCheckpointStorage。
FsStateBackend等价于新版本的HashMapStateBackend和FileSystemCheckpointStorage。
RocksDBStateBackend等价于新版本的EmbeddedRocksDBStateBackend和FileSystemCheckpointStorage。
MemoryStateBackend下,state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。
FsStateBackend下state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中。
RocksDBStateBackend存储方案下它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中,fail over的时候从filesystem中恢复到本地。
通过checkpoint机制。checkpoint是应用状态的一个一致性副本,包括了输入的读取位点。
flink通过检查点加载应用状态来恢复,并从恢复的读取位点继续处理。检查点存储在内部,定期做checkpoint的持久化,存在分布式文件系统中,如果发生故障就从最近checkpoint节点恢复。
checkpoint基本配置
// 指定Checkpoint间隔时间
• StreamExecutionEnvironment enableCheckpointing(long interval)
// 指定Checkpoint间隔时间以及CheckpointingMode(EXACTLY_ONCE ,AT_LEAST_ONCE)
• StreamExecutionEnvironment enableCheckpointing(long interval,
CheckpointingMode mode)
// 指定是否强制在迭代作业中执行Checkpoint(@Deprecated )
• StreamExecutionEnvironment enableCheckpointing(long interval,
CheckpointingMode mode, boolean force)
// 默认Checkpoint Interval 为500 ms (@Deprecated )
• StreamExecutionEnvironment enableCheckpointing()
Savapoint用户可以手动出发并从Savapoint节点恢复。
Savapoint客户端命令
触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
使用 YARN 触发 Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
使用 Savepoint 取消作业
$ bin/flink cancel -s [:targetDirectory] :jobId
从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]
跳过无法映射的状态恢复
$ bin/flink run -s :savepointPath -n [:runArgs]
删除 Savepoint
$ bin/flink savepoint -d :savepointPath