关于消费者
消费模型
- Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
- 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
- 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
- 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
- Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。
- 为加速消费,可以增加topic分区并增加消费者实例
- 既支持点对点又支持订阅/发布模型
消费者内部线程模型
KafkaConsumer 采用双线程的设计,即用户主线程和心跳线程。所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性
引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中。KafkaConsumer 中有个方法是例外的,它就是 wakeup(),你可以在其他线程中安全地调用 KafkaConsumer.wakeup() 来唤醒 Consumer
消费者业务线程模型
为了加速消费,提高并行性,消费者端引入多线程。
多消费实例
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。
image.png
单消费实例,多业务线程
消费者程序使用单消费实例,但创建多个消费线程并行消费。实现难度较大,容易造成乱序。
image.png
多消费示例,每个实例又多消费线程
该方案是上述两种方案的整合
image.png
重平衡reblance
Coordinator 是消费位移消息所提交的分区的leader所在broker,负责消费者组的组成员管理和各个消费者的位移提交管理。
某个组的所有消费者保持向该组对应的Coordinator 发送心跳,heartbeat.interval.ms既设置了心跳间隔,也控制重平衡通知的频率。重平衡的通知机制也是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
出现场景
- consumer发送心跳(heartbeat.interval.ms)超时(session.timeout.ms),被Coordinator 踢出组;
- consumer消费太久poll超时(max.poll.interval.ms),自己主动退组
缺点:不仅导致reblance,再提交offset时还会遇到CommitFailedException异常
解决办法:控制一次性拉取的消息数(max.poll.records)、多线程、控制业务逻辑复杂度; - topic变动
- topic的分区变动
负面影响
- 所有消费者需要暂停消费
- 任务重新分配,tcp连接也要重新建立
- 可能导致消费者offset提交异常,导致重复消费
- 整个过程图很慢,浪费系统性能
重平衡状态机
kafka借助于状态机实现重平衡
状态含义
状态切换
当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。在 Kafka 的日志中一定经常看到下面这个输出:Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。
重平衡过程
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
目前服务端做成无状态,既是优点也是缺点,优点是降低了服务端的维护成本,但是缺点是每次重平衡,历史分区信息都要临时搜集,过程复杂,而且所有消费者都要停下手中工作并参与重平衡。并且之前任务重分配也不考虑历史分配,在0.11.0.0版本才引入粘性冲平衡策略。
-
消费者端
image.png
image.png - 协调者端
-
新成员入组
image.png -
组成员主动离组
image.png -
组成员崩溃离组
image.png -
重平衡时协调者对组内成员提交位移的处理
image.png