RocketMQ consumer Rebalance过程
2020-05-03 本文已影响0人
晴天哥_王志
系列
- RocketMQ consumer 启动流程
- RocketMQ consumer Rebalance过程
- RocketMQ consumer 注册过程
- RocketMQ consumer 并行消费过程
- RocketMQ consumer 有序消费过程
开篇
-
这个系列的主要目的是介绍RocketMq consumer的原理和用法,在这个系列当中会介绍 consumer的启动流程、consumer Rebalance的过程、consumer注册过程、consumer 并行消费过程、consumer 有序消费过程。
-
这篇文章介绍consumer Rebalance的过程,介绍consumer的重平衡过程,Rebalance过程按照订阅的topics依次进行重平衡过程。
consumer example
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 1、创建DefaultMQPushConsumer对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 2、设置消费位移
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3、订阅topic
consumer.subscribe("TopicTest", "*");
// 4、设置消费回调
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、启动DefaultMQPushConsumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- 介绍rocketmq consumer的用法,按照下列步骤进行。
- 创建DefaultMQPushConsumer对象。
- 设置consumer的消费位移。
- 设置consumer的订阅topic。
- 设置consumer的消费回调。
- 启动consumer。
订阅过程
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
// 核心的重平衡过程RebalanceImpl
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
private boolean consumeOrderly = false;
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
// 1、构建订阅对象SubscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
// 2、注册topic和对应的订阅关系到RebalanceImpl当中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
// 保存订阅关系
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
}
- consumer的订阅过程主要步骤如上源码所示。
- 1、构建订阅对象SubscriptionData。
- 2、注册topic和对应的订阅关系到RebalanceImpl当中。
- consumer的订阅信息以topic维度进行保存,保存在RebalanceImpl中。
public class FilterAPI {
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
}
public class SubscriptionData implements Comparable<SubscriptionData> {
public final static String SUB_ALL = "*";
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
private String expressionType = ExpressionType.TAG;
@JSONField(serialize = false)
private String filterClassSource;
}
- SubscriptionData的数据结构主要包含topic、subString、tags等信息。
- FilterAPI#buildSubscriptionData负责提供构建方法,不传tags默认表示订阅所有tags。
Rebalance过程
RebalanceService
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
- MQClientInstance#start过程中执行rebalanceService.start()启动重平衡服务。
- RebalanceService以20s的间隔执行mqClientFactory.doRebalance()调用MQClientInstance#doRebalance。
MQClientInstance
public class MQClientInstance {
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
log.warn("the consumer group[" + group + "] exist already.");
return false;
}
return true;
}
}
- MQClientInstance的consumerTable保存以consumer group作为key,DefaultMQPushConsumerImpl作为value的KV数据结构。
- MQClientInstance的consumerTable通过registerConsumer方法来注册consumer group和对应的DefaultMQPushConsumerImpl对象。
- MQClientInstance#doRebalance遍历所有consumer group,依次调用DefaultMQPushConsumerImpl#doRebalance来实现consumer的重平衡过程。
DefaultMQPushConsumerImpl
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
}
RebalanceImpl
- DefaultMQPushConsumerImpl包含RebalanceImpl对象。
- RebalanceImpl的实现类包括RebalanceLitePullImpl、RebalancePullImpl、RebalancePushImpl。
- 针对消费测我们关注的是RebalancePushImpl对象的方法,通用方法在父类RebalanceImpl当中。
RebalanceImpl
public abstract class RebalanceImpl {
// 保存topic和对应的订阅信息
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 执行广播模式下的负载均衡
break;
}
case CLUSTERING: {
// 执行集群模式下的负载均衡
break;
}
default:
break;
}
}
}
- RebalanceImpl#subscriptionInner保存topic和对应的订阅信息。
- 遍历subscriptionInner当中的所有topic及其对应的订阅信息依次执行rebalanceByTopic过程。
- rebalanceByTopic过程区分 有序和无序 以及 广播和集群 的两两组合。
- 暂时关注集群模式下rebalance过程。
集群模式下rebalance过程
public abstract class RebalanceImpl {
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 省略相关代码
}
case CLUSTERING: {
// 1、获取topic下所有的MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 2、获取topic下该consumerGroup下所有的consumer对象
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
// 开始重新分配进行rebalance
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 3、针对mqAll和cidAll进行排序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 4、通过分配策略重新进行分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 5、根据分配结果执行真正的rebalance动作
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
// 6、将rebalance的结果通知broker
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
}
- 针对单个topic的整体rebalance操作如上图代码所示。
- 1、获取topic下所有的MessageQueue。
- 2、获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。
- 3、针对mqAll和cidAll进行排序,mqAll排序顺序按照先brokerName后brokerId,cidAll排序按照字符串排序。
- 4、通过分配策略AllocateMessageQueueStrategy重新进行分配。
- 5、根据分配结果执行真正的rebalance动作。
- 6、将rebalance的结果通知broker。
AllocateMessageQueueStrategy
AllocateMessageQueueStrategypublic class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 核心逻辑计算开始
// 计算当前cid的下标
int index = cidAll.indexOf(currentCID);
// 计算多余的模值
int mod = mqAll.size() % cidAll.size();
// 计算平均大小
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 计算起始下标
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 计算范围大小
int range = Math.min(averageSize, mqAll.size() - startIndex);
// 组装结果
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
// 核心逻辑计算结束
@Override
public String getName() {
return "AVG";
}
}
分配计算逻辑举例
rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。
rocketMq上存在名为topic_demo的topic,writeQueue为3,分布在3个broker。
排序后的mqAll的大小为9,依次为
[broker_a_0、broker_a_1、broker_a_2、
broker_b_0、broker_b_1、broker_b_2、
broker_c_0、broker_c_1、broker_c_2]
rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为
[192.168.0.6@15956、192.168.0.7@15957、192.168.0.8@15958、192.168.0.9@15959]
192.168.0.6@15956 的分配MessageQueue结算过程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]
192.168.0.6@15957 的分配MessageQueue结算过程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]
192.168.0.6@15958 的分配MessageQueue结算过程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]
192.168.0.6@15959 的分配MessageQueue结算过程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]
ConsumerIdList获取逻辑
public class MQClientInstance {
public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}
}
return null;
}
public String findBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
if (!brokers.isEmpty()) {
int index = random.nextInt(brokers.size());
BrokerData bd = brokers.get(index % brokers.size());
return bd.selectBrokerAddr();
}
}
return null;
}
}
- ConsumerIdList的获取是随机选择一台broker进行通信,从broker中获取该consumerGroup对应的consumers。
- ConsumerIdList是从broker当中获取的,保存在broker当中,而非namesrv。
生效rebalance结果
public class MQClientInstance {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 遍历已有的processQueueTable,删除不在此次rebalance结果里面的MessageQueue对应的processQueue。
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 针对不在本次rebalance结果mqSet当中的情况,设置删除
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
// 针对本次新增的MessageQueue,创建ProcessQueue并进行添加
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 针对本次新增的MessageQueue创建对应ProcessQueue。
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
- 生效rebalance结果的过程阶段一:删除不负责的MessageQueue对应的ProcessQueue。
- 生效rebalance结果的过程阶段二:添加新负责的MessageQueue对应的ProcessQueue。
通知生效rebalance结果
public class RebalancePushImpl extends RebalanceImpl {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
if (pullThresholdForTopic != -1) {
int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
log.info("The pullThresholdForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
}
int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
if (pullThresholdSizeForTopic != -1) {
int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
log.info("The pullThresholdSizeForQueue is changed from {} to {}",
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
}
}
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
}
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 遍历所有的brokerAddrTable依次通知
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
// 获取broker地址后发送消息
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
} catch (Exception e) {
}
}
}
}
}
}
}
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
// clientID
heartbeatData.setClientID(this.clientId);
// Consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
}
- consumer通过prepareHeartbeatData组装心跳数据,遍历所有的broker依次进行发送告知。