Big Data

flink的状态与容错

2019-07-03  本文已影响0人  盗梦者_56f2

状态

状态性的函数和操作通过处理单个(元素/事件)存储数据,使任何类型的state可以构建更复杂的操作。
flink可以使用checkpoints对statue进行容错管理,并且允许对流应用程序执行savepoint。

按基本类型划分:Flink有两种基本的状态:

按组织形式划分:又可以分为一下两类:

Managed State可以在所有的data stream相关方法中被使用,官方也是推荐优先使用这类State,因为它能被Flink runtime内部做自动重分布而且能被更好地进行内存管理。

在Flink内部,我们能够对State设置TTL,使其状态过期然后被系统清理掉。

Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。

容错

为了使状态容错,Flink需要对状态进行checkpointcheckpoint允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。
检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
//checkpoint的checkPointMode有两种,Exactly-once和At-least-once
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Flink提供了不同的状态后端,用于设置状态的存储方式和位置

rocksdb(RocksDBStateBackend)
val env = StreamExecutionEnvironment.getExecutionEnvironment()
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
env.setStateBackend(...)
//或者修改flink-conf.yaml
//提供三种方式:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), 

Flink内部支持定制化的State序列化器/反序列化实现。这里的序列化过程指的是将状态数据序列为字节数据写到checkpoint中,再从checkpoint文件字节数据反序列为状态对象数据。针对不同类型的State数据,可以定义各自不同的序列化/反序列的实现。

DataSet API中程序的容错性是通过重试失败的执行来实现的。

val env = ExecutionEnvironment.getExecutionEnvironment()
//重试次数
env.setNumberOfExecutionRetries(3)
//重试间隔时长
env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
//或者在flink-conf.yaml中配置
execution-retries.default: 3
execution-retries.delay: 10 s
上一篇 下一篇

猜你喜欢

热点阅读