flink

Flink_四大基石-state && Checkpoint

2022-08-22  本文已影响0人  Eqo

一 state

1. 什么是 状态

状态是算子处理数据的中间结果,或者记录的历史数据
状态的使用 , 取决于 我整个算子的处理逻辑需不需要 历史数据

2.状态的分类

Flink中State状态划分,按照是否被管理划分:

3.状态的使用

步骤

  • 定义状态
    private ValueState<Long> valueState = null ;
  • 初始状态值 从上下文对象中获取 创建状态描述符
    valueState = getRuntimeContext().getState(
    new ValueStateDescriptor<Long>("maxState", Long.class)
    );
  • 获取key 历史状态
    Long historyValue = valueState.value();
  • 按照逻辑 与历史状态, 更新状态
    valueState.update(currentValue);

4.状态的生命周期

- 使用

相当于给状态State 加了一个时间戳,当状态多久没有使用时,清理key的状态值

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

                   // 2-a. 创建状态描述符
                    ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxState", Long.class);
                    // 2-b. 创建StateTtlConfig对象,设置属性值
                    StateTtlConfig stateTtlConfig = StateTtlConfig
                        // 设置状态有效期时间
                        .newBuilder(Time.days(1))
                        // 设置何时更新状态时间戳: 创建状态(第1次)和更新状态
                        .updateTtlOnCreateAndWrite()
                        // 当状态达到设置ttl后,对状态数据可见性
                        .neverReturnExpired()
                        .build();
                    // 2-c. 启用state ttl
                    stateDescriptor.enableTimeToLive(stateTtlConfig);
                    // 2-d. 传递状态描述符,实例化状态
                    this.maxState = getRuntimeContext().getState(stateDescriptor);

二 Checkpoint

1. 什么是CheckPoint
3、Flink State状态引入.png

2 .重点 checkpoint 执行流程

Checkpoint是Flink实现容错机制最核心的功能,根据配置周期性地基于Stream中各个Operator/task的状态State来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。


image.png

过程
场景: 从kafka消费数据,然后处理完成后 写入redis中
每过5s chekpoint一次, flink会将所有 算子只要有状态的,就保存快照一次, 且快照过程是异步的

当flink job设置 checkpoint 并且提交给JobManger会发生一下过程
1.JobManger会先启动 Checkpoint Coordinator 协调器
2.协调器每5s job的数据流( 在进入data source之前的数据流) 注入一个barrier 栅栏信号
3.当数据流到达source时,source 中的每个task 接收到信号,将本算子的 state状态,快照并保存到HDFS中,并将快照信息汇报给协调器
4.向下游发送barrie栅栏信号
5.当数据流到达 各个Transfdormation 算子的时候,每个算子接收到信号,也是将 本算子中的state状态 snaphot&save 快照并保存到HDFS中,并且将信息汇报给协调器
6.转换算子向下游继续发送 barrie栅栏信号
7.当data sink接收到之后,如果有状态 也会将 state 状态快照和保存到HDFS当中,并且向协调器汇报
当协调器收i到所有算子的汇报之后,认为此次checkpoint 完成

栅栏对齐

下游的subtask 必须全部接收完,上游subtask发送的栅栏信号,才会进行checkpoint操作


image.png
2.StateBackend 状态后端

flink运行过程中,state保存在哪里checkpoint在哪里

1.13版本之前 state和checkpoint 功能是放在一起的
    内存状态后端 
        state存在 TaskManager内存当中
        checkpoint 保存在jobManager内存中
    文件状态后端
        state存在TaskManager内存当中
        checkpoint 保存在 文件系统中 本地文件 hdfs
    RocksDB状态后端
        state存在于RocksDB中
        checkpoint 保存在文件系统中
1.14版本之后
    state
        Java hashmap
            java内存当中
            存储hash table
        rocksDB
            内存不足存磁盘
    checkpoint
        JobManager内存
        FileSystem文件系统
image.png image.png

checkpoint 手动恢复
checkpoint 自动恢复
savapoint 相当于手动 checkpoint

3.检查点设置
package cn.itcast.flink.ckpt;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Flink 流式计算程序检查点Checkpoint配置
 * @author xuyuan
 */
public class StreamCheckpointSettingDemo {

    /**
     * 自定义数据源,每隔1秒产生1条数据
     */
    private static class DataSource extends RichParallelSourceFunction<String> {
        private boolean isRunning = true ;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning){
                // 发送数据
                ctx.collect("spark flink flink");

                // 每隔1秒发送1条数据
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false ;
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "8081");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1) ;
        // todo: 设置检查点Checkpoint属性,状态保存和快照保存
        setEnvCheckpoint(env);

        // 2. 数据源-source
        DataStreamSource<String> dataStream = env.addSource(new DataSource());

        // 3. 数据转换-transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputStream = dataStream
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] words = value.split("\\s+");
                    for (String word : words) {
                        out.collect(word);
                    }
                }
            })
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            })
            .keyBy(tuple -> tuple.f0).sum(1);

        // 4. 数据终端-sink
        outputStream.printToErr();

        // 5. 触发执行-execute
        env.execute("StreamCheckpointDemo");
    }

    /**
     * Flink 流式应用,Checkpoint 检查点设置
     */
    private static void setEnvCheckpoint(StreamExecutionEnvironment env) {
        // 1. 启用检查点,设置时间间隔
        env.enableCheckpointing(5000) ;
        // 2. 状态后端,state存储
        env.setStateBackend(new HashMapStateBackend());
        // 3. 检查点存储,Checkpoint存储
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/BigDataSH34/ckpts");
        // todo: 设置Checkpoint检查相关属性
        // 4. 相邻两个Checkpoint间隔最小时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 5. 容忍Checkpoint失败最大次数
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // 6. 当job取消,保存Checkpoint数据,默认自动删除数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // 7. 允许同时进行Checkpoint数目:1个
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 8. Checkpoint超时时间,如果超过时间,就表示失败
        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
        // 9. Checkpoint执行模式化:精确性一次语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    }

}  
上一篇 下一篇

猜你喜欢

热点阅读