RocketMQ理论与实践

RocketMQ 死信队列

2020-06-25  本文已影响0人  晴天哥_王志

系列

开篇

    // Read permission flag: bit 2 set, i.e. value 4
    public static final int PERM_READ = 0x1 << 2;
    // Write permission flag: bit 1 set, i.e. value 2
    public static final int PERM_WRITE = 0x1 << 1;
    // Inherit permission flag: bit 0 set, i.e. value 1
    public static final int PERM_INHERIT = 0x1 << 0;

死信队列Topic配置

{
    "topicConfigTable":{
        "%DLQ%quickstart_consumer_dlq":{
            "order":false,
            "perm":2,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"%DLQ%quickstart_consumer_dlq",
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    }
}
命令格式
usage: mqadmin updateTopicPerm [-b <arg>] [-c <arg>] [-h] [-n <arg>] -p <arg> -t <arg>
 -b,--brokerAddr <arg>    create topic to which broker
 -c,--clusterName <arg>   create topic to which cluster
 -h,--help                Print help
 -n,--namesrvAddr <arg>   Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -p,--perm <arg>          set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW]
 -t,--topic <arg>         topic name

命令执行
./mqadmin updateTopicPerm -c DefaultCluster  -n localhost:9876 -p 6 -t %DLQ%quickstart_consumer_dlq

命令执行过程
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update topic perm from 2 to 6 in 192.168.0.10:10911 success.
{
    "topicConfigTable":{
        "%DLQ%quickstart_consumer_dlq":{
            "order":false,
            "perm":6,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"%DLQ%quickstart_consumer_dlq",
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    }
}

死信队列消息投递过程

/**
 * Abridged excerpt of the broker-side processor, showing how a message that a
 * consumer sends back is routed either to the consumer group's retry topic or
 * to its dead-letter (DLQ) topic.
 *
 * NOTE(review): part of the original method is omitted in this excerpt; the
 * locals {@code subscriptionGroupConfig} and {@code msgExt} are presumably
 * declared in the omitted code.
 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    /**
     * Handles a "send message back" request from a consumer.
     *
     * The message is re-stored under the group's retry topic, unless its
     * reconsume count has reached the maximum or the requested delay level is
     * negative, in which case it is re-stored under the group's DLQ topic.
     *
     * @param ctx     channel context of the request (not used in the visible excerpt)
     * @param request the remoting command carrying a {@code ConsumerSendMsgBackRequestHeader}
     * @return a response with code {@code SUCCESS} when the store reports {@code PUT_OK},
     *         otherwise {@code SYSTEM_ERROR} with the put status name (or a fixed remark
     *         when the put result is null)
     * @throws RemotingCommandException declared by the signature; presumably raised when
     *         the custom header cannot be decoded
     */
    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        // Not referenced again in the visible excerpt.
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        
        // Topic of the retry queue: RETRY_GROUP_TOPIC_PREFIX + consumerGroup
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        // Pick a random queue id within the group's retry queue count.
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        // Create (or fetch) the retry topic with both write and read permission.
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);

        // Related code omitted

        // First determine the maximum number of reconsume attempts
        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            // For request versions at or above V3_4_9 the limit is the
            // MaxReconsumeTimes supplied by the consumer in the request header
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        // Once the maximum reconsume count is reached (or the delay level is
        // negative), route the message to the dead-letter topic instead
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {

            // DLQ_GROUP_TOPIC_PREFIX + consumerGroup
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            // Create the dead-letter topic that belongs to this consumer group
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE,// the dead-letter topic is created with write permission only
                0
            );

        } else {
            // A delay level of 0 means "not specified": derive it from the reconsume count.
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        // Build a new message and write it to the new topic; for the dead-letter
        // queue this means it is written to the commitLog and consumeQueue again
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        // The stored copy carries an incremented reconsume count.
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // Keep the first message id as the origin id across repeated send-backs.
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK:
                    // Attribute the send-back statistic to the RETRY_TOPIC property
                    // when present, otherwise to the message's current topic.
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }

                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);

                    return response;
                default:
                    break;
            }

            // Any status other than PUT_OK is reported as a system error.
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    }
}

查询死信队列状态

./mqadmin topicStatus -n localhost:9876 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
broker-a                          0     0                     1999                    2020-06-25 08:23:28,277

根据offset查询消息内容

./mqadmin queryMsgByOffset -b broker-a -n localhost:9876 -i 0 -o 1 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
OffsetID:            C0A8000A00002A9F00000000007E6157
Topic:               %DLQ%quickstart_consumer_dlq
Tags:                [TagA]
Keys:                [null]
Queue ID:            0
Queue Offset:        1
CommitLog Offset:    8282455
Reconsume Times:     2
Born Timestamp:      2020-05-16 22:28:54,301
Store Timestamp:     2020-06-25 08:23:26,974
Born Host:           192.168.0.8:57604
Store Host:          192.168.0.10:10911
System Flag:         0
Properties:          {MIN_OFFSET=0, REAL_TOPIC=%DLQ%quickstart_consumer_dlq, ORIGIN_MESSAGE_ID=C0A8000800002A9F0000000000000000, RETRY_TOPIC=TopicTest, MAX_OFFSET=1999, UNIQ_KEY=C0A80008723518B4AAC25212599B0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=3, TAGS=TagA, REAL_QID=0}
Message Body Path:   /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000

实际查看内容
cat /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000
Hello RocketMQ 1

consumer消费死信队列

/**
 * Sample push consumer that subscribes to the dead-letter topic
 * {@code %DLQ%quickstart_consumer_dlq} and prints every batch it receives.
 */
public class Consumer {

    /**
     * Configures and starts the consumer, then prints a start-up notice.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared by the signature
     * @throws MQClientException declared by the signature; presumably raised by
     *         {@code subscribe} or {@code start}
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Listener that logs each delivered batch with the current thread's
        // name and acknowledges it as successfully consumed.
        MessageListenerConcurrently dlqListener = new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch,
                ConsumeConcurrentlyContext consumeContext) {
                String workerName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", workerName, batch);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("quickstart_consumer_dlq");
        dlqConsumer.setNamesrvAddr("localhost:9876");
        dlqConsumer.setMaxReconsumeTimes(1);
        dlqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Subscribe to the dead-letter topic, accepting every tag.
        dlqConsumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");
        dlqConsumer.registerMessageListener(dlqListener);

        dlqConsumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
上一篇下一篇

猜你喜欢

热点阅读