flink的状态与容错
状态
状态性的函数和操作通过处理单个(元素/事件)存储数据,使任何类型的state可以构建更复杂的操作。
flink可以使用checkpoints对statue进行容错管理,并且允许对流应用程序执行savepoint。
按基本类型划分:Flink有两种基本的状态:
- Keyed State:和Key有关的状态类型,它只能被基于KeyedStream(通过stream.keyBy(…)创建)之上的操作或方法所使用。
- Operator State:它是和Key无关的一种状态类型。它相当于一个并行度实例对应一份状态数据。
按组织形式划分:又可以分为一下两类:
- Managed State,这类State的内部结构完全由Flink runtime内部来控制,包括如何将它们编码写入到checkpoint中等等。
- Raw State,这类State就比较显得灵活一些,它们被保留在操作运行实例内部的数据结构中。从Flink系统角度来观察,在checkpoint时,它只知道的是这些状态数据是以连续字节的形式被写入checkpoint中。等待进行状态恢复时,又从字节数据反序列化为状态对象。
Managed State可以在所有的data stream相关方法中被使用,官方也是推荐优先使用这类State,因为它能被Flink runtime内部做自动重分布而且能被更好地进行内存管理。
在Flink内部,我们能够对State设置TTL,使其状态过期然后被系统清理掉。
Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。
容错
为了使状态容错,Flink需要对状态进行checkpoint。checkpoint允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。
检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
//checkpoint的checkPointMode有两种,Exactly-once和At-least-once
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Flink提供了不同的状态后端,用于设置状态的存储方式和位置
rocksdb(RocksDBStateBackend)
val env = StreamExecutionEnvironment.getExecutionEnvironment()
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
env.setStateBackend(...)
//或者修改flink-conf.yaml
//提供三种方式:jobmanager(MemoryStateBackend), filesystem(FsStateBackend),
Flink内部支持定制化的State序列化器/反序列化实现。这里的序列化过程指的是将状态数据序列为字节数据写到checkpoint中,再从checkpoint文件字节数据反序列为状态对象数据。针对不同类型的State数据,可以定义各自不同的序列化/反序列的实现。
DataSet API中程序的容错性是通过重试失败的执行来实现的。
val env = ExecutionEnvironment.getExecutionEnvironment()
//重试次数
env.setNumberOfExecutionRetries(3)
//重试间隔时长
env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
//或者在flink-conf.yaml中配置
execution-retries.default: 3
execution-retries.delay: 10 s