2.Flink中状态的保存和检查点机制(CheckPoint)
1.前言
前一篇文章聊了聊Flink中JDBCSink的使用方法,今天就说一说Flink中最具有特色的“状态”的保存和检查点机制。虽然我嘴上说着“状态”是Flink的一大特色,但是实际上我也是听别人提起过,或许是书籍上看到的、亦或者是文章中读到了,有些记不起来了。
但从我目前对Flink使用过程中所领悟到的浅薄心得来说,我认为状态的使用还是非常有特点的,合理的使用能够让代码逻辑更“轻薄”。但这篇文章我就先不介绍状态都是怎么使用的了,因为与对状态的使用相比,它的保存思路是更加值得学习的。接下来我就对Flink中状态的保存和检查点机制进行比较详细的描述。尤其是检查点机制,它是能够实现精准一次性的重要技术支持。如果有人看到了我写的文章中的错误,非常希望大家能及时指出,我会非常感谢。
2.Flink中状态的保存
对Flink有简单使用经验的人都会知道,在Flink中有一个机制叫做“状态”,利用这个状态进行代码编写的时候,也可以说是在进行状态编程。
这个状态在我看来实际上就是用来辅助主逻辑程序进行计算来获得想要的计算结果的一个“物件”,说物件未免有些太过通俗,还是说它是一个“数据内容”吧。不同的状态类型也就代表着这个“数据内容”中的东西不一样。
既然计算结果需要使用到这个“数据内容”,也就是要使用状态。那么这个状态就需要有一个保存的地方,能够让流动中的每一条数据都能够使用到这个状态完成计算,就像是高速路上的收费站一样。那在对这个状态进行保存的时候,就需要使用到一个相对应的机制,也就是状态后端。
这个状态后端实际上就是用来保存每一个使用到状态编程的程序在运行过程中所产生的状态的一个机制。试想一下,一个状态诞生之后,就一定是在内存中或者是磁盘中出现了的,不然数据流中的数据也不会访问到它。那它到底是在内存中还是在磁盘上就需要“状态后端”机制来指定了。
在Flink中,状态后端机制为开发者们提供了两种不同的状态保存点,分别是基于内存的“Hash表状态后端”和基于本地磁盘维护的“RocksDB”状态后端。
2.1 状态后端的介绍
整个介绍的步骤主要是以:状态后端介绍->状态后端使用来进行的。切记:状态后端指的是程序运行时状态的保存点!!!
2.1.1 RocksDB状态后端
先来讲讲工作中比较常用到的RocksDB数据库状态后端,这种状态后端会将程序运行时的状态保存在Flink内置的RocksDB数据库中。它是很强的一个状态后端,虽然它要把状态保存在磁盘上需要经历序列化和反序列化,但它不会占用正在运行的TaskManager的JVM堆内存,对于长时间的状态来说,该状态后端对计算系统会宽容一些。
如果想要在代码中开启这个状态后端,需要在Pom文件中引入对应的依赖,具体内容如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>1.13.0</version>
</dependency>
引入依赖之后,才能在代码中开启
setStateBackend(new EmbeddedRocksDBStateBackend());
2.1.2 HashMapStateBackend
这种状态后端突出的一个特点就是快,因为它会将所有的状态保存在TaskManager的JVM堆内存上,也正是由于处在堆内存中,所以也不需要序列化和反序列化了。
至于它为什么叫做HashMap,是因为在这个状态后端的内部,是将程序中所有的状态当成了对象,在连带着窗口收集到的所有数据、窗口的触发器一起,都以键值对的形式存储了起来,构成了一张哈希表,所以也就叫这个名字了。
在调用方面,也不需要引入什么依赖,直接就设置开启就能够使用了:
env.setStateBackend(new HashMapStateBackend());
那到这里为止也就说完了状态后端是干嘛的,是怎么使用的了,那下一章节就聊聊另一个维护状态的机制——“CheckPoint”。
3.检查点机制
在第二章节中讲述了Flink状态后端相关的内容,如果你仔细阅读了之后应该能够对我理解的状态后端机制有一个小小的认识。接下来就要对Flink中与状态相关的另外一个机制“checkPoint”进行简单描述。
前面说了,状态后端是能够为程序运行时的状态提供“寄存点”的一个机制,但是如果程序突然出现了问题,那这些“寄存点”中的数据就会随着程序的停止而消失,即使故障修复之后再重新运行程序,也会因为之前状态的消失导致计算失败。这对于流式计算是很致命的。
所以为了能够让程序运行时的所有状态都能够在故障恢复之后重新拥有故障前的所有“寄存点”中的状态,就引入了一个检查点机制,通过这个检查点机制就能够让程序恢复到故障前的形态,仿若无事发生一样继续正确的处理数据。
虽然我说起来轻描淡写,但是实际上这个机制还是很牛的,用它就能够轻松的实现数据源到Flink程序之间的精准一次性,前提是数据源需要具备数据重放的能力。所谓的数据重放就是,它能够让Source源算子读取到每条数据的偏移量做状态保存,然后还能够在状态恢复之后,重新用这个记录的偏移量来读取已经被Source处理过的数据。
接下来就通过对检查点机制是如何使用->流程->原理,这三个阶段来展开描述。
3.1 检查点的介绍
3.1.1 如何使用
检查点机制使用起来还比较方便,大致的思路就是先开启,再配置。具体的调用方法如下。
//获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启检查点,30秒保存一次检查点.语义就是精准一次性
env.enableCheckpointing(30 * 1000L,CheckpointingMode.EXACTLY_ONCE);
//设置检查点的超时时间,如果超过这个时间检查点还没保存完成,那么这次检查点保存动作就不要了
env.getCheckpointConfig().setCheckpointTimeout(60000);
//两次检查点保存之间的间隔,不能超过的最小时间。
//举个例子,如果5秒进行一次检查点,但是上一次用了4.5秒才保存完成,而这个最小值设置的1秒,那就要等到这一秒到了才能开启下一次保存
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//检查点中状态的管理机制,有两种。一种是程序坏了就删除,另外一种是程序坏了仍然保存。工作中肯定是需要手动维护的
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
//指定检查点故障重启的配置
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1L),Time.minutes(1L)));
//设置检查点外部保存的地址
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
3.1.2使用流程
使用流程就和3.1.1 写的一样,只是具体的参数需要跟你们的业务场景进行自己的判断了。
3.1.3 原理
看到这里,检查点的作用、状态后端的作用其实应该明白了,它们二者存在的必要就是共同维护程序的健康运行,即使出现错误也能够快速的恢复。但是你可以想一下,不同的状态后端在与检查点之间进行交互处理保存状态的时候,是怎么个过程的呢?
不同的状态后端对状态的持久化方式也是不一样的,这里也正是Rocks状态后端比HashMap状态后端出色的地方,因为Rocks状态后端在进行检查点保存的时候是异步快照,就算检查点保存的过程中出现了问题,也对其状态的使用没影响。其次就是Rocks在进行检查点快照时,是增量保存的。而哈希表状态后端是全量保存(每一次都是全量,也就代表着状态积累的越多,保存的内容就越多,造成的压力也会越多)。但是计算Rocks状太后端看起来花里胡哨的,它还是没有哈希表状态后端来的快。
3.2 检查点保存流程
在进行这一部分内容的描述之前,必须要明确一件事情。那就是保存点在对所有状态进行保存的时候,是等到一条数据被程序中所有任务处理完成之后,在对所有的状态进行保存操作。这句话神重要!!!
如果一共有5条数据要交由map->key by -> sum这个执行流程处理。当第三条数据已经完成了所有的处理操作之后,要进行检查点保存,所保存的这个快照就是第三条数据被处理完成之后的状态。那如果此时第四条数据也在程序中的话,那岂不是会有数据丢失的风险?
实际不然,在检查点触发的时候,Source算子保存的有关于数据的偏移量信息也止步于第三条数据的偏移量,当程序恢复计算之后,就会需要让数据源实行数据回放的操作,从保存的偏移量的位置读取数据,这也就保证的数据的精准一次性消费。
3.3 保存点中状态的恢复
上述内容看起来很温馨的保证了因故障出现而发生的不良情况,但是实际上还有一个问题。Flink状态编程有一个问题,那就是它是按照数据流是否通过keyed计算来区别状态类型的。经历过keyby计算的状态叫键控状态,没有经历过keyby处理的数据流中的状态是算子状态,两种不同的状态也就表明了状态的恢复方式也是不一样的。
key by处理之后,流中的数据会按照键值选择器中所指定的key进行分区,所有键值一样的数据都会进入到一个分区里,但是值得注意的是,这个分区并不是物理意义上的分区,仅仅只是逻辑上的分区,但对于检查点的恢复也足够了。因为keyed流中有key作为标记,所以即使是发生故障进行恢复之后,检查点中保存的状态也能够按照各自的key进行恢复,数据流中的数据仍然会按照检查点中保存的对应key的状态真正意义上的恢复到故障前的数据处理形态上,是真正意义上的恢复,犹如神龙许愿一般。
但非键控状态就不太行了,它没有key作为“灯塔”,即使是进行了状态恢复,也没有办法恢复到原来的样子,所以就需要咱们这群倒霉的开发者来进行接口的重写,来帮助它达到我们想要的状态。(ps:这部分内容我在工组中没有用到,所以就当是提供了一个小小的思路吧)。
在进行流程图的绘制之前,首先要明确几个概念,在CheckpointFunction接口中,有两个方法分别是在程序第一次启动时或者重启时被调用的initializeState方法,和做状态快照的snapshotState方法。这两个方法都具有调用当前程序状态的能力,只不过快照方法获取状态时没有办法拿到状态的句柄信息,而initializeState是可以的。
接下来就模拟一个场景,在数据写出时实现一个自定义的检查点方法,对状态进行保存。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
public class MyCheckpointAndSinkFunction implements CheckpointedFunction,SinkFunction<String> {
//给SinkFunction定义一个缓冲的大小
private final int bufferSize;
//给SinkFunction定义一个缓冲区,这个缓冲区的大小就是BufferSize的大小
private List<String> bufferElements;
//用构造方法给这个缓冲地带赋值
public MyCheckpointAndSinkFunction(int bufferSize) {
this.bufferSize = bufferSize;
bufferElements = new ArrayList<>();
}
//创建一个暂时的列表状态,我在想是不是因为触发检查点的时候不需要保存这个状态,所以要定义成临时的呢?
private transient ListState<String> listState;
//快照方法
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
listState.clear();
for (String bufferElement : bufferElements) {
listState.add(bufferElement);
}
}
//初始化、重启时调用的方法
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> buffered = new ListStateDescriptor<>("buffered", String.class);
listState = context.getOperatorStateStore().getListState(buffered);
//如果是状态恢复,那么就将恢复得到的状态保存到缓冲区中
if (context.isRestored()){
while (listState.get().iterator().hasNext()){
bufferElements.add(listState.get().iterator().next());
}
}
}
//数据写出时调用的方法
@Override
public void invoke(String value, Context context) throws Exception {
SinkFunction.super.invoke(value, context);
bufferElements.add(value);
if (bufferElements.size() == bufferSize){
//数据个数已经达到缓冲区的上限了,这里可以做数据的写出动作
}
//数据写完了,自然要进行一次清理动作
bufferElements.clear();
}
}
4.结语
状态编程对于Flink来说是很重要的,所以维护这些状态更是一件值得认真对待的一件事情,在运行时Flink通过状态后端来完成状态的保存、使用,并且可以通过开启检查点的方式,将程序中所产生、使用到的状态周期性的保存到外部文件系统上(dfs),从某种意义上来说,它是全方位的为状态提供了保障。下一节,我们再一起聊聊Flink是如何通过检查点机制来保证数据的精准一次性语义的。