高性能kafka消费

2024-08-24  本文已影响0人  田真的架构人生

在进行spring-kafka消费的过程中,大部分人可能都遇到过kafka消息堆积的情况,尤其是大数据处理的场景,这时候就要想办法提高消费能力。

提高消费者的数量可以吗?我们知道,kafka的消费者个数是与kafka的分区数相关的,一个分区最多只能被一个消费者消费,也就是说,你在客户端开启的消费者个数即使超过了分区数,也不能提高消费能力,反而还占资源!

那既然一个分区只能被一个消费者消费,那我增加分区数不就可以多开消费者了?这在理论上确实是可行的,但是在运维层面会涉及到分区、数据的迁移,而且这期间kafka是不可用的状态,成本太过高昂。

本方案从消费端的代码层面着手,看如何设计实现一个高性能的kafka消费组件。

一、异步

默认情况下,spring-kafka采用的是同步消费模式,这种情况下消费能力被分区数限制,难以提升。所以我们首先想到的异步消费:

异步消费,将消息的拉取与处理交由不同的线程处理,在硬件允许的前提下,可自由提高消费能力。

异步消费虽然提高了消费能力,但是对于offset的提交却带来了挑战:因为默认情况下,offset是自动提交的,自动提交很可能导致异步未处理完的消息丢失;

二、手动提交

方案:

  1. consumer将offset加入本地列表进行维护;
  2. 异步worker回调更新offset状态;
  3. consumer遍历offset,寻找可提交的最大offset;
  4. 提交最大offset到kafka,并清空本地已提交offset;

三、有序offset列表

从上图可知,为了进行手动提交,其中最关键的部分是手动维护的offset列表,为实现offset的提交,要达到的目标:

101~109处理完之后,只有3次offset提交动作,109提交之后,109及之前的offset都会从offset列表中清除。

但现实情况是:

103、105、106虽然都处理完了,但是都还不能提交,因为这时候101和102都还未被处理。

极端情况下会出现以下情形:

随着时间的推移,低位的offset始终未被处理,这会导致后面所有已处理的offset都得不到提交,同时由于高位offset不断写入,低位的offset得不到清除,offset列表将急剧膨胀。



问题:在异步执行的情况下怎样实现offset列表更新的有序性?

四、传统线程池

传统线程池调度策略:

  1. 当线程数< corePoolSize时,直接创建新线程执行任务;
  2. 当corePoolSize<线程数<maxPoolSize时,如果队列未满,任务入队等待;
  3. 当corePoolSize<线程数<maxPoolSize时,如果队列已满,创建新线程执行任务;
  4. 当线程数=maxPoolSize,且队列已满,则执行拒绝策略;

从传统线程池的调度策略来看,它是惰性的,这对我们当前场景来说有哪些问题?

五、饥渴线程池

目标:

方案:

  1. 改变传统线程池调度策略;
  2. 使用公平队列保证任务等待的公平性;
  3. 完全摒除拒绝策略;
  4. put instead offer;
对传统线程池关键部分进行改写

新的调度策略:

  1. 当线程数< corePoolSize时,直接创建新线程执行任务;
  2. 当corePoolSize<线程数<maxPoolSize时,强制创建新线程执行任务;
  3. 当线程数=maxPoolSize,入队列等待;

六、微批处理

目标:

方案:

  1. 基于micro-batch微批思想,consumer线程对批量消息进行切分、整合,将微批数据交给不同的worker线程处理;
  2. offset列表仅记录micro-batch中最大的offset;
  3. 仅提交最大offset到kafka;
从kafka批量拉取200条消息,在内部进行分片、切分、合并,以50一组形成微批,提交给4个worker处理,同时,本地offset列表只记录每个微批中最大的offset:50、100、150、200,这样在提高末端业务执行效率、offset提交效率的同时,也极大地节省了内存开销。



微批处理要求业务必须具有原子性,micro-batch这一批要么全部成功,要么全部失败!

七、last offset处理

问题:

方案:
基于sideCar模式,提供额外的监视器,动态监测本节点的offset列表,如若发现offset列表仍有未提交的offset,则会主动发送探针消息,驱动consumer进行poll及commit;

八、重写与重排

问题:

方案:

  1. 加入offset列表时,检查consumer实例,如果发生变化,则对consumer进行重写;
  2. 滤重检查及offset列表重排序;

九、分区重分配

问题:

问题思考(怎么判断offset列表为脏数据?):

方案:
每次添加offset时记录offset列表的更新时间戳,Offset Monitor定时检测,当经历了一个rebalance周期后,如果时间戳仍未更新,判断offset列表为脏数据,予以清除。

方案思考

关键代码



这个案例给我们的启发:从空间维度转变为时间维度去判断,这种思考维度的转变以及思考尺度的放大,降低了解决问题的复杂性,提供了更多的可能性。

十、极致性能

问题:

传统队列关键代码

Disruptor:
Disruptor是一个高性能的队列,通过以下设计解决传统队列的锁争用及伪共享问题:

cas与cache padding



关键代码

十一、总体架构

十二、效果测试

服务器:3 * 6核8G
消费逻辑:将14万CDC消息计算更新后写入ES
消费配置:kafka 6分区、批量拉取500、32线程、2048队列
效果对比:

上一篇下一篇

猜你喜欢

热点阅读