spark||flink||scala

[Flink原理]-一文入门Flink中对状态的管理

2020-04-06  本文已影响0人  延眠万里

概述:
状态作为流计算的核心属性,Flink针对状态做了很多的处理,即你可以将中间的计算结果进行保存,并提供给后续的计算使用。

分类:

原始状态和托管状态:

Keyed State和Operator State,可以以两种形式存在:原始状态和托管状态。

托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。

而raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

状态后端:

代码示例:

KeyedStateCustom:

package state;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义KeyedState
 *
 * @author lixiyan
 * @date 2020/4/5 8:34 PM
 */
public class KeyedStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    private ListState<Long> abnormalData;


    // 需要监控的阈值
    private Long threshold;
    // 触发报警的次数
    private Integer numberOfTimes;

    KeyedStateCustom(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttl = StateTtlConfig
                // 设置有效期为 1 秒
                .newBuilder(Time.seconds(1))
                // 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                /*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,
                        代表即使值过期了,但如果还没有被物理删除,就是可见的*/
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ListStateDescriptor<Long> abnormalDataDesc = new ListStateDescriptor<>("abnormalData", Long.class);
        abnormalDataDesc.enableTimeToLive(ttl);
        this.abnormalData = getRuntimeContext().getListState(abnormalDataDesc);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        // 如果超过阀值,记录不正常信息
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }

        ArrayList<Long> lists = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的数据出现一定次数,则输出报警信息
        if (lists.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + "超过指定阈值", lists));
            // 输出后清空状态
            abnormalData.clear();
        }
    }
}

keyedState主要继承了RichFlatMapFunction,从而能从RuntimeContext中获取和更state改状态,这里还在open中加入了ttl过期时间的设置,在实际的生产中,很容易因为状态不清理,或者状态延时堆积导致系统崩溃,所以就需要我们通过ttl配置,设置最大过期时间,当到达最大过期时间还没有处理,状态就会被清空。

KeyedState测试类:

package state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 测试类
 *
 * @author lixiyan
 * @date 2020/4/5 9:01 PM
 */
public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 40L));

        tuple2DataStreamSource.keyBy(0).flatMap(new KeyedStateCustom( 100L,3)).uid("keyed").print();

        env.execute("keyed State");
    }
}

结果演示:

结果演示

从输出中我们看到记录了3个状态的输出,达到我们预期效果。

OperatorStateCustom

package state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义operator
 *
 * @author lixiyan
 * @date 2020/4/5 9:11 PM
 */
public class OperatorStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

    // 非正常数据
    private List<Tuple2<String, Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String, Long>> checkPointedState;
    // 需要监控的阈值
    private Long threshold;
    // 次数
    private Integer numberOfTimes;


    OperatorStateCustom(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }


    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) throws Exception {
        Long inputVal = value.f1;
        // 超过阀值记录
        if (inputVal > threshold){
            bufferedData.add(value);
        }
        // 超过指定次数则输出报警信息
        if (bufferedData.size()>= numberOfTimes){
            // 输出状态实例的hashcode
            out.collect(Tuple2.of(checkPointedState.hashCode()+" 超过阀值",bufferedData));
            bufferedData.clear();

        }

    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在进行快照时,将数据存储到checkPointedState
        checkPointedState.clear();
        System.out.println("snapshotState");
        for (Tuple2<String, Long> element : bufferedData) {
            System.out.println(element);
            checkPointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意这里获取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
        })));

        System.out.println("initializeState:"+checkPointedState.get());
        // 如果发生重启,则需要从快照中将状态进行恢复
        if (context.isRestored()){
            System.out.println("aaaaaa");
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }

    }
}

operatorState在实现的不同之处在于,还需要实现CheckpointedFunction接口,接口中我们主要实现两个方法initializeState方法,负责初始化状态,在这里我们可以获取我们的状态,snapshotState方法主要用于在状态即将写入checkpoint时,将数据存入状态中,从而写入checkpoint数据,当配置不开启checkpoint不会执行此方法。

operator state测试类:

package state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 测试operator state
 *
 * @author lixiyan
 * @date 2020/4/5 10:35 PM
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1);
        //设置statebackend
        env.setStateBackend(new FsStateBackend("file:///Users/lionli/lixiyan/flink/flink_practice/flink/src/main/resources/"));
        CheckpointConfig config = env.getCheckpointConfig();
        // 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置checkpoint的周期, 每隔100 ms进行启动一个检查点
        config.setCheckpointInterval(100);
        // 设置模式为exactly-once
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        config.setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        config.setMaxConcurrentCheckpoints(1);
        // 执行逻辑
        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));

        tuple2DataStreamSource.flatMap(new OperatorStateCustom(100L, 3)).printToErr();
        env.execute("Operator State");
    }
}

测试结果

可以看到输出了对应的数据

区别:

通过上面的结果我们可以发现keyed state和operator state的区别,operator state更像多个keyed state的集合,因为一个operator上可能会有多个key。

keyed state

keyed状态

Operator State

operator 状态

总结:

本篇文章主要介绍了在Flink中的状态管理,分为keyed state和operator state,并且针对这两种状态进行了实例演示,在实际的开发中可以根据自己实际情况进行合适的选择。

本文由博客群发一文多发等运营工具平台 OpenWrite 发布

上一篇下一篇

猜你喜欢

热点阅读