RocketMQ Broadcast Mode
2020-05-09
晴天哥_王志
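Series
Introduction
- This series explains how some commonly used RocketMQ features are implemented, including message filtering, broadcast-mode consumption, transactional messages, and synchronous calls.
- This article covers how broadcast-mode consumption is implemented. It differs from clustering mode in two main ways: (1) in broadcast mode, consumer offsets are stored in a local file; (2) during rebalance, MessageQueues are not divided among the consumers in the same consumer group; instead, every consumer consumes all MessageQueues.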
Broadcast mode example
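import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
// Package of MessageModel in the RocketMQ 4.x client
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_test");
        consumer.setNamesrvAddr("localhost:9876");
        // Switch from the default clustering mode to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to TopicTest, filtering on tags A or B
        consumer.subscribe("TopicTest", "A|B");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}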
- Enable broadcast mode with consumer.setMessageModel(MessageModel.BROADCASTING).
The consumer's local consumer offset
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // In broadcast mode, consumer offsets are stored in a local file
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
}
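- In broadcast mode, unless the application has set its own OffsetStore, the consumer stores its consumer offsets in a local file via LocalFileOffsetStore.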
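The selection logic inside start() can be reduced to a small standalone sketch. It does not use the RocketMQ client: the MessageModel enum below only mirrors the two values seen in the source, and the returned strings are illustrative labels standing in for the real OffsetStore implementations. The class and method names are hypothetical.

```java
public class OffsetStoreChoiceSketch {
    // Mirrors the two values of RocketMQ's MessageModel enum (local copy for this sketch).
    public enum MessageModel { BROADCASTING, CLUSTERING }

    // Returns a label naming the offset store that would be chosen.
    // customStore stands in for a store set by the application; null means "not set".
    public static String chooseOffsetStore(MessageModel model, String customStore) {
        if (customStore != null) {
            return customStore; // a user-supplied store takes precedence
        }
        switch (model) {
            case BROADCASTING:
                return "LocalFileOffsetStore";    // offsets kept on the consumer's disk
            case CLUSTERING:
                return "RemoteBrokerOffsetStore"; // offsets kept on the broker
            default:
                // Unreachable with the two values above; the real code simply breaks here.
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseOffsetStore(MessageModel.BROADCASTING, null));
        System.out.println(chooseOffsetStore(MessageModel.CLUSTERING, null));
    }
}
```

Keeping the offset on the consumer side fits broadcast mode, since each consumer in the group tracks its own progress through every queue rather than sharing one group-wide position.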
public class LocalFileOffsetStore implements OffsetStore {
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    private final static InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String groupName;
    private final String storePath;
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();
    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }
}
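- The path used by LocalFileOffsetStore is built as shown above: LOCAL_OFFSET_STORE_DIR, then the client ID, then the consumer group name, then offsets.json.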
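To make the path rule concrete, here is a minimal sketch that rebuilds the same path with plain JDK calls. The class and method names are hypothetical; the system property name and the offsets.json file name come from the source above, and the client ID and group name in main are the ones that appear in this article's own listing.

```java
import java.io.File;

public class OffsetPathSketch {
    // Same default as LOCAL_OFFSET_STORE_DIR: a system property, falling back to ~/.rocketmq_offsets.
    public static String defaultBaseDir() {
        return System.getProperty(
            "rocketmq.client.localOffsetStoreDir",
            System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    }

    // baseDir / clientId / groupName / offsets.json
    public static String storePath(String baseDir, String clientId, String groupName) {
        return baseDir + File.separator
            + clientId + File.separator
            + groupName + File.separator
            + "offsets.json";
    }

    public static void main(String[] args) {
        // Client ID and group name taken from the example in this article.
        System.out.println(storePath(defaultBaseDir(), "192.168.0.8@DEFAULT", "consumer_group_test"));
    }
}
```

Note that start() only calls changeInstanceNameToPID() in clustering mode, so in broadcast mode the instance name stays at its configured value (DEFAULT in the example). That presumably keeps the client ID, and therefore the offset file path, stable across restarts of the same consumer.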
Locally stored consumer offsets
cat /Users/lebron374/.rocketmq_offsets/192.168.0.8\@DEFAULT/consumer_group_test/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":7,
            "topic":"TopicTest"
        }:1123,{
            "brokerName":"broker-a",
            "queueId":6,
            "topic":"TopicTest"
        }:1119,{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1121,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1116,{
            "brokerName":"broker-a",
            "queueId":5,
            "topic":"TopicTest"
        }:1114,{
            "brokerName":"broker-a",
            "queueId":4,
            "topic":"TopicTest"
        }:1119,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1141,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1139
    }
}
- The local offset file uses each MessageQueue object (topic, brokerName, queueId) as the key and the corresponding consumer offset as the value.
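The key/value layout of the file matches the in-memory offsetTable field (a ConcurrentMap from MessageQueue to AtomicLong). The sketch below models that table with a simplified stand-in for MessageQueue. QueueKey, updateOffset, and readOffset here are hypothetical names for this illustration, not the RocketMQ API, and returning -1 for an unknown queue is a choice made for the sketch.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class OffsetTableSketch {
    // Simplified stand-in for MessageQueue: identity is (topic, brokerName, queueId).
    public static final class QueueKey {
        private final String topic;
        private final String brokerName;
        private final int queueId;

        public QueueKey(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof QueueKey)) return false;
            QueueKey other = (QueueKey) o;
            return queueId == other.queueId
                && topic.equals(other.topic)
                && brokerName.equals(other.brokerName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, brokerName, queueId);
        }
    }

    private final ConcurrentMap<QueueKey, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Record the latest consumed offset for one queue.
    public void updateOffset(QueueKey mq, long offset) {
        AtomicLong existing = offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        if (existing != null) {
            existing.set(offset);
        }
    }

    // Return the stored offset, or -1 if this queue has no entry yet (sketch convention).
    public long readOffset(QueueKey mq) {
        AtomicLong value = offsetTable.get(mq);
        return value == null ? -1L : value.get();
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.updateOffset(new QueueKey("TopicTest", "broker-a", 7), 1123L);
        System.out.println(table.readOffset(new QueueKey("TopicTest", "broker-a", 7)));
    }
}
```

Because the key compares by value, a queue looked up after a restart resolves to the same entry that was saved earlier, which is what lets each broadcast consumer resume from its own position.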
Rebalance in broadcast mode
public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // No AllocateMessageQueueStrategy is applied here; the full queue set is used as-is
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // code omitted
                break;
            }
            default:
                break;
        }
    }
}
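- Unlike clustering mode, rebalance in broadcast mode has no MessageQueue allocation step: each consumer handles all MessageQueues of the topics it subscribes to.
- For the BROADCASTING case, rebalanceByTopic calls updateProcessQueueTableInRebalance directly with the topic's full MessageQueue set.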
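The difference between the two modes can be illustrated with a small standalone sketch. Queues are represented by their integer IDs, and the clustering branch uses a simple contiguous-block split chosen for this illustration; it is not claimed to match any particular AllocateMessageQueueStrategy. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class QueueAssignmentSketch {
    // Broadcast mode: every consumer gets the full queue list, no allocation step.
    public static List<Integer> broadcastAssignment(List<Integer> allQueues) {
        return new ArrayList<>(allQueues);
    }

    // Clustering mode (illustrative): split the queues into contiguous blocks,
    // giving the first (size % consumerCount) consumers one extra queue each.
    public static List<Integer> clusteringAssignment(List<Integer> allQueues,
                                                     int consumerIndex,
                                                     int consumerCount) {
        int size = allQueues.size();
        int base = size / consumerCount;
        int extra = size % consumerCount;
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        int count = base + (consumerIndex < extra ? 1 : 0);
        return new ArrayList<>(allQueues.subList(start, start + count));
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        // Two consumers in the same group:
        for (int i = 0; i < 2; i++) {
            System.out.println("consumer " + i
                + " broadcast=" + broadcastAssignment(queues)
                + " clustering=" + clusteringAssignment(queues, i, 2));
        }
    }
}
```

With eight queues and two consumers, each consumer sees all eight queues in broadcast mode, while the clustering split gives each of them four.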