Flink的状态管理和容错(上)

2021-08-29  本文已影响0人  xiao_xian

1.有状态计算概念

Storm Flink
计算层跟存储层分离,需要依赖Redis、HBase等外部数据库存储状态 Compute+Application State,状态由Flink内部进行管理,保存在内存、RockDB,定期checkpoint到文件系统
网络耗时长,无法做到exactly one,重启无法保存状态,算子blocking在网络等待中,吞吐率低 内存消耗大

有状态流处理架构 需考虑的问题点?

  1. 数据一致性
  2. 系统的宕机时中间数据的丢失
  3. 版本升级,代码逻辑发生变化,状态需要装换
  4. 内存大小有限,无法存储过多数据

2.状态类型及应用

状态类型
Keyed State Operator State
使用算子类型 只用于KeyedStream中的Operator 所有的Operator
状态分配 每个 Key 对应一个状态,单个Operator 中可能涵盖多个 Keys 单个 Operator 对应一个 State
创建和访问方式 重写RichFunctionn,通过RuntimeContext对象获取 实现checkpointedFunction 或ListCheckpointed接口
横向扩展 状态随着 Key 自动在多个算子 Task上迁移 有多种状态重新分配的方式:
- 均匀分配
- 将所有状态合并,再分发给每个实例上
支持数据类型 ValueState
ListState
ReducingState
AggregatingState
MapState
ListState
BroadcastState
状态类关系图

3.Keyed State

ValueState的简单应用

  1. 继承RichMapFunction(或对应RichFlatMapFunction,...)
  2. open方法创建状态描述符,通过RutimeContext创建对应的状态
  3. 通过value()获取状态值,update(xxx)更新状态值

例子:

// extends RichMapFunction
class MapWithCounterFunction extends RichMapFunction<Tuple2<String, String>, Long> {

    private ValueState<Long> totalLengthByKey; // 状态

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Long> stateDescriptor = // 定义状态描述符,用于创建状态
                new ValueStateDescriptor<Long>("sum of length", LongSerializer.INSTANCE);
        totalLengthByKey = getRuntimeContext().getState(stateDescriptor); // 通过RuntimeContext创建,注册状态
    }

    @Override
    public Long map(Tuple2<String, String> value) throws Exception {
        Long length = totalLengthByKey.value(); // 获取状态值
        if (length == null) {
            length = 0L;
        }
        long newTotalLength = length + value.f1.length();
        totalLengthByKey.update(newTotalLength); // 设置状态值
        return newTotalLength;
    }
}

4. Operator State

定义:

  1. 单Operator具有一个状态,不区分Key
  2. State需要支持重新分布
  3. 常用于Source和Sink节点,像KafkaConsumer中,维护Offset,Topic等信息

类型:

  1. ListState:所有的状态后端元素平均分配给新的tasks
  2. UnionListState:所有的状态后端元素都会分配给新的tasks
  3. BroadcastState:所有的tasks都保存有所有广播的状态后端

两种定义方式:、

  1. 实现 CheckpointedFunction 接口定义
  2. 实现 ListCheckpointed 接口定义 (Deprecated)

例子:

// 1.实现CheckpointedFunction
static class CustomMapFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

    private ReducingState<Long> countPerKey;

    private ListState<Long> countPerPartition; // ListState类型,所有算子公用一个

    private long localCount;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception { // 初始化,异常自动恢复时调用

        countPerKey = context.getKeyedStateStore().getReducingState(
                new ReducingStateDescriptor<>("perKeyCount", new AddFunction(), Long.class));

        countPerPartition = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("perPartitionCount", Long.class)); // 2.定义状态描述符  3.通过context注册状态后端
        for (Long l : countPerPartition.get()) {
            localCount += l;
        };
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    @Override
    public T map(T value) throws Exception {
        countPerKey.add(1L); // 4.使用状态后端
        localCount++;
        return value;
    }
}

static class AddFunction implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long aLong, Long t1) throws Exception {
        return aLong + t1;
    }
}

5. Broadcast State

  1. Broadcast State 使得 Flink 用户能够以容错、一致、可扩缩容地将来自广播的低吞吐的事件流数据存储下来,被广播到某个 operator 的所有并发实例中,然后与另一条流数据连接进行计算

Broadcast State与其他operator state区别

  1. Broadcast State是Map格式类型,通过MapStateDescriptor获取
  2. 需要有一条广播的输入流
  3. operator可以有多个不同名称的广播状态

应用场景

  1. 动态规则:例如,当一个报警规则时触发报警信息等,将规则广播到算子的所有并发实例中

例子:

public class CartDetectPatternEvaluatorExample {

    public static void main(String[] args) throws Exception {

        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

        // 行为事件输入流(非广播流)
        DataStream<Action> actions = env
                .addSource(
                        new FlinkKafkaConsumer010<>(
                                parameterTool.getRequired("action-topic"),
                                new ActionEventSchema(),
                                parameterTool.getProperties()));

        // 规则输入流(非广播流)
        DataStream<Pattern> patterns = env
                .addSource(
                        new FlinkKafkaConsumer010<>(
                                parameterTool.getRequired("pattern-topic"),
                                new PatternEventSchema(),
                                parameterTool.getProperties()));

        // 使用userId进行keyBy
        KeyedStream<Action, Long> actionsByUser = actions
                .keyBy((KeySelector<Action, Long>) action -> action.userId);

        // 创建Map类型的状态描述符
        MapStateDescriptor<Void, Pattern> bcStateDescriptor =
                new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

        // 非广播流调用broadcast方法装换为广播流
        BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

        DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
                .connect(bcedPatterns) // 非广播流调用connect与广播流进行关联
                .process(new PatternEvaluator());  // process一个KeyedBroadcastProcessFunction

        matches.print();

        env.execute("CartDetectPatternEvaluatorExample");

    }

    // KeyedBroadcastProcessFunction 中的类型参数表示:
    //   1. key stream 中的 key 类型
    //   2. 非广播流中的元素类型
    //   3. 广播流中的元素类型
    //   4. 输出结果的类型
    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

        // handle for keyed state (per user)
        ValueState<String> prevActionState;
        // broadcast state descriptor
        MapStateDescriptor<Void, Pattern> patternDesc; // 通过该描述符获取状态

        @Override
        public void open(Configuration conf) {
            // initialize keyed state
            prevActionState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastAction", Types.STRING));
            // 跟上面的定义一样
            patternDesc =
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        /**
         * Called for each user action.
         * Evaluates the current pattern against the previous and
         * current action of the user.
         */
        @Override
        public void processElement(
                Action action,
                ReadOnlyContext ctx, // 获取的状态只读不能修改,为了保证所有task的状态一致
                Collector<Tuple2<Long, Pattern>> out) throws Exception {
            // get current pattern from broadcast state
            Pattern pattern = ctx
                    .getBroadcastState(this.patternDesc)
                    // access MapState with null as VOID default value
                    .get(null);
            // get previous action of current user from keyed state
            String prevAction = prevActionState.value();
            if (pattern != null && prevAction != null) {
                // user had an action before, check if pattern matches
                if (pattern.firstAction.equals(prevAction) &&
                        pattern.secondAction.equals(action.action)) {
                    // MATCH
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }
            // update keyed state and remember action for next pattern evaluation
            prevActionState.update(action.action);
        }

        /**
         * Called for each new pattern.
         * Overwrites the current pattern with the new pattern.
         */
        @Override
        public void processBroadcastElement(
                Pattern pattern,
                Context ctx, // 获取的状态有读写权限,state改变后会被同步给其他的算子
                Collector<Tuple2<Long, Pattern>> out) throws Exception {
            // store the new pattern by updating the broadcast state
            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            // storing in MapState with null as VOID default value
            bcState.put(null, pattern);
        }
    }
}

注意事项

  1. processBroadcastElement获取的state只有读权限;
  2. Broadcast state暂不支持RocksDB,只支持内存
上一篇下一篇

猜你喜欢

热点阅读