rebalance,rebalance过程学习笔记
2021-01-15 本文已影响0人
HannahLi_9f1c
- 什么是再均衡(rebalance)
再均衡是指分区的所有权从一个消费者转移到另一个消费者的行为。在均衡需要重新分配分区方案 - 再均衡的影响
rebalance期间,消费者无法消费读取消息,所以在这期间会变得不可用,应该避免rebalance的出现。 - 再均衡监听器ConsumerRebalanceListener
有两个接口onPartitionsRevoked再均衡之前,消费者停止读取时调用,可以实现该接口,保存偏移量
onPartitionsAssigned,再均衡之后,消费者读取之前调用,可以实现该接口,从取出上一步保存的偏移量,这样可以防止重复消费。因为再均衡之前,可能消费者已经处理完偏移量,但是没有提交位移,再均衡之后会读取偏移量,导致重复消费。 - 旧版消费者客户端的问题
每个消费者在启动时都会在/ consumers/<group>/ids 和/brokers/ids 路径上注册
个监听器。当/consumers/<group>/ids 路径下 子节点发生变化时,表示消费组中的消
费者发生了变化;当/brokers/ids 路径下的子节点发生变化时,表示 broker 出现了增减
样通过 ZooKeeper 提供的 Watcher 每个消费者就可以监昕消费组和 Kafka 集群的状态了
这种方式下每个消费者对 ZooKeeper 相关路径分别进行监听, 触发再均衡操作时,
每个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这
样可能导致 Kafka 工作在不正确的状态,会造成羊群效应和脑裂问题。 - 触发再均衡的操作
- 有新的消费者加入
- 有消费者宕机下线,可能是因为网络原因
- 消费者主动退出消费者
- 消费者对应的GroupCoorinator节点发生变更
- 消费组内订阅的主题或者主题分区数量发生变化
- 再均衡的过程
- FIND_COORDINATOR
确定消费者所属的GroupCoordinator所在的broker,如果消费者已经保存了GroupCordinator信息,可以进入下一个阶段,否则需要查找_consumer_offset对应的分区的leader副本所对应的broker,以此broker作为GroupCordinator角色,又扮演分区分配和组内消费者位移的角色。 - JOIN_GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的
消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
如果是原有的消费者重新加入消费组,那么在真正发送 JoinGroupRequest 请求之前还要执
行一些准备工作:
( )如果消费端参数 enable.auto.commit 设置为 true (默认值也为 true ), 即开启自
动提交位移功能,那么在请求加入消费组之前需要向 GroupCoordinator 提交消费位移。这个过
程是阻塞执行的,要么成功提交消费位移,要么超时。
)如果消费者添加了自定义的再均衡监听器( ConsumerRebalanceListener ),那么此时
会调用 onPartitionsRevoked()方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些
状态,或者提交消费位移等。
(3 )因为是重新加入消费组,之前与 GroupCoordinator 节点之间的心跳检测也就不需要了,
所以在成功地重新加入消费组之前需要禁止心跳检测的运作。
选举消费组的 leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader ,这个选举的算法也
很简单,分两种情况分析。如果消费组内还没有 leader ,那么第1 个加入消费组的消费者即为
消费组的 leader 。如果某一时刻 leader 消费者由于某些原因退出了消费组, 那么会重新选举
个新的 leader ,这个重新选举 leader 的过程又更“随意”了,相关代码如下
//scala code.
private val members = new mutable .HashMap[String, MemberMetadata]
var leaderid = members.keys.head
在 GroupCoordinator 中消费者的信息是以 HashMap 的形式存储的,
其中 key 为消 费者的 member id ,而 value 是消 费者相关的元数据信息。 leaderld 表示 leader
消费者的 member id ,它的取值为 HashMap 中的第一个键值对的 key ,这种选举的方式基本
上和随机无异 总体上来说,消费组的 leader 选举过程是很随意的。
选举分区分配某咯
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的
各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配 这个分区分配的选
举并非由 leader 费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的 “根
据组内的各个消费者投票来决定” Group Coordinator 还要再与各个消费者进行进一步交
互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个
消费者支持的最多的策略,具体的选举过程如下:
(1 )收集各个消费者支持的所有分配策略,组成候选集 candidates
(2 )每个消费者从候选集 candidates 找出第 1个自身支持的策略,为这个策略投上一票。
(3 )计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略,那么就会报出异常 IllegalArgumentException:
Member does not support。此 protocol 需要注意的是,这里所说的“消费者所支持的分配策略”是
partition.assignment.strategy 参数配置的策略,如果这个参数值只配置了
RangeAssignor 那么这个消费者客户端只支持 RangeAssignor 分配策略,而不是消费者客户端
代码中实现的 种分配策略及可能的自定义分配策略
在此之后, Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者, leader 消费者和
其他普通消费者收到的响应内容。
JoinGroupRespons巳包含了多个域, 用来标识当前消费组的年代信
息,避免受到过期请求的影响。 -
SYNC_GROUP
leader 消费者根据在第二阶段中选出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时 leader 消费者并不是直接和其余的普通消费者同
步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的 在第
阶段,也就是同步阶段,每个消费者会 GroupCoordinator 发送 SyncGroupRequest 请求来同步
分配方案。
当消费者收到所属的分配方案之后会调用 PartitionAssignor 中的 onAssignment() 方法。随后
再调用 CoumerRebalanceListener 中的 OnPartitionAssigned() 方法 之后开启 跳任务 消费者
定期 向服务端的 GroupCoordinator 发送 HeartbeatRequest 来确定彼此在线。
image.png - HEARTBEAT
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分
区的所有权关系。只要消费者以正常的时间间隔发送 就被认为是活跃的 ,说明它还在
取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停
发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 会认为这个消费者
己经死亡,就会触发一次再均衡行为,消费者的心跳间隔时间由参数 heartbeat.interval.ms
指定,默认值为 3000,这个参数必须比 session.timeout.ms 参数设定的值要小,
般情况下 heartbeat.interval.ms的配置值不能超过 session.timeout.ms 配置值的
1/3 。这个参数可以调整得更低,以控制正常重新平衡的预期时间
如果消费者发生崩溃,并停止读取消息 那么 GroupCoordinator 会等待一 段时间
确认这个消费者死亡之后才会触发再均衡。在这一小段时间内, 死掉的消费者井不会读取分区
里的消息。这个一小段时间由 session.timeout.ms 参数控制,该参数的配置值必须在 broker
端参数 group.m n.sessio timeout . ms (默认值为 6000 ,即 6秒)和 group max.
session. timeout. ms (默认值为 300000 ,即 5分钟)允许的范围内。
还有一个参数 max.poll.interval.ms ,它用来指定使用消费者组管理时 poll () 方法调
用之间的最大延迟,就是消费者在获取更多消息之前可以空闲的时间量的上限。如果超时
时间期满之前 poll ()没有调用, 消费者被视为失败,并且分组将重新平衡, 便将分区重新分
配给别的成员。
除了被动退出消费组,还可 以使 LeaveGroupRquest 请求主动退出消费组,比如客户端
调用了 unsubscrible() 方法取消对某些主题的订阅
rebalance的参数的对比
-
consumer_offsets
《深入理解Kafka:核心设计与实践原理》朱忠华_2019-01-01 读书笔记