RocketMQ

Consume Rebalance in RocketMQ &

2017-06-19  本文已影响0人  lysu

今天的计划看下两个消息中间件RocketMQ和Kafka的Rebalance方式- -

首先说下Rebalance是做啥...为啥需要rebalance并介绍一些参与rebalance的基本概念~

Kafka(RocketMQ)在Broker中会将一个topic划分为多个Partition(ConsumeQueue), 消息在生产后会被投递到某个Partition(ConsumeQueue)中.(PS: partition可以被分配不同broker)

而对于消费者,为了解决一条消息如何消费的问题, 引入了ConsumeGroup并将Consumer分配到某个consumeGroup中, MQ在处理消息时会保证消息,在一个Group中只会被一个Consumer消费(In ClusterMode也是正常大家使用的方式).(所以N台机器如果在Group中只会被一个Consumer收到)

所以,MQ需要

而上面的过程就是今天要讨论的Rebalance

RocketMQ

粗看流程

RocketMQ的Rebalance逻辑实际是发生在Consume客户端的(当然也必须会从Broker或Nameserver获取一些信息), 处理思路简单说是这样:(注意RocketMQ里的ConsumeQueue可以理解为Kafka的Partition,所以这节里都用ConsumeQueue表述)

reblance核心逻辑可以参看RebalanceImpl#topicSubscribeInfoTable

触发条件这个rebalance的条件有:

上面简单的描述了分配过程,不过我们接下来会看下各个细节~

获取所有ConsumeQueue信息

这里我们从设置开始一起走到获取~

问题:...等等一会儿看

获取Group中的其他Consume信息

这里我们倒过来从获取走到数据来源..

好了这里画个图总结下~

所有Conume都会向所有broker建立连接并心跳上报,所以所以任意一台broker都有当前group的所有节点信息(正常情况), 客户端想要获取当前group的所有consumer信息直接乱选一台活着的获取就好了

几个内置的分配策略

首先前面说过分配前提是已经获取到所有可用Queue和所有当前Group的Consumer,并都做了排序,各个Consumer各自执行分配, 分配逻辑实现是AllocateMessageQueueStrategy的几个实现

总结下最后结果就是能获取到属于当前consumer的ConsumeQueue(代码里叫MessageQueue)

标记已Queue已被占用

上面我们看到整个标记过程都是在consumer本地就完成了,各个consumer间通过排序+一个一致的算法就完成了分配,并没有和其他consumer的交互。

然而这是有问题的,因为rebalance是各自执行,不排除某个时刻两个同一个Group的两个Consumer都怼到一个Queue上,而这个从设计上是绝对不允许的,所以这里需要一个机制保证永远不会出现同Group两个Consume怼到一个Queue上。

RocketMQ目前是选择在Broker上维护一个LockMap来实现(后面会讨论这个也许有问题??)

RebalanceImpl#updateProcessQueueTableInRebalance中, 如果是新分配的Queue, 会调用this.lock(mq)

for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
         if (isOrder && !this.lock(mq)) { // !!! here
             log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
               continue;
         }

继续往下跟代码(为了避免太长这里不贴代码了),会发现lock会向masterNode(brokerId=0)的节点发LockBatchRequestBody(只有Master?Master挂了的话- -?)

最后在masterNode内存中会通过RebalanceLockManager#mqLockTable实现加锁占用(带超时默认1分钟超时类似租期), 如果master时这lock信息会丢失掉?当依赖定时rebalance可以恢复,不过那次rebalance如果有冲突之类的情况发生的话...? 好吧 后面再来看这些check特殊场景

如果加锁失败(别人已经占用或者锁请求失败)会不对这个queue不做处理。。然后等下次rebalance, 再来看别人是否释放锁和masterbroker是否恢复...

同样在RebalanceImpl#updateProcessQueueTableInRebalance会将无需处理的队列从当前处理中remove掉~这部分逻辑跟下去位于RebalancePushImpl#removeUnnecessaryMessageQueue,会等待当前正在执行的消费并等processQueue处理干净才尝试向maserNode发起unlock(看代码这里好像没处理masterBroker网路不通的情况如果unlock不成功直接算成功了???)

好了如果按照预期正常unlock,其他consumer可以lock并开始消费,或者等20s下次rebalance可以开始消费(如果本次因次序没竞争lock上)

Challenge

最后,我们来假设些场景,看看能否正常work

1. Consume加入

整体看没啥问题,虽然新加入的consumer要等一阵才能接手消费(有间隙消费小lag), 另外那个等一会儿unlock特殊情况下一会儿小概率会有问题

2. Consume离开(断线)

感觉这部分等锁超时有些无奈- -,消费不会乱但会有消费lag增加

3. Consume同时并发加入

所以我们看到可以保证消费不会乱,不过代价是要过一阵新加入的consume才能真正开始接手消费(间隙小lag)

4. Topic调整Queue数量

上面提到过Client每10s会从NameServer刷一次TopicRoutine(MQClientInstance#startScheduledTask), 所以Queue变化正常会在这里被收到并更新本地缓存。

然后,正常情况下等下次rebalance时就会用新的Queue信息进行重新分配,然后基于上面说的lock和定期重rebalance规则,最终可以保证ok且中途不乱

异常情况下,想到的几个NameServer数据不一致或交换routine刷新和rebalance次序,看好像最终也都能达到期望状态 - -

5. Broker挂了

上面提到过,lock信息是放到每个BrokerGroup中的master(id0)上的,所以如果Master挂了的话,lock会用永远不成功,可以理解为新Consume无法加入,老Consume无法退出,必须等待broker活过来,但之前在跑的的还可以正常运行(只要别离开了还没加入然后broker挂了- -这种情况部分queue会有lag)

(PS背景介绍: 在rocketmq的设计里brokerGroup的master挂了group不可以写入,但可以改写其他brokerGroup来完成写入HA,消费者HA可以通过brokerGroup里的slave消费之前堆在brokerGroup里的内容)

broker活过来后,因为是内存,所以下次触发rebalance会重新恢复lock的map。。

不过感觉有个极端情况。。就是master挂了,然后这时消费者有变化或者队列数目有调整。。。因为启动时内存为空等于没占锁,而实际之前consumer已经在跑,在还没来得及rebalance就发生了变更,这时可能出现同group里两个consumer同时消费一个queue????

RocketMQ小结

RocketMQ在master不挂的情况下rebalance可以保证消费不乱,虽然可能会有消息lag问题但感觉并不关键;而master挂且同时发生rebalance这个的确有些问题。。此外rebalance完全由客户端控制其他人有没有用上相互之前并不知道;并且各自拉namesrv可能会看到不一致的数据虽然最终通过定期重rebalance可以一致会导致不必要的rebalance的感觉- -

看似有些问题待解决,如果理解有误欢迎讨论~~哈哈哈

下面开始看下kafka0.9版本之后的方式,据说kafka在很久之前和rocketmq目前用的很像, 但后来改了..

Kafka

kafka还在看, 没按照预期搞完,见下偏文章哈~

上一篇下一篇

猜你喜欢

热点阅读