Flink详解之七--状态管理
一、概念
实时处理中的很多操作都是一次对单个事件做处理,也有一些操作需要历史事件的信息,这些操作被称为有状态的。也就是说,所谓的状态就是由历史事件处理供后续操作使用的信息。
二、用途
状态是的使用场景有很多,主要有以下几种:
- 窗口计算 比如天/小时/分钟窗口计算,需要保存窗口内计算所需的每个事件的信息或累积信息。
- 机器学习 可以使用状态保存当天版本模型参数信息。
- 历史数据对比 需要记录历史上某一段时间的数据。
- 数据恢复 主要用于Flink的容错机制。
三、分类
3.1 keyed state vs operator state
根据是否处理keyed stream,可以将状态分为两类keyed state和operator state。
3.1.1 keyed state
这类状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream。每个key对应一个state, 一个算子实例会处理多个key, 可以访问多个state。算子实例并发改变时,state会随着key在实例间迁移。所有keyed state都可以设置有效期,所有状态都支持单元素有效期设置,过期的状态会被清除。
keyed state是最常用的一类状态,主要类型包括
- ValueState<T> 保存一个可以更新和检索的值。
- ListState<T> 保存一个元素的列表,可追加,可检索。
- MapState<UK, UV> 维护了一个映射列表。
- ReducingState<T> 所有状态聚合为一个单值,输入和输出数据类型一致。
- AggregatingState<IN, OUT> 所有状态聚合为一个单值,输入和输出数据类型可以不一致。
以ValueState<T>为例:
public class TestState extends RichFlatMapFunction<Integer, Integer> {
private transient ValueState<Integer> sum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> state = new ValueStateDescriptor<>("state", Integer.class);
sum = getRuntimeContext().getState(state);
}
@Override
public void flatMap(Integer input, Collector<Integer> out) throws Exception {
sum.update(sum.value()+input);
}
}
3.1.2 operater state
Operator State也叫non-keyed state,每个算子状态都绑定一个并行算子实例。Kafka连接器是在Flink中使用算子状态的一个很好的例子。Kafka consumer的每个并行实例都维护主题分区和偏移量的映射作为其算子状态。
当并发发生变化时,Operator State接口支持在并行算子实例之间重新分配状态。进行这种再分配有多种方式可选。
3.2 managed state vs raw state
根据状态是否有Flink运行时托管,将状态分为managed state和raw state。
Managed State由Flink运行时控制的数据结构表示,如内部哈希表或RocksDB。例如ValueState、ListState等,Flink的运行时对状态进行编码并将它们写入checkpoints。
Raw State是算子保存在自定义数据结构中的状态。checkpoint时,只向检查点写入一个字节序列。Flink不关注数据结构,只看原始字节。
所有数据流函数都可以使用managed state, 但是raw state接口只能在实现operators后才能使用。开发中除非必要,否则推荐使用managed state。因为对于managed state, 当并行度改变时,Flink会自动重新分配状态,也可以更好的进行内存管理。
四、存储方式
flink内部状态的存储,取决于state backend的选择,目前有3种内置的state backend可供使用:MemoryStateBackend, FsStateBackend, RocksDBStateBackend。我们可以在配置文件 flink-conf.yaml 中配置所有Flink作业的默认StateBackend,也可以在作业中设置StateBackend,这样会覆盖默认值。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
4.1 MemoryStateBackend
当用户没有设置时,默认使用MemoryStateBackend。运行时将状态保存在TaskManager堆内存中,CheckPoint时,状态快照会发往JobManager, 并存储在JobManager的堆内存中。为防止数据流阻塞,这个状态快照尽量配置成异步快照,默认也是异步的,除非构建时手动将异步参数置成false。
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend的一些限制:
- 独立状态默认最大为5M,上限(MAX_MEM_STATE_SIZE)可调。
- MAX_MEM_STATE_SIZE最大不能超过akka frame大小
- 所有状态聚合后,要放在JobManager的内存里,所以最大不能超过JobManager的内存。
由于存储空间的限制,以及当作业重启时状态会丢失,所以,MemoryStateBackend更适合本地开发与调试,或者状态较小并且作业重启时对状态丢失不敏感的场景,不太适合普遍意义上的生产场景中使用。
4.2 FsStateBackend
FsStateBackend将运行时状态保存在TaskManager内存中,CheckPoint时,会将状态快照保存在指定的文件系统目录中,只会将少量元数据保存在JobManager,而高可用模式下,会将元数据保存在CheckPoint元数据文件中。状态快照默认也是异步的,除非构建时手动将异步参数置成false。
new FsStateBackend(path, false); //path为文件系统路径
状态大小只受限于TaskManager的内存,适用于窗口比较大,状态比较大的场景,支持高可用,适合在生产环境中使用。
4.3 RocksDBStateBackend
RocksDBStateBackend将运行时状态保存在RocksDB数据库中,RocksDB默认将文件存储在TaskManager文件目录中,checkpoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中,支持增量checkpoint。只支持异步快照。
RocksDBStateBackend的一些限制:
- RocksDB支持Key和Value最大分别为2G
- 运行时状态存入RocksDB,读写都需要序列化和反序列化,比写入内存效率低,状态较大时,也影响吞吐量。
由于保存的状态只受磁盘大小限制,可以支持比FsStateBackend更大的状态场景。