Flink Kafka重复消费
2021-05-15 本文已影响0人
淡淡的小番茄
项目中使用了Flink平台实现了规则引擎功能,主要包括:数据转发和场景联动。其间多job问题一直困扰着我们,目前也没有一个完美的方案。
说一千道一万,总结为一句话:管理平台的规则执行状态与Flink平台job执行状态的一致性保证。
目前采取的方案是:前后端异步提交+Flink Job固定线程池提交+失败错误日志。
这两天也把之前的遇到的Flink多job重复消费问题,重拾起来,准备定位一把。因为我们多job的分组名称设置的是一样的。理论上是不应该重复消费kafka的消息的。但是,我们使用Flink自带的FlinkKafkaConsumer来消费,当存在多job的时候,确实是重复消费了。所以你懂的,熟悉的味道。我们Flink版本使用的是1.11.2版本,先梳理下相关的类图:
查看SourceFunction的注释基本上能了解大体的调用流程:Flink平台调度job的时候,当有数据过来的时候,直接会调用run方法。
再具体的看SourceFunciton的run方法实现:
FlinkKafkaConsumerBase中会调用createFetcher方法,创建KafkaFetcher。然后一步一步看下去,找到消费的代码处KafkaConsumerThread的run方法:
每次运行的时候,会获取新的KafkaConsumer。
使用完成后会把consumer关掉。这也许就是重复消费的原因。基于此,我们只能自己加业务逻辑来限制重复消费,有两个思路:
1、消息有唯一ID,借助redis进行判断是否已经消费过。
2、使用flink status,将状态信息存储到status中,待研究确认。