关于消费者

2020-01-11  本文已影响0人  kar_joe

消费模型

消费者内部线程模型

KafkaConsumer 采用双线程的设计,即用户主线程和心跳线程。所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性
引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中。KafkaConsumer 中有个方法是例外的,它就是 wakeup(),你可以在其他线程中安全地调用 KafkaConsumer.wakeup() 来唤醒 Consumer

消费者业务线程模型

为了加速消费,提高并行性,消费者端引入多线程。

多消费实例

消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。


image.png

单消费实例,多业务线程

消费者程序使用单消费实例,但创建多个消费线程并行消费。实现难度较大,容易造成乱序。


image.png

多消费示例,每个实例又多消费线程

该方案是上述两种方案的整合


image.png

重平衡reblance

Coordinator 是消费位移消息所提交的分区的leader所在broker,负责消费者组的组成员管理和各个消费者的位移提交管理。
某个组的所有消费者保持向该组对应的Coordinator 发送心跳,heartbeat.interval.ms既设置了心跳间隔,也控制重平衡通知的频率。重平衡的通知机制也是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

出现场景

负面影响

重平衡状态机

kafka借助于状态机实现重平衡


状态含义
状态切换

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。在 Kafka 的日志中一定经常看到下面这个输出:Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。

重平衡过程

在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
目前服务端做成无状态,既是优点也是缺点,优点是降低了服务端的维护成本,但是缺点是每次重平衡,历史分区信息都要临时搜集,过程复杂,而且所有消费者都要停下手中工作并参与重平衡。并且之前任务重分配也不考虑历史分配,在0.11.0.0版本才引入粘性冲平衡策略。

  1. 消费者端


    image.png
    image.png
  2. 协调者端
上一篇 下一篇

猜你喜欢

热点阅读