Flink_四大基石-state && Checkpoint
一 state
1. 什么是 状态
状态是算子处理数据的中间结果,或者记录的历史数据
状态的使用 , 取决于 我整个算子的处理逻辑需不需要 历史数据
2.状态的分类
- keyed State 键控状态
DataStram 分组后的状态
基于KeyedStream上的状态,是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。 -
Operator State 算子状态 非键控状态
不常用,一般用在 Source 上出现 列如,从kafka中消费数据,消费的offset偏移量等
image.png
Flink中State状态划分,按照是否被管理划分:
- 1、Managered State 管理状态,状态数据被Flink程序管理
- 比如:ValueState、ListState、MapState等
- 2、Raw State 原始状态,由用户自己管理状态
- 存储数据结构:byte[] ,相对来说很麻烦
3.状态的使用
步骤
- 定义状态
private ValueState<Long> valueState = null ;- 初始状态值 从上下文对象中获取 创建状态描述符
valueState = getRuntimeContext().getState(
new ValueStateDescriptor<Long>("maxState", Long.class)
);- 获取key 历史状态
Long historyValue = valueState.value();- 按照逻辑 与历史状态, 更新状态
valueState.update(currentValue);
4.状态的生命周期
- 状态 也是一种变量 存储在内存当中,当状态过多,或者保存时间太长,或者状态一定时间不用之后,就会造成资源浪费,影响性能
- 在Flink 1.6版本更新后 状态引入了TTL(time-to-live,生存时间)机制,支持Keyed State 的自动过期,有效解决了状态数据在无干预情况下无限增长导致 OOM 的问题。
- 使用
相当于给状态State 加了一个时间戳,当状态多久没有使用时,清理key的状态值
- 创建状态描述符
- 定义状态TTL配置文件 设置属性
- 更新状态描述符,讲配置文件传入
- 传递状态描述符
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
// 2-a. 创建状态描述符
ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxState", Long.class);
// 2-b. 创建StateTtlConfig对象,设置属性值
StateTtlConfig stateTtlConfig = StateTtlConfig
// 设置状态有效期时间
.newBuilder(Time.days(1))
// 设置何时更新状态时间戳: 创建状态(第1次)和更新状态
.updateTtlOnCreateAndWrite()
// 当状态达到设置ttl后,对状态数据可见性
.neverReturnExpired()
.build();
// 2-c. 启用state ttl
stateDescriptor.enableTimeToLive(stateTtlConfig);
// 2-d. 传递状态描述符,实例化状态
this.maxState = getRuntimeContext().getState(stateDescriptor);
二 Checkpoint
1. 什么是CheckPoint

- checkpoint 检查点机制是==用来故障恢复的一种机制== ,将某一刻或者某一段时间 state 状态 保存到HDFS 或者其他存储系统中
- checkpoint 开启后,由Flink应用程序 自动保存
问题一
STATE状态是 变量,存储在内存当中,为产生一下问题 - 内存数据 不安全,容易丢失
解决:将state checkpoint到可靠文件系统中去 - 状态数据太大,内存放不下
解决.state存在于 TaskManager的内存当中,可以将其保存在RocksDB 嵌入式kv内存数据库中,该数据库有一个特点,内存不足的时候将其 写入磁盘当中
2 .重点 checkpoint 执行流程
Checkpoint是Flink实现容错机制最核心的功能,根据配置周期性地基于Stream中各个Operator/task的状态State来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

过程
场景: 从kafka消费数据,然后处理完成后 写入redis中
每过5s chekpoint一次, flink会将所有 算子只要有状态的,就保存快照一次, 且快照过程是异步的
当flink job设置 checkpoint 并且提交给JobManger会发生一下过程
1.JobManger会先启动 Checkpoint Coordinator 协调器
2.协调器每5s job的数据流( 在进入data source之前的数据流) 注入一个barrier 栅栏信号
3.当数据流到达source时,source 中的每个task 接收到信号,将本算子的 state状态,快照并保存到HDFS中,并将快照信息汇报给协调器
4.向下游发送barrie栅栏信号
5.当数据流到达 各个Transfdormation 算子的时候,每个算子接收到信号,也是将 本算子中的state状态 snaphot&save 快照并保存到HDFS中,并且将信息汇报给协调器
6.转换算子向下游继续发送 barrie栅栏信号
7.当data sink接收到之后,如果有状态 也会将 state 状态快照和保存到HDFS当中,并且向协调器汇报
当协调器收i到所有算子的汇报之后,认为此次checkpoint 完成
- 对于HDFS来说,HDFS上存储的状态快照都是最新的,他会将之前的删掉
- 写入快照数据是异步的,不会影响算子的执行
- 分布式快照执行的数据一致性,由
Chandy-Lamport
算法实现
栅栏对齐
下游的subtask 必须全部接收完,上游subtask发送的栅栏信号,才会进行checkpoint操作

2.StateBackend 状态后端
flink运行过程中,state保存在哪里checkpoint在哪里
1.13版本之前 state和checkpoint 功能是放在一起的
内存状态后端
state存在 TaskManager内存当中
checkpoint 保存在jobManager内存中
文件状态后端
state存在TaskManager内存当中
checkpoint 保存在 文件系统中 本地文件 hdfs
RocksDB状态后端
state存在于RocksDB中
checkpoint 保存在文件系统中
1.14版本之后
state
Java hashmap
java内存当中
存储hash table
rocksDB
内存不足存磁盘
checkpoint
JobManager内存
FileSystem文件系统


checkpoint 手动恢复
checkpoint 自动恢复
savapoint 相当于手动 checkpoint
3.检查点设置
package cn.itcast.flink.ckpt;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* Flink 流式计算程序检查点Checkpoint配置
* @author xuyuan
*/
public class StreamCheckpointSettingDemo {
/**
* 自定义数据源,每隔1秒产生1条数据
*/
private static class DataSource extends RichParallelSourceFunction<String> {
private boolean isRunning = true ;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning){
// 发送数据
ctx.collect("spark flink flink");
// 每隔1秒发送1条数据
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isRunning = false ;
}
}
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
Configuration configuration = new Configuration();
configuration.setString("rest.port", "8081");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1) ;
// todo: 设置检查点Checkpoint属性,状态保存和快照保存
setEnvCheckpoint(env);
// 2. 数据源-source
DataStreamSource<String> dataStream = env.addSource(new DataSource());
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> outputStream = dataStream
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split("\\s+");
for (String word : words) {
out.collect(word);
}
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy(tuple -> tuple.f0).sum(1);
// 4. 数据终端-sink
outputStream.printToErr();
// 5. 触发执行-execute
env.execute("StreamCheckpointDemo");
}
/**
* Flink 流式应用,Checkpoint 检查点设置
*/
private static void setEnvCheckpoint(StreamExecutionEnvironment env) {
// 1. 启用检查点,设置时间间隔
env.enableCheckpointing(5000) ;
// 2. 状态后端,state存储
env.setStateBackend(new HashMapStateBackend());
// 3. 检查点存储,Checkpoint存储
env.getCheckpointConfig().setCheckpointStorage("file:///D:/BigDataSH34/ckpts");
// todo: 设置Checkpoint检查相关属性
// 4. 相邻两个Checkpoint间隔最小时间
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 5. 容忍Checkpoint失败最大次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 6. 当job取消,保存Checkpoint数据,默认自动删除数据
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 7. 允许同时进行Checkpoint数目:1个
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 8. Checkpoint超时时间,如果超过时间,就表示失败
env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
// 9. Checkpoint执行模式化:精确性一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
}
}