flutter

RocketMQ系列4-长轮询模式实现推送消息

2021-02-22  本文已影响0人  过去今天和未来

1、Consumer消费消息两种模式比较

      RocketMQ提供两种方式进行消费消息pull vs push,这也是很多涉及到Client和Server之间的交互模型

1.1 pull模式

     主要是Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数。

public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
         MessageQueue mq = new MessageQueue();
         mq.setQueueId(0);
         mq.setTopic("mq-test");
         mq.setBrokerName("broker-a");
        long offset = 26;
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
        System.out.printf("%s%n", pullResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
    consumer.shutdown();
}

1.2 push模式

     Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费。

public static void main(String[] args) throws InterruptedException, MQClientException {
    // 构造方法
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("mq-test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
} 

     在现实中更多根据实际场景进行选择,大多场景更喜欢使用Push模式进行消费消息,那么Push是真正Broker端发送给Consumer的吗?答案肯定不是的,现实场景会有成百上千的Consumer对应的消息队列,Broker不会主动发送消息请求的。所以消息队列如何进行设计消息推送的呢?答案是长轮询。

2、RocketMQ如何实现长轮询

     长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据。consumer收到响应后根据状态判断是否有消息。

2.1 Consumer端处理

2.1.1 Consumer启动

     首先是Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下。

MQClientInstance#start
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 前后省略
                this.pullMessageService.start();
                     break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

2.1.2 Consumer请求

     可以看到启动后Consumer则不断轮询 Broker 获取消息。 Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息。

PullMessageService# 客户端发起拉取消息请求
public void run() {
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();  // 将返回结果添加到Queue
            this.pullMessage(pullRequest);
        }
}

Consumer处理的逻辑包括:

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } 
}
DefaultMQPushConsumerImpl#pullMessage
try {
    // 真正拉取消息的地方,首先获取Broker信息
    this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression,subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC, pullCallback);
}

2.1.3 Consumer响应处理

     PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求。

DefaultMQPushConsumerImpl#pullMessage
 // 当拉取的请求有响应时
PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                   
                    // 统计消费组下消息主题拉取耗时
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                        // 提交拉取到的消息到消息处理队列
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // 提交消费请求  ConsumeRequest#run 拉取消息响应listener.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            // 消费者拉取完消息后,立马就有开始下一个拉取任务
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
                    }
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    //消费者没有消息,立马就有开始下一个拉取任务
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                default:
                    break;
            }
        }
    }

2.2 Broker收到Consumer请求

可以思考一下Broker端需要面临哪些设计?

2.2.1 没有收到消息?如何hold请求

     如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。
     hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端。

1. Consumer发起拉取消息请求,Broker端无消息
Broker端
PullMessageProcessor#processRequest
// broker端没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
    
// 先将拉取请求放在this.pullRequestTable中,进行挂载起来
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}

2.2.2 hold请求超时处理

     Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测。

PullRequestHoldService  轮训遍历是否阻塞请求快到超时时间,进行唤醒    
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) { }
        }  
    }
}
// 
private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                }  
            }
        }
}

2.2.3 服务端收到Producer消息

     Producer写入消息,Broker端有消息通知Consumer端。
     当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest。

DefaultMessageStore#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}

PullRequestHoldService
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();  // 克隆挂起的请求列表
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));                       
                       // 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。
                       if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                            }  
                            continue;
                        }
                     }
                    // 如果列表中挂起的请求快超时了则立即唤醒返回给客户端
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                        }  
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
}

3、总结

     当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。
     如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。


image.jpeg
上一篇下一篇

猜你喜欢

热点阅读