flink程序员

Flink详解之七--状态管理

2021-01-31  本文已影响0人  王吉吉real

一、概念

实时处理中的很多操作都是一次对单个事件做处理,也有一些操作需要历史事件的信息,这些操作被称为有状态的。也就是说,所谓的状态就是由历史事件处理供后续操作使用的信息。

二、用途

状态是的使用场景有很多,主要有以下几种:

三、分类

3.1 keyed state vs operator state

根据是否处理keyed stream,可以将状态分为两类keyed state和operator state。

3.1.1 keyed state

这类状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream。每个key对应一个state, 一个算子实例会处理多个key, 可以访问多个state。算子实例并发改变时,state会随着key在实例间迁移。所有keyed state都可以设置有效期,所有状态都支持单元素有效期设置,过期的状态会被清除。

keyed state是最常用的一类状态,主要类型包括

以ValueState<T>为例:

public class TestState extends RichFlatMapFunction<Integer, Integer> {

    private transient ValueState<Integer> sum;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> state = new ValueStateDescriptor<>("state", Integer.class);
        sum = getRuntimeContext().getState(state);
    }

    @Override
    public void flatMap(Integer input, Collector<Integer> out) throws Exception {
        sum.update(sum.value()+input);
    }
}

3.1.2 operater state

Operator State也叫non-keyed state,每个算子状态都绑定一个并行算子实例。Kafka连接器是在Flink中使用算子状态的一个很好的例子。Kafka consumer的每个并行实例都维护主题分区和偏移量的映射作为其算子状态。

当并发发生变化时,Operator State接口支持在并行算子实例之间重新分配状态。进行这种再分配有多种方式可选。

3.2 managed state vs raw state

根据状态是否有Flink运行时托管,将状态分为managed state和raw state。

Managed State由Flink运行时控制的数据结构表示,如内部哈希表或RocksDB。例如ValueState、ListState等,Flink的运行时对状态进行编码并将它们写入checkpoints。

Raw State是算子保存在自定义数据结构中的状态。checkpoint时,只向检查点写入一个字节序列。Flink不关注数据结构,只看原始字节。

所有数据流函数都可以使用managed state, 但是raw state接口只能在实现operators后才能使用。开发中除非必要,否则推荐使用managed state。因为对于managed state, 当并行度改变时,Flink会自动重新分配状态,也可以更好的进行内存管理。

四、存储方式

flink内部状态的存储,取决于state backend的选择,目前有3种内置的state backend可供使用:MemoryStateBackend, FsStateBackend, RocksDBStateBackend。我们可以在配置文件 flink-conf.yaml 中配置所有Flink作业的默认StateBackend,也可以在作业中设置StateBackend,这样会覆盖默认值。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
4.1 MemoryStateBackend

当用户没有设置时,默认使用MemoryStateBackend。运行时将状态保存在TaskManager堆内存中,CheckPoint时,状态快照会发往JobManager, 并存储在JobManager的堆内存中。为防止数据流阻塞,这个状态快照尽量配置成异步快照,默认也是异步的,除非构建时手动将异步参数置成false。

new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

MemoryStateBackend的一些限制:

由于存储空间的限制,以及当作业重启时状态会丢失,所以,MemoryStateBackend更适合本地开发与调试,或者状态较小并且作业重启时对状态丢失不敏感的场景,不太适合普遍意义上的生产场景中使用。

4.2 FsStateBackend

FsStateBackend将运行时状态保存在TaskManager内存中,CheckPoint时,会将状态快照保存在指定的文件系统目录中,只会将少量元数据保存在JobManager,而高可用模式下,会将元数据保存在CheckPoint元数据文件中。状态快照默认也是异步的,除非构建时手动将异步参数置成false。

new FsStateBackend(path, false); //path为文件系统路径

状态大小只受限于TaskManager的内存,适用于窗口比较大,状态比较大的场景,支持高可用,适合在生产环境中使用。

4.3 RocksDBStateBackend

RocksDBStateBackend将运行时状态保存在RocksDB数据库中,RocksDB默认将文件存储在TaskManager文件目录中,checkpoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中,支持增量checkpoint。只支持异步快照。

RocksDBStateBackend的一些限制:

由于保存的状态只受磁盘大小限制,可以支持比FsStateBackend更大的状态场景。

上一篇下一篇

猜你喜欢

热点阅读