flinkflink

Flink--Checkpoint机制原理

2021-01-07  本文已影响0人  tracy_668

[TOC]

如何理解flink中state(状态)

state泛指

  1. 当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。
  2. 在每分钟/小时/天聚合事件时,状态保存待处理的聚合。
  3. 当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。
  4. 当需要管理历史数据时,状态允许有效访问过去发生的事件。

案例理解state

无状态计算指的是数据进入Flink后经过算子时只需要对当前数据进行处理就能得到想要的结果;有状态计算就是需要和历史的一些状态或进行相关操作,才能计算出正确的结果;
无状态计算的例子:

幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用;

有状态计算的例子:
以wordcount中计算pv/uv为例:
输出的结果跟之前的状态有关系,不符合幂等性,访问多次,pv会增加;

为什么需要state管理
流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。

理想中的state管理
理想的状态管理是:

flink中checkpoint执行流程
image.png
  1. CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。
    2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理

3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。

  1. 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
  2. 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 ;

checkpoint中保存的是什么信息
带着问题找答案

以flink消费kafka数据wordcount为例:

我们从Kafka读取到一条条的日志,从日志中解析出app_id,然后将统计的结果放到内存中一个Map集合,app_id做为key,对应的pv做为value,每次只需要将相应app_id 的pv值+1后put到Map中即可;

kafka topic:test;
flink运算流程如下:


image.png

kafka topic有且只有一个分区

例:(0,1000)
表示0号partition目前消费到offset为1000的数据

flink的pv task记录了当前计算的各app的pv值,为了方便讲解,我这里有两个app:app1、app2

例:(app1,50000)(app2,10000)

每来一条数据,只需要确定相应app_id,将相应的value值+1后put到map中即可;

该案例中,CheckPoint到底记录了什么信息呢?

记录的其实就是第n次CheckPoint消费的offset信息和各app的pv值信息,记录一下发生CheckPoint当前的状态信息,并将该状态信息保存到相应的状态后端。(注:状态后端是保存状态的地方,决定状态如何保存,如何保障状态高可用,我们只需要知道,我们能从状态后端拿到offset信息和pv信息即可。状态后端必须是高可用的,否则我们的状态后端经常出现故障,会导致无法通过checkpoint来恢复我们的应用程序)。

eg:

该状态信息表示第100次CheckPoint的时候, partition 0 offset消费到了1000,pv统计结果为(app1,50000)(app2,10000)

任务挂了,如何恢复?

假如我们设置了三分钟进行一次CheckPoint,保存了上述所说的 chk-100 的CheckPoint状态后,过了十秒钟,offset已经消费到 (0,1100),pv统计结果变成了(app1,50080)(app2,10020),但是突然任务挂了,怎么办?

莫慌,其实很简单,flink只需要从最近一次成功的CheckPoint保存的offset(0,1000)处接着消费即可,当然pv值也要按照状态里的pv值(app1,50000)(app2,10000)进行累加,不能从(app1,50080)(app2,10020)处进行累加,因为 partition 0 offset消费到 1000时,pv统计结果为(app1,50000)(app2,10000)

当然如果你想从offset (0,1100)pv(app1,50080)(app2,10020)这个状态恢复,也是做不到的,因为那个时刻程序突然挂了,这个状态根本没有保存下来。我们能做的最高效方式就是从最近一次成功的CheckPoint处恢复,也就是我一直所说的chk-100;

以上讲解,基本就是CheckPoint承担的工作,描述的场景比较简单.

疑问,计算pv的task在一直运行,它怎么知道什么时候去做这个快照?或者说计算pv的task怎么保障它自己计算的pv值(app1,50000)(app2,10000)就是offset(0,1000)那一刻的统计结果呢?

image.png

对应到pv案例中就是,Source Task接收到JobManager的编号为chk-100的CheckPoint触发请求后,发现自己恰好接收到kafka offset(0,1000)处的数据,所以会往offset(0,1000)数据之后offset(0,1001)数据之前安插一个barrier,然后自己开始做快照,也就是将offset(0,1000)保存到状态后端chk-100中。然后barrier接着往下游发送,当统计pv的task接收到barrier后,也会暂停处理数据,将自己内存中保存的pv信息(app1,50000)(app2,10000)保存到状态后端chk-100中。OK,flink大概就是通过这个原理来保存快照的;

如果做快照的同时,也在处理数据,那么处理的数据可能会修改快照内容,所以先暂停处理数据,把内存中快照保存好后,再处理数据

结合案例来讲就是,统计pv的task想对(app1,50000)(app2,10000)做快照,但是如果数据还在处理,可能快照还没保存下来,状态已经变成了(app1,50001)(app2,10001),快照就不准确了,就不能保障Exactly Once了;

多并行度、多Operator情况下,CheckPoint过程

分布式状态容错面临的问题与挑战

所有的Operator运行过程中遇到barrier后,都对自身的状态进行一次快照,保存到相应状态后端

对应到pv案例:有的Operator计算的app1的pv,有的Operator计算的app2的pv,当他们碰到barrier时,都需要将目前统计的pv信息快照到状态后端。

多并行图简易快照

image.png

多Operator状态恢复

image.png

具体怎么做这个快照呢?
JobManager向Source Task发送CheckPointTrigger,Source Task会在数据流中安插CheckPoint barrier;

image.png

Source Task自身做快照,并保存到状态后端;

image.png

Source Task将barrier跟数据流一块往下游发送;

image.png

当下游的Operator实例接收到CheckPoint barrier后,对自身做快照

image.png image.png

上述图中,有4个带状态的Operator实例,相应的状态后端就可以想象成填4个格子。整个CheckPoint 的过程可以当做Operator实例填自己格子的过程,Operator实例将自身的状态写到状态后端中相应的格子,当所有的格子填满可以简单的认为一次完整的CheckPoint做完了

上面只是快照的过程,整个CheckPoint执行过程如下

1、JobManager端的 CheckPointCoordinator向 所有SourceTask发送CheckPointTrigger,Source Task会在数据流中安插CheckPoint barrier

2、当task收到所有的barrier后,向自己的下游继续传递barrier,然后自身执行快照,并将自己的状态异步写入到持久化存储中。增量CheckPoint只是把最新的一部分更新写入到 外部存储;为了下游尽快做CheckPoint,所以会先发送barrier到下游,自身再同步进行快照

3、当task完成备份后,会将备份数据的地址(state handle)通知给JobManager的CheckPointCoordinator;
如果CheckPoint的持续时长超过 了CheckPoint设定的超时时间,CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator就会认为本次CheckPoint失败,会把这次CheckPoint产生的所有 状态数据全部删除。
4、 最后 CheckPoint Coordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到hdfs。

barrier对齐
什么是barrier对齐?

image.png

什么是barrier不对齐?

  1. 上述图2中,当还有其他输入流的barrier还没有到达时,会把已到达的barrier之后的数据1、2、3搁置在缓冲区,等待其他流的barrier到达后才能处理
  2. barrier不对齐就是指当还有其他流的barrier还没到达时,为了不影响性能,也不用理会,直接处理barrier之后的数据。等到所有流的barrier的都到达后,就可以对该Operator做CheckPoint了;

为什么要进行barrier对齐?不对齐到底行不行?

Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;
后面的部分主要证明这句话;

CheckPoint的目的就是为了保存快照,如果不对齐,那么在chk-100快照之前,已经处理了一些chk-100 对应的offset之后的数据,当程序从chk-100恢复任务时,chk-100对应的offset之后的数据还会被处理一次,所以就出现了重复消费。

结合pv案例来看,之前的案例为了简单,描述的kafka的topic只有1个partition,这里为了讲述barrier对齐,所以topic有2个partittion;

image.png

结合业务,先介绍一下上述所有算子在业务中的功能

TaskA中的map将读取到的一条kafka日志转换为我们需要统计的app_id
keyBy 按照app_id进行keyBy,相同的app_id 会分到下游TaskB的同一个实例中
TaskB的map在状态中查出该app_id 对应的pv值,然后+1,存储到状态中
利用Sink将统计的pv值写入到外部存储介质中;

我们从kafka的两个partition消费数据,TaskA和TaskB都有两个并行度,所以总共flink有4个Operator实例,这里我们称之为 TaskA0、TaskA1、TaskB0、TaskB1;

假设已经成功做了99次CheckPoint,这里详细解释第100次CheckPoint过程;

现在我们假设TaskB0做CheckPoint的时候barrier对齐了,TaskB1做CheckPoint的时候barrier不对齐,当然不能这么配置,我就是举这么个例子,带大家分析一下barrier对不对齐到底对统计结果有什么影响?

上面说了chk-100的这次CheckPoint,offset位置为(0,10000)(1,10005),TaskB0使用barrier对齐,也就是说TaskB0不会处理barrier之后的数据,所以TaskB0在chk-100快照的时候,状态后端保存的app0的pv数据是从程序开始启动到kafka offset位置为(0,10000)(1,10005)的所有数据计算出来的pv值,一条不多(没处理barrier之后,所以不会重复),一条不少(barrier之前的所有数据都处理了,所以不会丢失),假如保存的状态信息为(app0,8000)表示消费到(0,10000)(1,10005)offset的时候,app0的pv值为8000

分析到这里,我们先梳理一下我们的状态保存了什么:

chk-100

offset:(0,10000)(1,10005)
pv:(app0,8000) (app1,12050)

接着程序在继续运行,过了10秒,由于某个服务器挂了,导致我们的四个Operator实例有一个Operator挂了,所以Flink会从最近一次的状态恢复,也就是我们刚刚详细讲的chk-100处恢复,那具体是怎么恢复的呢?

Flink 同样会起四个Operator实例,我还称他们是 TaskA0、TaskA1、TaskB0、TaskB1。四个Operator会从状态后端读取保存的状态信息。

从offset:(0,10000)(1,10005) 开始消费,并且基于 pv:(app0,8000) (app1,12050)值进行累加统计

然后你就应该会发现这个app1的pv值12050实际上已经包含了partition1的offset 10005~10200的数据,所以partition1从offset 10005恢复任务时,partition1的offset 10005~10200的数据被消费了两次

TaskB1设置的barrier不对齐,所以CheckPoint chk-100对应的状态中多消费了barrier之后的一些数据(TaskA1发送),重启后是从chk-100保存的offset恢复,这就是所说的At Least Once

由于上面说TaskB0设置的barrier对齐,所以app0不会出现重复消费,因为app0没有消费offset:(0,10000)(1,10005) 之后的数据,也就是所谓的Exactly Once;

看到这里你应该已经知道了哪种情况会出现重复消费了,也应该要掌握为什么barrier对齐就是Exactly Once,为什么barrier不对齐就是 At Least Once

分析了这么多,这里我再补充一个问题,到底什么时候会出现barrier对齐?

对齐,汉语词汇,释义为使两个以上事物配合或接触得整齐。由汉语解释可得对齐肯定需要两个以上事物,所以,必须有多个流才叫对齐。barrier对齐其实也就是上游多个流配合使得数据对齐的过程;
言外之意:如果Operator实例只有一个输入流,就根本不存在barrier对齐,自己跟自己默认永远都是对齐的;

答:如果只有一个partition,对应flink任务的Source Task并行度只能是1,确实没有区别,不会有至少一次的存在了,肯定是精确一次。因为只有barrier不对齐才会有可能重复处理,这里并行度都已经为1,默认就是对齐的,只有当上游有多个并行度的时候,多个并行度发到下游的barrier才需要对齐,单并行度不会出现barrier不对齐,所以必然精确一次。其实还是要理解barrier对齐就是Exactly Once不会重复消费,barrier不对齐就是 At Least Once可能重复消费,这里只有单个并行度根本不会存在barrier不对齐,所以不会存在至少一次语义;

为了下游尽快做CheckPoint,所以会先发送barrier到下游,自身再同步进行快照;这一步,如果向下发送barrier后,自己同步快照慢怎么办?下游已经同步好了,自己还没?

答: 可能会出现下游比上游快照还早的情况,但是这不影响快照结果,只是下游快照的更及时了,我只要保障下游把barrier之前的数据都处理了,并且不处理barrier之后的数据,然后做快照,那么下游也同样支持精确一次。这个问题你不要从全局思考,你单独思考上游和下游的实例,你会发现上下游的状态都是准确的,既没有丢,也没有重复计算。这里需要注意一点,如果有一个Operator 的CheckPoint失败了或者因为CheckPoint超时也会导致失败,那么JobManager会认为整个CheckPoint失败。失败的CheckPoint是不能用来恢复任务的,必须所有的算子的CheckPoint都成功,那么这次CheckPoint才能认为是成功的,才能用来恢复任务;

我程序中Flink的CheckPoint语义设置了 Exactly Once,但是我的mysql中看到数据重复了?程序中设置了1分钟1次CheckPoint,但是5秒向mysql写一次数据,并commit;

答:Flink要求end to end的精确一次都必须实现TwoPhaseCommitSinkFunction。如果你的chk-100成功了,过了30秒,由于5秒commit一次,所以实际上已经写入了6批数据进入mysql,但是突然程序挂了,从chk100处恢复,这样的话,之前提交的6批数据就会重复写入,所以出现了重复消费。

上一篇 下一篇

猜你喜欢

热点阅读