RocketMQ Source Code Analysis: The Message Pull Flow
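The article "RocketMQ Source Code Analysis: RebalanceService" answered how a consumer obtains messages after it first starts. Once a PullRequest (a message pull task) has been built, how do the consumer and the broker interact to carry it out? This article analyzes the message pull flow.

On the consumer side, two services take part in pulling messages:

- RebalanceService load-balances message queues on the consumer side and builds PullRequests.
- PullMessageService pulls messages on the consumer side.

We start with PullMessageService.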
PullMessageService is started when the consumer, during its own startup, starts its MQClientInstance:
```java
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
```
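PullMessageService extends ServiceThread, so it is essentially a thread. Calling this.pullMessageService.start() starts that thread, which then executes the run method. Until the service is stopped, run does the following:

1. Take a PullRequest from pullRequestQueue. If the queue is empty, the thread blocks until a task is put in.
2. Call pullMessage to pull messages for that request.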
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
```
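The blocking-queue loop above is a standard producer-consumer pattern. The following is a minimal, self-contained sketch of it, not RocketMQ code, and it rests on these assumptions:

- SimplePullRequest is hypothetical and holds only a queue name and an offset.
- SimplePullService is hypothetical, and its pullMessage just records the request instead of contacting a broker.
- executePullRequestImmediately and executePullRequestLater borrow the names of DefaultMQPushConsumerImpl methods that appear later in this article, but their bodies here are assumptions. One adds the request to the queue at once. The other re-adds it after a delay via a ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for PullRequest: a queue name plus the next offset to pull.
final class SimplePullRequest {
    final String queue;
    final long nextOffset;

    SimplePullRequest(String queue, long nextOffset) {
        this.queue = queue;
        this.nextOffset = nextOffset;
    }
}

// Sketch of the PullMessageService pattern: one worker thread blocks on take() and handles
// requests one at a time; requests re-enter the queue immediately or after a delay.
final class SimplePullService implements Runnable {
    private final LinkedBlockingQueue<SimplePullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean stopped = false;
    // Records what the worker handled, so the behaviour can be observed
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    void executePullRequestImmediately(SimplePullRequest request) {
        pullRequestQueue.add(request);
    }

    void executePullRequestLater(SimplePullRequest request, long delayMillis) {
        // Re-enqueue from the scheduler thread once the delay expires
        scheduler.schedule(() -> executePullRequestImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                SimplePullRequest request = pullRequestQueue.take(); // blocks while the queue is empty
                pullMessage(request);
            } catch (InterruptedException e) {
                return; // interrupted by shutdown()
            }
        }
    }

    private void pullMessage(SimplePullRequest request) {
        // Placeholder for the real asynchronous pull: only record the request
        handled.add(request.queue + "@" + request.nextOffset);
    }

    // Waits up to timeoutMillis until at least n requests have been handled
    boolean awaitHandled(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (handled.size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return handled.size() >= n;
    }

    void shutdown(Thread worker) {
        stopped = true;
        worker.interrupt(); // wakes the worker if it is blocked in take()
        scheduler.shutdownNow();
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Usage: start a thread on new SimplePullService(), call executePullRequestImmediately(new SimplePullRequest("q0", 0)) and executePullRequestLater(new SimplePullRequest("q1", 5), 50). The worker handles "q0@0" first and "q1@5" about 50 ms later.

In the real client, the re-enqueueing happens inside the PullCallback after the broker responds. That is what keeps each PullRequest cycling through this loop.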
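Next, look at pullMessage(final PullRequest pullRequest). It does three things:

1. Look up the consumer's internal implementation, MQConsumerInner, by consumerGroup.
2. Cast it to DefaultMQPushConsumerImpl.
3. Call DefaultMQPushConsumerImpl's pullMessage method.

The cast makes it clear that PullMessageService serves only push-mode consumers.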
```java
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
```
Now look at DefaultMQPushConsumerImpl's pullMessage method:
```java
public void pullMessage(final PullRequest pullRequest) {
    // Get the ProcessQueue from pullRequest; if it has not been dropped,
    // update its lastPullTimestamp to the current time
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        /*
         * Check that the consumer is in a normal state. If it is not, put pullRequest back
         * into PullMessageService's pull-task queue after a 3s delay and end this pull.
         */
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    // If the consumer is paused, put pullRequest back into PullMessageService's
    // pull-task queue after a 1s delay
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    // Pull flow control: number and size of the messages cached in this ProcessQueue
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (!this.consumeOrderly) {
        // Concurrent consumption: throttle when the offset span of the cached messages is too large
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        // Orderly consumption: the ProcessQueue must be locked on the broker
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    /*
     * Look up the subscription data for pullRequest's topic. If there is none, put pullRequest
     * back into PullMessageService's pull-task queue after a 3s delay and end this pull.
     */
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();

    // Build the callback pullCallback; it runs when the broker returns its response to the consumer
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        // Read the consumption progress of pullRequest's MessageQueue from memory
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    // Build the system flag for this pull
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Interact with the broker
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
```
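The three flow-control checks at the top of pullMessage can be isolated into one small decision function. The sketch below assumes the following:

- PullFlowControl and its check method are hypothetical, not RocketMQ APIs.
- check takes a ProcessQueue's cached message count, cached size in bytes and offset span (last cached offset minus first).
- It returns how long to delay the PullRequest, or an empty result if the pull may go ahead.
- The thresholds and the delay are constructor parameters, not values taken from this article.

As in the original code, the span check applies only to concurrent consumption.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the three flow-control checks in DefaultMQPushConsumerImpl.pullMessage.
final class PullFlowControl {
    private final long flowControlDelayMillis; // delay applied when any check trips
    private final long thresholdCount;         // cf. getPullThresholdForQueue()
    private final long thresholdSizeMiB;       // cf. getPullThresholdSizeForQueue()
    private final long maxSpanThreshold;       // cf. getConsumeConcurrentlyMaxSpan()

    PullFlowControl(long flowControlDelayMillis, long thresholdCount, long thresholdSizeMiB, long maxSpanThreshold) {
        this.flowControlDelayMillis = flowControlDelayMillis;
        this.thresholdCount = thresholdCount;
        this.thresholdSizeMiB = thresholdSizeMiB;
        this.maxSpanThreshold = maxSpanThreshold;
    }

    /**
     * Returns the delay before the PullRequest should be retried, or empty if the pull may proceed.
     *
     * @param cachedCount    number of messages cached in the ProcessQueue
     * @param cachedBytes    total size of the cached messages in bytes
     * @param maxSpan        last cached offset minus first cached offset
     * @param consumeOrderly true for orderly consumption, where the span check is skipped
     */
    OptionalLong check(long cachedCount, long cachedBytes, long maxSpan, boolean consumeOrderly) {
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedCount > thresholdCount) {
            return OptionalLong.of(flowControlDelayMillis); // too many cached messages
        }
        if (cachedMiB > thresholdSizeMiB) {
            return OptionalLong.of(flowControlDelayMillis); // cached messages too large
        }
        if (!consumeOrderly && maxSpan > maxSpanThreshold) {
            return OptionalLong.of(flowControlDelayMillis); // cached offsets spread too widely
        }
        return OptionalLong.empty();
    }
}
```

Usage, with illustrative values: new PullFlowControl(50, 1000, 100, 2000).check(1500, 0, 0, false) returns a 50 ms delay, because 1500 cached messages exceed the count threshold. The same call with a span of 2500 and consumeOrderly set to true returns empty, since orderly consumption skips the span check.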
The pullKernelImpl method is as follows:
```java
public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    /*
     * Look up the broker address in mQClientFactory by brokerName and brokerId.
     * In RocketMQ several brokers share the same name (the master and its slaves),
     * but their brokerIds differ. Each pull result carries a suggestion for the next
     * pull: whether to pull from the master or from a slave.
     */
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    // If findBrokerResult is null, first refresh the client's topic route table,
    // then call findBrokerAddressInSubscribe again to get the broker address
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        // Build the PullMessageRequestHeader
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);

        /*
         * If the filter mode is class filter, use the topic and broker address to find the
         * FilterServer registered on the broker and pull from it; otherwise pull from the broker.
         */
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
```
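PullSysFlag packs the four booleans passed to buildSysFlag into a single int. When the chosen broker is a slave, pullKernelImpl clears the commit-offset bit.

Below is a minimal sketch of that bit manipulation. The class name SysFlagSketch and the specific bit positions are assumptions for illustration, so consult PullSysFlag in your RocketMQ version for the real values.

```java
// Hypothetical sketch of a PullSysFlag-style bitmask; the bit positions are assumptions.
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    // Packs the four booleans into one int, one bit each
    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // Switches off only the commit-offset bit, as done when pulling from a slave
    static int clearCommitOffsetFlag(int sysFlag) {
        return sysFlag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) != 0;
    }

    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) != 0;
    }

    static boolean hasClassFilterFlag(int sysFlag) {
        return (sysFlag & FLAG_CLASS_FILTER) != 0;
    }
}
```

With these assumed bits, buildSysFlag(true, true, false, false) yields 3. clearCommitOffsetFlag(3) then yields 2, which keeps the suspend bit and drops only the commit-offset bit.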
Step into public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback). There you can see that the request the client sends to the broker has type RequestCode.PULL_MESSAGE.

Searching the code shows that the broker handles this request type in PullMessageProcessor's processRequest method, which is registered as follows:
```java
/**
 * PullMessageProcessor
 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
```
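registerProcessor associates a request code with the processor that handles it. It also supplies an executor, here pullMessageExecutor, presumably so that pull requests are processed on their own thread pool.

The sketch below shows only the lookup half of that idea, under these assumptions:

- ProcessorTable is hypothetical and keyed by an int request code, with a java.util.function.Function as the handler.
- Handlers run synchronously.
- The executor is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of request-code dispatch: a table from request code to handler,
// the structure that a registerProcessor-style call populates.
final class ProcessorTable {
    private final Map<Integer, Function<String, String>> processors = new HashMap<>();

    void registerProcessor(int requestCode, Function<String, String> processor) {
        processors.put(requestCode, processor);
    }

    // Looks up the handler for the request's code and runs it on the calling thread
    String dispatch(int requestCode, String requestBody) {
        Function<String, String> processor = processors.get(requestCode);
        if (processor == null) {
            return "request code " + requestCode + " not supported";
        }
        return processor.apply(requestBody);
    }
}
```

Usage: after table.registerProcessor(11, body -> "pulled:" + body), dispatch(11, "q0") returns "pulled:q0", and any unregistered code falls through to the "not supported" reply. The number 11 is illustrative only, not a claim about the value of RequestCode.PULL_MESSAGE.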
Now let us see how the broker handles a pull request sent by the client:
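1. Build the response that will be returned to the consumer and decode the request sent to the broker.
2. Check whether the broker is readable. If not, set the response code to ResponseCode.NO_PERMISSION and return the response to the consumer.
3. Look up the consumer group's subscription configuration on the broker.
   - If it does not exist, set the response code to ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST.
   - If its consumeEnable property is false, set the response code to ResponseCode.NO_PERMISSION.
   - In either case, return the response to the consumer.
4. Read from the request the system flag (sysFlag) that the consumer set for this pull.
5. Look up the topic's configuration on the broker. If there is none, set the response code to ResponseCode.TOPIC_NOT_EXIST and return the response to the consumer.
6. Check whether the topic is readable. If not, set the response code to ResponseCode.NO_PERMISSION and return the response to the consumer.
7. Check that the queueId of the MessageQueue to pull from is valid, that is, not negative and smaller than the topic's number of read queues. If not, set the response code to ResponseCode.SYSTEM_ERROR and return the response to the consumer.
8. Build the subscription data from the topic and the filter expression. If the expression type is not TAG, also build the filter data consumerFilterData.
9. Build the message filter messageFilter.
10. Look up messages using the following values from requestHeader, plus the message filter:
    - the consumer group name
    - the topic name
    - the MessageQueue's queueId
    - the logical ConsumeQueue offset to start pulling from
    - the maximum number of messages to pull

    The getMessage method also computes nextBeginOffset, the start offset of the next pull task.
11. If getMessageResult is not null, set nextBeginOffset, minOffset and maxOffset in the response.
12. Set in the response the brokerId that the consumer should pull from next (suggestWhichBrokerId). This is based mainly on whether getMessageResult suggests pulling from a slave because the consumer is lagging behind.
13. Set the response code according to the status of getMessageResult, using this mapping: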
| getMessageResult status | ResponseCode |
|---|---|
| FOUND | SUCCESS |
| MESSAGE_WAS_REMOVING | PULL_RETRY_IMMEDIATELY |
| NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE, OFFSET_OVERFLOW_BADLY, OFFSET_TOO_SMALL | PULL_OFFSET_MOVED |
| NO_MATCHED_MESSAGE | PULL_RETRY_IMMEDIATELY |
| OFFSET_FOUND_NULL, OFFSET_OVERFLOW_ONE | PULL_NOT_FOUND |
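14. If the current node is a master and the request's commitOffset flag is set, update the consumer group's consumption progress with the commitOffset carried in the request.
15. Return the response to the consumer. For PULL_NOT_FOUND with the suspend flag set, the broker may instead hold the request (long polling) and respond later, when new messages arrive or the suspend timeout expires.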
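The table in step 13 is a pure mapping, so it can be written as a switch. In the sketch below, GetMessageStatus and PullResponseCode are hypothetical enums that contain only the names from that table.

The real PullMessageProcessor works with RocketMQ's own types. It also adds special cases that this sketch ignores, for example cases that depend on the broker's role.

```java
// Hypothetical enums holding only the names listed in the table in step 13.
enum GetMessageStatus {
    FOUND, MESSAGE_WAS_REMOVING, NO_MATCHED_MESSAGE,
    NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE, OFFSET_OVERFLOW_BADLY, OFFSET_TOO_SMALL,
    OFFSET_FOUND_NULL, OFFSET_OVERFLOW_ONE
}

enum PullResponseCode { SUCCESS, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, PULL_NOT_FOUND }

final class BrokerStatusMapping {
    // Simplified broker-side translation from lookup status to response code
    static PullResponseCode toResponseCode(GetMessageStatus status) {
        switch (status) {
            case FOUND:
                return PullResponseCode.SUCCESS;
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_MESSAGE:
                return PullResponseCode.PULL_RETRY_IMMEDIATELY; // the consumer pulls again at once
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL:
                return PullResponseCode.PULL_OFFSET_MOVED; // the requested offset cannot be used
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_ONE:
                return PullResponseCode.PULL_NOT_FOUND; // no message available at this offset
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }
}
```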
When the broker returns the response, the consumer calls back either onSuccess or onException of the PullCallback, which is the callback created in pullMessage(final PullRequest pullRequest). The PullCallback is invoked as follows:
```java
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                assert pullResult != null;
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                    responseFuture.getCause()));
            } else {
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});
```
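The callback above routes every outcome of an asynchronous request to exactly one of onSuccess or onException. The same routing can be sketched with the JDK's CompletableFuture. In this sketch:

- SimplePullCallback, AsyncPullSketch and its processPullResponse stand-in are hypothetical.
- A String stands in for both RemotingCommand and PullResult.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified counterpart of PullCallback
interface SimplePullCallback {
    void onSuccess(String pullResult);

    void onException(Throwable e);
}

final class AsyncPullSketch {
    // Routes the completion of an asynchronous call to exactly one callback method
    static void invokeAsync(CompletableFuture<String> responseFuture, SimplePullCallback callback) {
        responseFuture.whenComplete((response, error) -> {
            if (error != null) {
                callback.onException(error); // send failure, timeout, etc.
                return;
            }
            try {
                callback.onSuccess(processPullResponse(response));
            } catch (Exception e) {
                callback.onException(e); // failures while processing the response
            }
        });
    }

    // Stand-in for processPullResponse: turns a raw response into a "PullResult"
    static String processPullResponse(String response) {
        if (response == null) {
            throw new IllegalStateException("empty response");
        }
        return "PullResult(" + response + ")";
    }
}
```

As in the original, an exception thrown while processing the response, or by onSuccess itself, is also reported through onException.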
Next, how does the consumer handle the response returned by the broker?
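1. Convert the broker's response into a PullResult. This is done by processPullResponse, which translates the response code into a pull status and builds the PullResult object. onSuccess then post-processes the result with pullAPIWrapper.processPullResult, which among other things records the broker's suggested brokerId for the next pull and filters messages by tag on the client side. The code translation is: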
| response code | pull status |
|---|---|
| SUCCESS | FOUND |
| PULL_NOT_FOUND | NO_NEW_MSG |
| PULL_RETRY_IMMEDIATELY | NO_MATCHED_MSG |
| PULL_OFFSET_MOVED | OFFSET_ILLEGAL |
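2. Update the offset for the next pull (nextBeginOffset) from pullResult. If msgFoundList in pullResult is empty, put the PullRequest straight back into PullMessageService's pullRequestQueue.
3. Put the pulled messages into processQueue, then submit them to the ConsumeMessageService for consumption. ConsumeMessageService has two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService.
4. If pullInterval is greater than 0, put pullRequest back into PullMessageService's pullRequestQueue after pullInterval milliseconds; otherwise put it back immediately. Because every handled request re-enters the queue, messages are pulled continuously.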
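The consumer-side steps come down to two decisions:

- which PullStatus a response code maps to;
- when the PullRequest goes back into pullRequestQueue.

The sketch below encodes both, following the table in step 1 and the switch in the PullCallback shown earlier. It assumes the following:

- ResponseCodeName, PullStatus and ConsumerPullHandling are hypothetical stand-ins.
- An empty result models OFFSET_ILLEGAL, where the ProcessQueue is dropped instead of the request being re-enqueued.

```java
import java.util.OptionalLong;

// Hypothetical enums limited to the codes and statuses in the table in step 1.
enum ResponseCodeName { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED }

enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class ConsumerPullHandling {
    // The status translation performed when the response is turned into a PullResult
    static PullStatus toPullStatus(ResponseCodeName code) {
        switch (code) {
            case SUCCESS:
                return PullStatus.FOUND;
            case PULL_NOT_FOUND:
                return PullStatus.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY:
                return PullStatus.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED:
                return PullStatus.OFFSET_ILLEGAL;
            default:
                throw new IllegalArgumentException("unexpected code: " + code);
        }
    }

    /**
     * Delay in milliseconds before the PullRequest re-enters pullRequestQueue (0 = immediately),
     * or empty when it is not re-enqueued because the ProcessQueue is dropped (OFFSET_ILLEGAL).
     */
    static OptionalLong nextPullDelay(PullStatus status, boolean msgFoundListEmpty, long pullInterval) {
        switch (status) {
            case FOUND:
                if (msgFoundListEmpty) {
                    return OptionalLong.of(0); // nothing to consume: pull again at once
                }
                return OptionalLong.of(pullInterval > 0 ? pullInterval : 0);
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                return OptionalLong.of(0);
            case OFFSET_ILLEGAL:
                return OptionalLong.empty();
            default:
                throw new IllegalArgumentException("unexpected status: " + status);
        }
    }
}
```

For example, a SUCCESS response with messages and a pullInterval of 200 leads to FOUND and a 200 ms delay. A PULL_NOT_FOUND response leads to NO_NEW_MSG and an immediate re-pull.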
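Finally, to summarize, the message pull flow has three main steps:
1. The consumer turns a PullRequest into a pull request (RequestCode.PULL_MESSAGE) and sends it to the broker.
2. The broker looks up messages according to the request and returns them to the consumer.
3. The consumer puts the returned messages into the ProcessQueue, submits them for consumption, and schedules the next pull.
How the consumer actually consumes the messages once it has them will be analyzed in the next article.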