Kafka Consumer Rebalance

2018-09-07  本文已影响42人  04974ba324f9
0.8.2.2之前版本版本:

  Kafka的Consumer Rebalance方案是基于Zookeeper的Watcher来实现的。每个consumer group(cg)在zk下都维护一个”/consumers/[group_name]/ids”路径,在此路径下,使用临时节点记录属于此cg的消费者的Id,该Id信息由对应的consumer在启动时创建。还有两个与ids同一级别的节点,他们分别是:owners节点,记录了分区与消费者的对应关系(即:Topic A的Partition 0在Group G上实际由哪个Id的消费者负责消费);offset节点,记录了此cg在某个分区上的消费位置(即:Group G对Topic A的Partition 0实际现在消费到了第几的offset)。每个Broker、Topic以及分区在zk上也都对应一个路径:

/brokers/ids/[broker_id]:记录了该broker的host、port信息,同时还记录了分配在此Broker上的Topic的分区列表。
/brokers/topics/[topic_name]:记录了每个partition的leader、isr等信息。
/brokers/topics/[topic_name]/partitions/[partition_num]/stat:记录了当前leader、选举epoch等信息


image.png

    每个consumer都分别在”/consumers/[group_name]/ids”和”/brokers/ids”路径上注册一个Watcher。当”/consumers/[group_name]/ids”路径的子节点发生变化时,表示consumer group中的消费者出现了变化;当”/brokers/ids”路径的子节点发生变化时,表示Broker出现了增减。这样,通过Watcher,每个消费者就可以监听Consumer Group和Kafka集群的状态变化了。

image.png
Consumer rebalacne算法:
1. 将目标 topic 下的所有 partirtion 排序,存于PT
2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci  
控制策略:
1. 在/consumers/[consumer-group]/下注册id
2. 设置对/consumers/[consumer-group] 的watcher
3. 设置对/brokers/ids的watcher
4. zk下设置watcher的路径节点更改,触发consumer rebalance

但由于验证依赖于zk集群,有两个比较严重的问题:

常见现象:

    目前有三种方式会触发rebalance,其一Topic与partition映射表发生变化;其二同一订阅组中消费节点数发生变化,其三zk会话过期 。
    前两种直接就做rebalance操作了,最后一种消费节点重新注册临时znode到zk上,然后再做rebalance操作,rebalance失败会导致消费节点下线,其他活跃消费节点也无法分配到分片数据,其实质原因是消费节点rebalance失败下线时并没有从zk的Consumer Group下删除自身临时节点,而每个消费节点的分片数据是根据Consumer Group下数量按照计算规则分配的,所以活跃消费节点无法分配到分片数据。
==========================
    归根到底就是 原有consumer没有释放掉上次分配的partition,导致了下次rebalance的时候发现分配的partition还没有被释放,所以这个consumer就报出ConsumerRebalanceFailedException;如果等到释放了该partition,并再次触发了rebalance,则可以分配成功。

针对业务经常遇到rebalance failed:
  1. 检查代码和配置(确保 rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms ),并重启
  2. 检查业务系统,有无出现OOM等情况(这种经常会导致所有的partition同时不消费)
  3. 检查zookeeper上节点状况,如ids、owner正常,则手动new consumer 触发一下rebalance,如果不是线程卡死造成资源占用等情况,应该就回成功消费
  4. 如果还是rebalance失败,是因为资源没有被释放,只能重启下业务的consumer进程
上一篇下一篇

猜你喜欢

热点阅读