RocketMQ-Producer

2020-04-03  本文已影响0人  丑人林宗己

rocketmq 发送端的设计
1、需要考虑什么?

容错设计

重试

2、消息投递需要的参数

原则是:根据topic选择相应的逻辑队列,逻辑队列中相应的指定了broker, topic

3、rocketmq究竟怎么做?

3.1、TopicPublishInfo
这个数据结构保存的是该topic下的所有的逻辑队列List<MessageQueue>,以及本次发起请求时选择队列的下标值ThreadLocalIndex(index & messagequeues.size 才是真实的下标值)。
PS:这里有点绕,为何是保存所有的topic下的逻辑队列?而不是按照broker先分组,每个broker下再分逻辑队列呢?

3.2、FaultItem
这个数据结构保存着broker name, 以及对应的隔离时间,具体要看isAvailable()。该数据接口维系在faultItemTable这个Map中(ConncurrentHashMap,保证并发安全)

3.3、失败之后做了什么?

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 

参数1是名称,参数2一次调用耗费的时间,参数2是是否隔离(隔离意味着熔断)

如果开启了容错保护机制,那么会根据是否隔离来计算broker的隔离时间。currentLatency + 30000 -> 并在notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}这个数组中找到第一个大于该结果的值作为隔离的持续时间。

比如:currentLatency 0:30000,那么结果无疑就是60000,如果是30000:90000,那么结果就是120000,以此类推。所以隔离的持续时间其实是数组中的值。

PS: faultItem中记录的是隔离时间是:System.currentTimeMillis() + notAvailableDuration。而判断broker有效是:(System.currentTimeMillis() - startTimestamp) >= 0

3.4、选择队列的依据是什么?

随机下标(ThreadLocalIndex + 1), 以messagequeues.size作为循环次数,找到可用的broker(判断可用的依据是如上),如果找不到,则依据某个算法(随机打乱排序取二分之一位置的节点)在整个队列中选择一个队列(队列的id为随机值%broker可写队列长度)。

容错设计

1、第一次有地失败,并且该服务端被标记为隔离状态,时间参考notAvailableDuration值。
2、在这个持续时间内,该服务不会提供服务(运维这个时候需要去排查原因并重新启动)
3、超过该持续时间后,该节点会重新变为可用(但是不一定真实可用,如果不可用将会重新回到第1步)

重试

核心的代码包括:

DefaultMQProducerImpl#sendDefaultImpl()
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
DefaultMQProducerImpl#selectOneMessageQueue()
DefaultMQProducerImpl#updateFaultItem()
上一篇下一篇

猜你喜欢

热点阅读