RocketMQ源码:有序消息分析
作者:Jacktanger
本文主要分析RocketMQ是如何保证消息有序的。
RocketMQ的版本为:4.2.0 release。
一.时序图
还是老规矩,先把分析过程的时序图摆出来:
1.Producer发送顺序消息
RMQ1.jpg
2.Consumer接收顺序消息(一)
RMQ2.jpg
3.Consumer接收顺序消息(二)
RMQ3.jpg
二.源码分析 - Producer发送顺序消息
1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器。
// DefaultMQProducer#send
/**
 * Sends a message using a caller-supplied queue selector by delegating to
 * {@code defaultMQProducerImpl}.
 *
 * @param msg the message to send
 * @param selector the queue selector forwarded to the implementation
 * @param arg opaque argument forwarded together with the selector
 * @return whatever the underlying implementation returns
 * @throws MQClientException propagated from the underlying send
 * @throws RemotingException propagated from the underlying send
 * @throws MQBrokerException propagated from the underlying send
 * @throws InterruptedException propagated from the underlying send
 */
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    final SendResult result = this.defaultMQProducerImpl.send(msg, selector, arg);
    return result;
}
1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING。
// DefaultMQProducerImpl#makeSureStateOK
/**
 * Verifies that the producer is in the {@code RUNNING} state.
 *
 * @throws MQClientException if {@code serviceState} is anything other than
 *         {@code ServiceState.RUNNING}; the message contains the current state
 *         followed by the FAQ hint for {@code CLIENT_SERVICE_NOT_OK}
 */
private void makeSureStateOK() throws MQClientException {
    if (ServiceState.RUNNING == this.serviceState) {
        return;
    }

    final String message = "The producer service state not OK, " + this.serviceState
        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK);
    throw new MQClientException(message, null);
}
1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布Topic用到的路由信息。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
/**
 * Looks up the publish info cached for a topic, refreshing it when needed.
 *
 * <p>Two refresh attempts may happen: one using only the topic, and, if the entry
 * still has neither router info nor an OK status, a second one that passes
 * {@code true} and {@code defaultMQProducer} to the three-argument overload.
 *
 * @param topic the topic whose publish info is wanted
 * @return the entry found in {@code topicPublishInfoTable} after any refresh
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo publishInfo = this.topicPublishInfoTable.get(topic);

    final boolean needsRefresh = publishInfo == null || !publishInfo.ok();
    if (needsRefresh) {
        // Register a placeholder first, then refresh using the topic alone.
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        publishInfo = this.topicPublishInfoTable.get(topic);
    }

    // The entry is accepted when it has router info OR reports ok(); otherwise retry
    // with the overload that also receives the default producer.
    if (!publishInfo.isHaveTopicRouterInfo() && !publishInfo.ok()) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        publishInfo = this.topicPublishInfoTable.get(topic);
    }
    return publishInfo;
}
1.3 调用自定义消息队列选择器的select方法。
// DefaultMQProducerImpl#sendSelectImpl
// Let the caller-supplied selector pick the target queue from the topic's queue list;
// msg and arg are passed through to the selector unchanged.
MessageQueue mq = null;
try {
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
// Anything thrown by the selector is wrapped in MQClientException with the cause kept.
throw new MQClientException("select message queue throwed exception.", e);
}
// Producer#main
// The selector maps the order id (passed as arg) onto one queue of the list, so the
// same order id yields the same index for as long as the list size is unchanged.
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        // Java's % keeps the sign of the dividend, so a negative id would produce a
        // negative index and mqs.get() would throw; shift it back into [0, size).
        int index = id % mqs.size();
        if (index < 0) {
            index += mqs.size();
        }
        return mqs.get(index);
    }
}, orderId);
1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法。
// DefaultMQProducerImpl#sendKernelImpl
......
// Dispatch on the communication mode; only the SYNC branch is shown in this excerpt.
switch (communicationMode) {
case SYNC:
// Time elapsed since beginStartTime is charged against the caller's timeout.
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
// The budget is already used up, so fail before issuing the request.
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// Only the remaining budget (timeout - costTimeSync) is handed to the client API.
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
......
1.4.1 MQClientAPIImpl#sendMessage:发送消息。
// MQClientAPIImpl#sendMessage
......
switch (communicationMode) {// pick the send path by communication mode (sync/async); the original note says sync is the default — not visible in this excerpt
case SYNC:
// Elapsed time since beginStartTime is deducted from the timeout.
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
// Nothing left of the timeout: abort instead of sending.
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
// The synchronous send receives only the remaining time.
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
......
1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息。
// MQClientAPIImpl#sendMessageSync
/**
 * Sends a request synchronously and converts the reply into a {@code SendResult}.
 *
 * @param addr address passed to {@code remotingClient.invokeSync}
 * @param brokerName broker name forwarded to {@code processSendResponse}
 * @param msg the message, forwarded to {@code processSendResponse}
 * @param timeoutMillis timeout passed to {@code remotingClient.invokeSync}
 * @param request the command to send
 * @return the result produced by {@code processSendResponse}
 * @throws RemotingException declared by the calls made here
 * @throws MQBrokerException declared by the calls made here
 * @throws InterruptedException declared by the calls made here
 */
private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    // Only checked when the JVM runs with assertions enabled.
    assert reply != null;

    final SendResult result = this.processSendResponse(brokerName, msg, reply);
    return result;
}
1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步。
// NettyRemotingClient#invokeSync
// Issue the call on the channel; the timeout passed down is reduced by costTime.
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
// The hook is optional; when present it receives the remote address, the request and the response.
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
三.源码分析 - Consumer接收顺序消息(一)
1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器加入到messageListener中。
// DefaultMQPushConsumer#registerMessageListener
/**
 * Registers an orderly message listener: stores it in {@code messageListener} and
 * then forwards it to {@code defaultMQPushConsumerImpl}.
 *
 * @param messageListener the orderly listener to register
 */
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器加入到messageListenerInner中。
// DefaultMQPushConsumerImpl#registerMessageListener
/**
 * Stores the given listener in {@code messageListenerInner}.
 *
 * @param messageListener the listener to keep
 */
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
2 DefaultMQPushConsumer#start:启动Consumer。
// DefaultMQPushConsumer#start
/**
 * Starts the consumer by delegating to {@code defaultMQPushConsumerImpl.start()}.
 *
 * @throws MQClientException propagated from the implementation's start
 */
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。
// DefaultMQPushConsumerImpl#start
// Start-up depends on the current service state; only the CREATE_JUST branch is excerpted.
switch (this.serviceState) {
case CREATE_JUST:// just created
......
// The consume service is chosen from the runtime type of the registered listener.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// orderly listener: use the ordered consume service
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// concurrent listener: use the concurrent (unordered) consume service
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
......
this.consumeMessageService.start();// start the consume service
......
mQClientFactory.start();// start the MQClientInstance
......
2.1.1 new ConsumeMessageOrderlyService():构造顺序消息服务。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(// 主消息消费线程池,正常执行收到的ConsumeRequest。多线程
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
2.1.2 ConsumeMessageOrderlyService#start:启动顺序消息消费服务(集群模式下定时向Broker锁定正在消费的队列)。
// DefaultMQPushConsumerImpl#start
// Call site: invokes start() on the consume service held by the consumer implementation.
this.consumeMessageService.start();
// ConsumeMessageOrderlyService#start
/**
 * Starts the service. Only when the consumer's message model is
 * {@code CLUSTERING}, a task calling {@code lockMQPeriodically()} is scheduled at a
 * fixed rate: first run after {@code 1000 * 1} ms, then every
 * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} ms. For any other model nothing is
 * scheduled.
 */
public void start() {
    final boolean clustering = MessageModel.CLUSTERING.equals(
        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel());
    if (!clustering) {
        return;
    }

    // Periodically ask the broker to lock, in batch, the queues being consumed.
    final Runnable lockTask = new Runnable() {
        @Override
        public void run() {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(
        lockTask, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
2.1.2.1 ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁住当前正在消费的队列集合的消息。
2.1.2.1.1 RebalanceImpl#lockAll:锁住所有正在消费的队列。
// ConsumeMessageOrderlyService#lockMQPeriodically
// Skip the lock round once the service has been marked as stopped.
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
// RebalanceImpl#lockAll
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// group the queues currently in processQueueTable by broker name (per the original note)
......
Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// ask the broker to lock the queues; 1000 is presumably a timeout in ms — confirm
// Every queue in the returned set is marked as locked on its local ProcessQueue.
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
// Log only on the transition from unlocked to locked.
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
processQueue.setLocked(true);
// Record when the lock was last confirmed.
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
......
2.1.3 MQClientInstance#start:启动MQClientInstance。过程较复杂,放到大标题四中分析。
// DefaultMQPushConsumerImpl#start
// Call site: starts the client instance referenced by mQClientFactory.
mQClientFactory.start();
四.源码分析 - Consumer接收顺序消息(二)
1 MQClientInstance#start:启动客户端实例MQClientInstance。
// MQClientInstance#start
// Start-up runs while holding this instance's monitor.
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
......
// Start pull service: launches the message-pulling service
this.pullMessageService.start();
// Start rebalance service: launches the consumer-side load-balancing service
this.rebalanceService.start();
......
1.1 PullMessageService#run:启动拉取消息服务。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法。
// PullMessageService#run
/**
 * Service loop: until {@code isStopped()} returns true, takes the next request from
 * {@code pullRequestQueue} and passes it to {@code pullMessage}. Start and end of
 * the loop are logged.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Fetch the next queued pull request and hand it straight to pullMessage.
            this.pullMessage(this.pullRequestQueue.take());
        } catch (InterruptedException ignored) {
            // Intentionally ignored; the loop condition is re-evaluated next.
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
// PullMessageService#pullMessage
/**
 * Routes a pull request to the consumer registered for its consumer group.
 *
 * <p>If no consumer is found for the group, a warning is logged and the request is
 * not processed further by this method.
 *
 * @param pullRequest the request to execute
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer == null) {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        return;
    }

    // Delegate to DefaultMQPushConsumerImpl#pullMessage.
    ((DefaultMQPushConsumerImpl) consumer).pullMessage(pullRequest);
}
1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。提交到ConsumeMessageOrderlyService的线程池consumeExecutor中执行。
// DefaultMQPushConsumerImpl#pullMessage
......
// Callback that handles the outcome of a pull; only the FOUND branch of onSuccess is excerpted.
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
switch (pullResult.getPullStatus()) {
case FOUND:
// Keep the offset that was requested before it is overwritten below.
long prevRequestOffset = pullRequest.getNextOffset();
// The next pull continues from the offset reported in the result.
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// Time elapsed since beginTimestamp.
long pullRT = System.currentTimeMillis() - beginTimestamp;
......
// Hand the found messages to the consume service along with their process queue and message queue.
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
......
1.1.1.1.1.1.1 ConsumeRequest#run:处理消息消费的线程。
// ConsumeMessageOrderlyService.ConsumeRequest#run
// Take up to consumeBatchSize messages from the process queue.
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
......
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// The listener is invoked while holding the process queue's consume lock.
// NOTE(review): lock() sits inside the try, so the finally below would call unlock()
// even if lock() itself threw — confirm this is acceptable.
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);
// Leaves the enclosing loop, which is outside this excerpt.
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// the actual consumption: calls back into the listener's consumeMessage with a read-only view of the messages
} catch (Throwable e) {
// A failure in the listener is logged and recorded in hasException rather than rethrown here.
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
......
1.2 RebalanceService#run:启动消费端负载均衡服务。
// RebalanceService#run
/**
 * Service loop: until {@code isStopped()} returns true, waits via
 * {@code waitForRunning(waitInterval)} and then triggers a rebalance on the client
 * factory. Start and end of the loop are logged.
 */
public void run() {
    log.info(getServiceName() + " service started");

    while (!isStopped()) {
        // Wait first, then rebalance.
        waitForRunning(waitInterval);
        mqClientFactory.doRebalance();
    }

    log.info(getServiceName() + " service end");
}
// MQClientInstance#doRebalance
/**
 * Runs a rebalance on every non-null consumer in {@code consumerTable}. A failure in
 * one consumer is logged and does not stop the remaining consumers from being
 * processed.
 */
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> registration : this.consumerTable.entrySet()) {
        final MQConsumerInner consumer = registration.getValue();
        if (consumer == null) {
            continue;
        }
        try {
            consumer.doRebalance();
        } catch (Throwable e) {
            log.error("doRebalance exception", e);
        }
    }
}
// DefaultMQPushConsumerImpl#doRebalance
/**
 * Triggers a rebalance unless the consumer is paused. The ordered-consumption flag
 * is passed on to {@code rebalanceImpl}.
 */
public void doRebalance() {
    if (this.pause) {
        return;
    }
    this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
1.2.1.1.1 RebalanceImpl#doRebalance:负载均衡服务类处理。
// RebalanceImpl#doRebalance
/**
 * Rebalances each subscribed topic and afterwards calls
 * {@code truncateMessageQueueNotMyTopic()}.
 *
 * <p>A failure while rebalancing one topic does not abort the others; it is logged
 * as a warning unless the topic name starts with
 * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}.
 *
 * @param isOrder forwarded unchanged to {@code rebalanceByTopic}
 */
public void doRebalance(final boolean isOrder) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                if (!retryTopic) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // Runs regardless of whether a subscription table was available.
    this.truncateMessageQueueNotMyTopic();
}
// RebalanceImpl#rebalanceByTopic
// Rebalance strategy depends on the message model; only part of BROADCASTING is excerpted.
switch (messageModel) {
case BROADCASTING: {
// Queue set recorded for this topic in topicSubscribeInfoTable.
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// update the process-queue table from this topic's queue set; the result says whether anything changed
if (changed) {
// The same set is passed for both set arguments here.
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
......
// RebalanceImpl#updateProcessQueueTableInRebalance
this.dispatchPullRequest(pullRequestList);// hand the pull requests over for dispatch (RebalancePushImpl, per the original note)
1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发。
// RebalancePushImpl#dispatchPullRequest
/**
 * Submits each pull request for immediate execution on the push-consumer
 * implementation and logs it.
 *
 * @param pullRequestList the pull requests to dispatch, processed in iteration order
 */
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (final PullRequest request : pullRequestList) {
        defaultMQPushConsumerImpl.executePullRequestImmediately(request);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, request);
    }
}
五.总结
相比Producer的发送流程,Consumer的接收流程稍微复杂一点。通过上面的源码分析,可以知道RocketMQ是怎样保证消息的有序的:
1.通过RebalanceImpl的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列,并把本地队列ProcessQueue的locked属性设置为true,保证broker中的每个消息队列只对应一个消费端;
2.另外,消费端也是通过锁,保证每个ProcessQueue只有一个线程消费。
文章来源:https://my.oschina.net/javamaster/blog/2051910
推荐阅读:https://www.roncoo.com/course/list.html?courseName=MQ