Flink Study Notes: State

2019-12-18  飞不高的老鸟

Recap

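    In earlier notes we saw that Flink, a low-latency stream processing framework, is stateful. State holds the intermediate results of operators, and together with checkpoints it is what allows Flink to guarantee exactly-once semantics.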

State Categories

    By how it is bound to the data, State is either Keyed State or Operator State. By how it is managed, State is either Raw State or Managed State.

Keyed State
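// text is a DataStream<String>; Tokenizer is the FlatMapFunction from Flink's WordCount example
DataStream<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
            .keyBy(0)   // partition by tuple field 0 (the word); the same key never appears in two different tasks
            .sum(1);    // running sum of field 1, kept as keyed state for each word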
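The `keyBy` comment says that one key is never split across tasks. The plain-Java simulation below (no Flink dependency) is one way to picture this. It makes one simplification: each word is routed to a subtask with `Math.floorMod(key.hashCode(), parallelism)`. Flink itself first hashes keys into key groups and then assigns the key groups to subtasks. Each simulated subtask keeps its own map, which stands in for the keyed "sum" state. The class name `KeyByPartitionSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of keyBy + sum.
// Simplification: subtask = floorMod(key.hashCode(), parallelism); real Flink
// hashes keys into key groups first and then maps key groups to subtasks.
class KeyByPartitionSketch {

    // Deterministic routing: a given key is always handled by exactly one subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Word count where each subtask keeps its own keyed "sum" state (word -> count).
    static List<Map<String, Integer>> wordCount(List<String> words, int parallelism) {
        List<Map<String, Integer>> stateBySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            stateBySubtask.add(new HashMap<>());
        }
        for (String word : words) {
            Map<String, Integer> keyedState = stateBySubtask.get(subtaskFor(word, parallelism));
            keyedState.merge(word, 1, Integer::sum); // plays the role of .sum(1) for this key
        }
        return stateBySubtask;
    }

    public static void main(String[] args) {
        List<String> words = List.of("flink", "state", "flink", "key", "state", "flink");
        List<Map<String, Integer>> result = wordCount(words, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("subtask " + i + ": " + result.get(i));
        }
    }
}
```

The routing function is deterministic, so every occurrence of a word updates the same map. This is why keyed state needs no coordination between tasks.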
Operator State
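Operator State, by contrast, is bound to a parallel operator instance rather than to a key. A common example is a source that remembers which offsets it has read.

The hypothetical plain-Java sketch below shows what can happen to list-style operator state when a job is restored with a different parallelism. The items from all old instances are pooled and then split evenly across the new instances. This follows the idea of the even-split redistribution Flink uses for operator `ListState`, but the exact item-to-instance assignment in Flink may differ. The `partition:offset` strings are made-up data.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of operator (non-keyed) list state.
// Each inner list is the state of one parallel instance. On restore with a new
// parallelism, all items are pooled and split into chunks whose sizes differ by
// at most one (the even-split idea; Flink's exact assignment may differ).
class OperatorStateSketch {

    static List<List<String>> redistribute(List<List<String>> oldStates, int newParallelism) {
        // Pool every item from every old instance, as a snapshot would.
        List<String> all = new ArrayList<>();
        for (List<String> state : oldStates) {
            all.addAll(state);
        }
        List<List<String>> newStates = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newStates.add(new ArrayList<>());
        }
        // Hand out contiguous chunks; the first (size % parallelism) instances get one extra.
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            newStates.get(i).addAll(all.subList(pos, pos + size));
            pos += size;
        }
        return newStates;
    }

    public static void main(String[] args) {
        List<List<String>> before = List.of(
                List.of("p0:42", "p1:17"),
                List.of("p2:5", "p3:99", "p4:3"));
        System.out.println(redistribute(before, 3)); // [[p0:42, p1:17], [p2:5, p3:99], [p4:3]]
    }
}
```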
Managed State

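    Managed State is state managed by the Flink runtime. The runtime stores the state data as objects in a hash table or in RocksDB. ValueState and ListState are two examples of Managed State; their interface definitions from the Flink source follow.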

// Excerpt from the Flink source: org.apache.flink.api.common.state.ValueState
public interface ValueState<T> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
     * this will return {@code null} when no value was previously set using {@link #update(Object)}.
     *
     * @return The state value corresponding to the current input.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    T value() throws IOException;

    /**
     * Updates the operator state accessible by {@link #value()} to the given
     * value. The next time {@link #value()} is called (for the same state
     * partition) the returned state will represent the updated value. When a
     * partitioned state is updated with null, the state for the current key
     * will be removed and the default value is returned on the next access.
     *
     * @param value The new value for the state.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    void update(T value) throws IOException;

}
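The ValueState Javadoc pins down two behaviors:

- `value()` is scoped to the current key.
- `update(null)` removes the entry, so the next read returns the default.

Below is a hypothetical, Flink-free model of just that contract. The class `ValueStateSketch` is not part of Flink. Its `setCurrentKey` method is an assumption that stands in for what the runtime does before each record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java model (no Flink dependency) of the keyed ValueState contract.
class ValueStateSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private final V defaultValue; // stands in for the descriptor's default value (may be null)
    private K currentKey;         // in real Flink the runtime sets this before each record

    ValueStateSketch(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = Objects.requireNonNull(key);
    }

    // Value for the current key, or the default if nothing was set for it.
    V value() {
        return valuesByKey.getOrDefault(currentKey, defaultValue);
    }

    // Updating with null removes the entry, so the next value() yields the default.
    void update(V value) {
        if (value == null) {
            valuesByKey.remove(currentKey);
        } else {
            valuesByKey.put(currentKey, value);
        }
    }

    public static void main(String[] args) {
        ValueStateSketch<String, Long> count = new ValueStateSketch<>(null);
        for (String word : new String[] {"a", "b", "a"}) {
            count.setCurrentKey(word);
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
        }
        count.setCurrentKey("a");
        System.out.println(count.value()); // 2
    }
}
```

The `main` method follows the usual counting pattern. It reads the current value, treats `null` as "not seen yet", and writes back the incremented count.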

// Excerpt from the Flink source: org.apache.flink.api.common.state.ListState
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
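The ListState Javadoc distinguishes replacing (`update`) from appending (`addAll`). It also treats a null or empty argument differently in each case: `update` clears the state, while `addAll` leaves it unchanged. Below is a hypothetical single-key model of that contract. It follows the quoted comments and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Flink dependency) of the ListState contract for one key.
class ListStateSketch<T> {
    private List<T> values; // null means "no state stored" for this key

    // Like get(): the current list, or null when nothing is stored.
    Iterable<T> get() {
        return values;
    }

    // Like update(): replaces the whole list; null or empty leaves the state null.
    void update(List<T> newValues) {
        values = (newValues == null || newValues.isEmpty()) ? null : new ArrayList<T>(newValues);
    }

    // Like addAll(): appends; null or empty leaves the state unchanged.
    void addAll(List<T> more) {
        if (more == null || more.isEmpty()) {
            return;
        }
        if (values == null) {
            values = new ArrayList<>();
        }
        values.addAll(more);
    }

    public static void main(String[] args) {
        ListStateSketch<String> s = new ListStateSketch<>();
        s.addAll(List.of("a", "b"));
        s.addAll(List.of("c"));
        System.out.println(s.get()); // [a, b, c]
        s.update(List.of("z"));
        System.out.println(s.get()); // [z]
        s.update(List.of());
        System.out.println(s.get()); // null
    }
}
```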
Raw State

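    Raw State is state that the operator manages itself. It is saved into the checkpoint as raw byte arrays, and Flink knows nothing about the internal structure of the data. As a result, the operator must serialize the state on every write and deserialize it on every read.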
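To make "Flink only sees bytes" concrete, here is a hypothetical plain-Java sketch. An operator serializes its own word counts into a `byte[]` for a snapshot and parses them back on restore. The sketch uses only `java.io` data streams. The byte layout (an entry count followed by key/value pairs) is an assumption of this sketch, not a Flink format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration (no Flink dependency) of the raw-state idea: the operator
// owns the format and does its own (de)serialization. The layout is hypothetical.
class RawStateSketch {

    // Serialize: entry count, then (UTF key, long value) pairs.
    static byte[] snapshot(Map<String, Long> counts) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
        return bytes.toByteArray(); // to the framework this is just an opaque byte[]
    }

    // Deserialize using exactly the same layout the operator wrote.
    static Map<String, Long> restore(byte[] data) throws IOException {
        Map<String, Long> counts = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                counts.put(key, in.readLong());
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("flink", 3L);
        state.put("state", 2L);
        byte[] checkpointed = snapshot(state);
        System.out.println(restore(checkpointed)); // {flink=3, state=2}
    }
}
```

The contrast with Managed State is that here the operator, not the runtime, owns the format. Any change to the byte layout must therefore be handled by the operator's own restore code.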

State Management

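    Flink offers three state backends for managing state: MemoryStateBackend, FsStateBackend and RocksDBStateBackend. MemoryStateBackend keeps working state on the TaskManager's Java heap and stores checkpoints in JobManager memory. FsStateBackend also keeps working state on the heap, but writes checkpoints to a file system such as HDFS. RocksDBStateBackend keeps working state in a local RocksDB instance and writes checkpoints to a file system.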

MemoryStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
env.setStateBackend(new MemoryStateBackend());
FsStateBackend
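import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// fill in the checkpoint directory URI, e.g. an hdfs:// or file:// path
env.setStateBackend(new FsStateBackend(""));

// Source: the FsStateBackend constructor that takes a Flink Path
public FsStateBackend(Path checkpointDataUri) {
    this(checkpointDataUri.toUri());
}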

RocksDBStateBackend
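import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
// fill in the checkpoint directory URI; this constructor declares IOException
env.setStateBackend(new RocksDBStateBackend(""));
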
Summary

    State is the foundation of Flink's fault-tolerance mechanism. Understanding how State works makes it easier to manage checkpoints and to recover failed jobs.
