RocketMQ理论与实践

RocketMQ 堆积查询

2020-05-12  本文已影响0人  晴天哥_王志

系列

开篇

例子

#Topic                            #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%consumer_group_test        broker-a                          0     0                     0                     N/A                  0                     N/A
TopicTest                         broker-a                          0     1155                  1004                  N/A                  151                   2020-05-08 22:53:52
TopicTest                         broker-a                          1     1159                  1007                  N/A                  152                   2020-05-08 22:54:58
TopicTest                         broker-a                          2     1124                  982                   N/A                  142                   2020-05-08 22:53:54
TopicTest                         broker-a                          3     1141                  987                   N/A                  154                   2020-05-08 22:54:58
TopicTest                         broker-a                          4     1126                  985                   N/A                  141                   2020-05-08 22:53:17
TopicTest                         broker-a                          5     1119                  980                   N/A                  139                   2020-05-08 22:53:17
TopicTest                         broker-a                          6     1127                  983                   N/A                  144                   2020-05-08 22:53:52
TopicTest                         broker-a                          7     1131                  987                   N/A                  144                   2020-05-08 22:53:52

AdminBrokerProcessor

/**
 * Excerpt of the processor showing only the handler that reports consume progress
 * (broker offset vs. consumer offset per queue) for a consumer group.
 */
public class AdminBrokerProcessor implements NettyRequestProcessor {

    /**
     * Builds the consume statistics of one consumer group on this broker.
     *
     * <p>For every topic in scope and every read queue of that topic, the method records the
     * queue's max offset in the message store, the group's stored consumer offset and, when
     * available, the store timestamp of the message just before the consumer offset. The
     * per-topic consume TPS values are summed into the result.
     *
     * @param ctx     channel context of the incoming request (not read by this method)
     * @param request command carrying a {@code GetConsumeStatsRequestHeader}; when its topic is
     *                blank, the topics are looked up from the consumer offset manager by group
     * @return a response with code {@code ResponseCode.SUCCESS} whose body is the encoded
     *         {@code ConsumeStats}
     * @throws RemotingCommandException declared by the signature; presumably raised when the
     *                                  custom request header cannot be decoded
     */
    private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
        final ConsumeStats consumeStats = new ConsumeStats();
        final String group = requestHeader.getConsumerGroup();

        // Step 1: decide which topics to report on. An explicit topic in the request wins;
        // otherwise use the topics the offset manager associates with this group.
        final Set<String> topics;
        if (UtilAll.isBlank(requestHeader.getTopic())) {
            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group);
        } else {
            topics = new HashSet<String>();
            topics.add(requestHeader.getTopic());
        }

        // Step 2: gather statistics topic by topic.
        for (String topic : topics) {
            final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
            if (topicConfig == null) {
                log.warn("consumeStats, topic config not exist, {}", topic);
                continue;
            }

            // Skip the topic only when the group has subscription data registered but none
            // for this particular topic; the count is queried only if the lookup found nothing.
            final SubscriptionData subscription =
                this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
            final boolean subscribedElsewhereOnly = subscription == null
                && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0;
            if (subscribedElsewhereOnly) {
                log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
                continue;
            }

            // Step 3: walk the readable queues of the topic.
            for (int queueId = 0; queueId < topicConfig.getReadQueueNums(); queueId++) {
                final MessageQueue queue = new MessageQueue();
                queue.setTopic(topic);
                queue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                queue.setQueueId(queueId);

                final OffsetWrapper offsets = new OffsetWrapper();

                // Negative offsets are reported as 0.
                final long brokerOffset =
                    Math.max(0L, this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId));
                final long consumerOffset =
                    Math.max(0L, this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId));

                offsets.setBrokerOffset(brokerOffset);
                offsets.setConsumerOffset(consumerOffset);

                // The timestamp is read at offset (consumerOffset - 1), so it is only looked up
                // once the consumer offset is at least 1.
                if (consumerOffset >= 1) {
                    final long lastTimestamp = this.brokerController.getMessageStore()
                        .getMessageStoreTimeStamp(topic, queueId, consumerOffset - 1);
                    if (lastTimestamp > 0) {
                        offsets.setLastTimestamp(lastTimestamp);
                    }
                }

                // Step 4: results are keyed by message queue.
                consumeStats.getOffsetTable().put(queue, offsets);
            }

            // Add this topic's consume TPS to the running total kept in the result.
            final double topicTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic);
            consumeStats.setConsumeTps(topicTps + consumeStats.getConsumeTps());
        }

        response.setBody(consumeStats.encode());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读