(十)ack确认

2021-07-22  本文已影响0人  guessguess

在前面看了消费者并发消费的源码。得知消费者在消费消息失败之后,会尝试将消息sendback, 当sendback失败之后再进行重试消费。但是对于sendback这个机制还是不太清楚。所以看了一下sendback的源码。

首先还是定位一下入口。

public class MQClientAPIImpl {
    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        requestHeader.setOffset(msg.getCommitLogOffset());
        requestHeader.setDelayLevel(delayLevel);
        requestHeader.setOriginMsgId(msg.getMsgId());
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
}

从请求头上来看,设置了消费者对应的消费组,以及原有的topic,以及消息的偏移量,以及延迟级别,以及原有的消息id,还有设置最大重试次数。
RequestCode.CONSUMER_SEND_MSG_BACK = 36.

那么Broker是如何处理sendback的消息
还是先来看看源码,首先找到入口。在rocketmq里面,NettyRequestProcessor为专门用于处理请求的一个channelHandler.有不同的请求处理器针对不同的请求。

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
            省略无关代码。
        }
    }
}

broker是如何处理返回的消息

从上面代码看,无疑是asyncConsumerSendMsgBack方法。
因此下面来看看实现的细节

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
                (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        省略部分代码。
        根据消费组的group,找到对应的SubscriptionGroupConfig。
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        如果为Null,直接返回响应
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }
        如果没有写权限,直接返回响应
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        如果可重试的队列<=0,直接返回,说明不可以进行重试。
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return CompletableFuture.completedFuture(response);
        }
        生成新的topic,%RETRY% + 消费组的group
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        随机找一个重试队列
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        创建topic,并且将新的topic配置同步到注册中心,用于生成新的路由数据,这样子消费者才找到的消息。
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        如果top创建失败,或者不存在,则返回响应
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
        如果没有写的权限,则返回响应
        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return CompletableFuture.completedFuture(response);
        }
        根据偏移量找回以前的消息
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
       如果消息不存在的话,则直接返回响应
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }
        如果该消息没有被重试过,不会有对应的重试主题属性,需要设置
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);
        延迟级别
        int delayLevel = requestHeader.getDelayLevel();
      
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        如果消息的重试次数以及大于最大的重试次数,或者delayLevel<0.
        则将消息放入死信队列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
            || delayLevel < 0) {
            生成新的主题,%DLQ%+消费组的group
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            随机生成队列id
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            在broker中创建新的topic,并且同步到注册中心
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            主题不存在,或者创建失败,则直接返回响应
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } else {
            if (0 == delayLevel) {
                设置延迟级别,在保存之前会调整topic改为SCHEDULE_TOPIC_XXXX。暂时先不管这块是如何实现的
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }
-----------------------------------------------------------------------------------------------------------------------------------------------
        生成新消息,绝大多数属性都是原有消息的。
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        //设置新主题
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
--------------------------------------------------------------------------------------------------------------------------------------------
        对新生成的消息进行存储。存储后返回响应
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        String backTopic = msgExt.getTopic();
                        String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                        if (correctTopic != null) {
                            backTopic = correctTopic;
                        }
                        this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }
}

从上面的代码来看,消息如果消费失败,那么在mq中,是不会将一个消息进行重复消费的(除非broker挂了,消费者无法Sendback,则只能对一个消息多消费几次),其实是采取不断生成新消息的方式来(大多数内容采取复制,新消息有自己的msgId,以及不同的topic,但是会将一些原有的信息保留,比如原来的topic),来进行重复消费。

topic最后分为几类。

1.自定义topic

这里没太特殊,其实就是我们自定义的topic

2.重试主题

%RETRT%前缀 + GROUP 比如一个消费者的GROUP为group1。那么重试主题就为%RETRY%group1
如果消息消费失败,且允许重试消费的话,就会复制一个新消息,保存到%RETRY%GROUP中。
消费者一开始会默认创建重试的主题的订阅信息,用来保证对重试消息的消费。
源码如下

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    集群模式下,需要设置重试主题
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}
3.调度主题

在重复次数没超的情况下,会根据重复次数设置延时级别,如果延时级别越高的话,那么延时的时间则越长,放入SCHEDULE_TOPIC_XXXX队列。在保存之前,根据消息的延时级别,判断是否要修改主题。

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                修改主题为"SCHEDULE_TOPIC_XXXX"
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                备份原有的topic以及队列id,用于后续转发
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                更新主题
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            return putMessageResult;
        });
    }
}

SCHEDULE_TOPIC_XXXX是broker内部的一个主题,对于消费者生产者而言都是不可见的,仅仅用于消息的延时生成,会有定时器去将延时消息消费,将延时消息转发到原来对应的队列。
举个例子,某消息被消费失败了,随后进行重试消费,topic改为%RETRY%group1,随后发现需要延时,在保存消息时候,则将原来的主题(%RETRY%group1)以及队列id等相关信息保存,将消息的主题改为SCHEDULE_TOPIC_XXXX。最后进行保存。
而这个内部的主题SCHEDULE_TOPIC_XXXX是如何被消费的,其实就是有定时器去把主题里面的消息读出来,然后根据那些消息原有的消息以及队列,将消息放到对应的队列中去。
说白了这个叫SCHEDULE_TOPIC_XXXX的主题,其实就是用于将重试的消息延迟放入到对应的队列中。而它的作用相当于,先保存,后转发。

4.死信主题

如果重复次数超过配置的最大重试次数,则进入死信队列。
%DLQ%+消费组的group,死信队列的主题为%DLQ%+消费者的group。
这个是需要人工去干预的。

上一篇 下一篇

猜你喜欢

热点阅读