Flink 2.1State

2021-06-16  本文已影响0人  caster

无状态算子:map/flatmap/filter等简单算子
有状态算子:window,reduce等操作,需要依赖之前的计算结果:newState = oldState + anEvent
State与特定算子关联在一起,算子需要注册状态
State分类:
1.Operator State:状态作用于当前算子任务,并行子任务之间state独立。
非keyed的state snapshots很少用到,如需使用,需要在算子实现类上实现ListCheckpointed(旧版本)或者CheckpointedFunction(新版本)
2.Keyed State:即分区状态,根据数据中的key来维护访问,相同的key会到同一个tasks子任务分区中,每个key一个状态。
(ValueState/ListState/MapState/ReducingState AggState)
Keyed State 只能在 RichFuction 中使用,RichFuction 与普通、传统的 Function 相比,最大的不同就是它有自己的生命周期。

ValueStateDescriptor<Boolean> flagDescriptor=new ValueStateDescriptor<Boolean>(
        "flag",
        Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
flagState.value()
flagState.update(true);

State使用示例:

StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator<Tuple2<String, Long>> ds = sEnv.fromCollection(Arrays.asList(
        new Tuple2("a", 1623588192345L),
        new Tuple2("b", 1623588193345L),
        new Tuple2("a", 1623588194345L),
        new Tuple2("b", 1623588195345L),
        new Tuple2("a", 1623588196345L),
        new Tuple2("b", 1623588197345L),
        new Tuple2("a", 1623588198345L),
        new Tuple2("b", 1623588199345L)
));
ds.keyBy(value -> value.f0).map(new RichMapFunction<Tuple2<String, Long>, Long>() {
    //1. 声明状态
    private transient ValueState<Long> count;
    //2.初始化状态
    @Override
    public void open(Configuration parameters) throws Exception {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public Long map(Tuple2<String, Long> value) throws Exception {
        Long currentCount = count.value()==null?0L: count.value();
        currentCount++;
        //3.赋值
        count.update(currentCount);
        return currentCount;
    }

}).print();

sEnv.execute("1");

状态后端(State Back):
每个任务都会本地维护状态,内存中。StateBack负责本地的状态存储访问和维护,以及快照检查点(checkpoint)状态写入远程存储。

上一篇 下一篇

猜你喜欢

热点阅读