Flink Kafka重复消费

2021-05-15  本文已影响0人  淡淡的小番茄

项目中使用了Flink平台实现了规则引擎功能,主要包括:数据转发和场景联动。其间多job问题一直困扰着我们,目前也没有一个完美的方案。

说一千道一万,总结为一句话:管理平台的规则执行状态与Flink平台job执行状态的一致性保证。

目前采取的方案是:前后端异步提交+Flink Job固定线程池提交+失败错误日志。

这两天也把之前的遇到的Flink多job重复消费问题,重拾起来,准备定位一把。因为我们多job的分组名称设置的是一样的。理论上是不应该重复消费kafka的消息的。但是,我们使用Flink自带的FlinkKafkaConsumer来消费,当存在多job的时候,确实是重复消费了。所以你懂的,熟悉的味道。我们Flink版本使用的是1.11.2版本,先梳理下相关的类图:

查看SourceFunction的注释基本上能了解大体的调用流程:Flink平台调度job的时候,当有数据过来的时候,直接会调用run方法。

再具体的看SourceFunciton的run方法实现:

FlinkKafkaConsumerBase中会调用createFetcher方法,创建KafkaFetcher。然后一步一步看下去,找到消费的代码处KafkaConsumerThread的run方法:

每次运行的时候,会获取新的KafkaConsumer。

使用完成后会把consumer关掉。这也许就是重复消费的原因。基于此,我们只能自己加业务逻辑来限制重复消费,有两个思路:

1、消息有唯一ID,借助redis进行判断是否已经消费过。

2、使用flink status,将状态信息存储到status中,待研究确认。

上一篇下一篇

猜你喜欢

热点阅读