
Flink State机制源码解析

在流计算中,事件会持续不断的产生,如果每次计算都是相互独立的,不依赖于上下游事件,则称为无状态 (stateless) 计算;如果每次的计算依赖于之前或者后续的事件,则称为有状态 (stateful) 计算。Flink 中的状态用 State 抽象,用来保存中间计算结果或者缓存数据,State 是 Flink 内部实现 Exactly-Once的基础。


和 redis 类似,Flink 按照数据类型的不同,定义了多种 State 接口,具体如下:

  1. ValueState<T>

    单值状态,与数据的 key 绑定;提供了 update(T value) 方法更新值,value() 方法获取状态值。

  2. ListState<T>

    Key 上的状态值为一个 List;提供了 add ,get 方法来分别增加和获取数据。

  3. MapState<UK, UV>

    Key 上的状态值为一个 Map;提供了 put,putAll, get 方法来增加和获取数据。

  4. ReducingState<T>

    实现的 ReduceFunction 中使用的 state, 在 reduce 方法之前,会先调用 ReducingState 的 add 方法,reduce 方法中的第一个参数就是状态值。

  5. AggregatingState<IN, OUT>

    聚合 state; 在 AggregateFunction 的 add 方法调用之前,会先调用 AggregatingState 的 add 方法,传入 acc。

按照算子是否有 key, Flink State 又被划分为 KeyedState 和 OperatorState。

类型 state
KeyedState ValueState<br/ >ListState<br />ReducingState<br />AggregationState<br />MapState<br />
OperatorState ListState

今天我们重点讲 KeyedState 里的最简单的 ValueState 的实现。KeyedState,顾名思义,要与 key 绑定,即只能用在 KeyedStream 流之后;每一个 key 对应一个 State 值;Key 除了有分区的作用,在状态管理当中,它还用于计算 keyGroupIndex:

// AbstractStreamOperator.java line:463
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
    if (selector != null) {
        Object key = selector.getKey(record.getValue());

// StreamOperatorStateHandler.java line:281
public void setCurrentKey(Object key) {
    if (keyedStateBackend != null) {
        try {
            // need to work around type restrictions
            AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;

        } catch (Exception e) {
            throw new RuntimeException("Exception occurred while setting the current key context.", e);

// 接着看 AbstractKeyedStateBackend.java line:172
public void setCurrentKey(K newKey) {
    this.keyContext.setCurrentKey(newKey);      this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey,numberOfKeyGroups));

从上面可以看出, 先设置 key, 再设置 keyGroupIndex,具体 keyGroupIndex 的作用是什么,我们后面会讲。


State 是暴露给用户的接口,那么就需要指定状态的一些属性,如 name, type, ttl 等。Flink 中用 StateDescriptor 来描述一个状态,在对应的 StateBackend (状态后端) 中,调用 create 方法得到对应的 State 对象。下面以一个简单的 demo 来演示。

private transient ValueState<Tuple2<Integer, Integer>> state;
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                    new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
    state = getRuntimeContext().getState(descriptor);

我们顺着方法调用链走下去,跳过中间一些无关的代码,直接看 TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法:

public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateBackend<K> stateBackend,
        TtlTimeProvider timeProvider) throws Exception {
        // 是否为状态设置了 ttl,如果设置 ttl,使用TtlStateFactory创建state,如果没有,直接调用 stateBackend创建state
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
                namespaceSerializer, stateDesc, stateBackend, timeProvider)
                .createState() :
            stateBackend.createInternalState(namespaceSerializer, stateDesc);

上面两个创建 state 的不同分支区别就是是否设置了 ttl,其它的基本一样。接着看 stateBackend.createInternalState 方法的实现

public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
        @Nonnull TypeSerializer<N> namespaceSerializer,
        @Nonnull StateDescriptor<S, SV> stateDesc,
        @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        // 这里是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现
        StateTable<K, N, SV> stateTable = tryRegisterStateTable(
            namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
        // 根据状态描述的类型,调用对应的构造方法。
        return stateFactory.createState(stateDesc, stateTable, getKeySerializer());

StateTable 是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现。

private <N, V> StateTable<K, N, V> tryRegisterStateTable(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<?, V> stateDesc,
        @Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

        TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

        if (stateTable != null) {
            // ...
        } else {
            RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
            // 得到 statetable对象。启用 statetable的构造方法。初始化size为128(默认最大并行度)的 StateMap数组
            stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
            registeredKVStates.put(stateDesc.getName(), stateTable);

        return stateTable;

StateTable 里提供的 get, put 操作实现了对状态值的 获取和更新,StateTable是一个抽象类,如下:


NestedMapsStateTable 使用两层嵌套的 HashMap 保存状态数据,支持同步快照;CopyOnWriteStateTable 使用 CopyOnWriteStateMap 来保存状态数据,支持异步快照。下面是 CopyOnWriteStateTable里的 put 和 get 方法的实现

private S get(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
 // StateMap内部封装了MapEntry,类似于 HashMap,基于链表加数组的实现
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
        if (stateMap == null) {
            return null;
        // 从map中得到state value
        return stateMap.get(key, namespace);

public void put(K key, int keyGroup, N namespace, S state) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
        stateMap.put(key, namespace, state);



  1. StateDescribe 持有状态的描述, StateBackend 通过它来创建 State 对象
  2. State 对象里封装了 StateTable,StateTable 负责对State 做 snapshot 到对应的 StateBackend。
  3. StateTable 里封装了 StateMap,为存储 state 的内存介质,负责状态的更新/新增/获取 (基于内存)。
  4. StateMap 在 StateTable 中为一个数组,默认 size 为最大并行度 128,所以会存在不同 key 对应一个StateMap对象。里面的实现类似于 HashMap,为链表加数组的数据结构。
