RocketMQ源码之消息轨迹

2019-10-01  本文已影响0人  搬运工来架构

一、消息轨迹数据结构

二、发送消息轨迹流程

三、消息轨迹存储

四、消费消息轨迹流程

五、总结

消息轨迹在不少情况是很重要的,比如消息到底有没有发送出去,消息存储在哪个Broker,消息被哪些消费者消费、消费时间、消费耗时等等。对于我们排查消息问题还是非常重要的。

RocketMQ 4.4.0版本开始支持消息轨迹,目前(2019-09-08)最新release版本是4.5.2。

下图是使用RocketMQ-console查看消息轨迹的数据:

下面就来看RocketMQ是如何实现消息轨迹这个新特性:

一、消息轨迹数据结构

消息轨迹,必然需要对象实体存储相关数据的数据结构,比如生产者IP,发送时间、消息信息等等。从RocketMQ源码里找到其数据结构TraceBean:

public class TraceBean {

    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());

    private String topic = ""; // 消息topic

    private String msgId = ""; // 消息Id

    private String offsetMsgId = ""; // 消息偏移量

    private String tags = ""; // Tags用于过滤

    private String keys = ""; // Keys用于索引

    private String storeHost = LOCAL_ADDRESS; // 存储地址,默认当前机器

    private String clientHost = LOCAL_ADDRESS; // 客户端地址,在生产端就是生产者地址,默认当前机器

    private long storeTime; // 存储时间

    private int retryTimes; // 重试次数

    private int bodyLength; // 消息体长度

    private MessageType msgType; // 消息类型

    ...

}

消息轨迹上下文TraceContext:

public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType; // 追踪类型

    private long timeStamp = System.currentTimeMillis(); // 当前时间戳

    private String regionId = ""; // Broker所属Region的Id标识

    private String regionName = ""; // Broker所属Region的名称

    private String groupName = "";  // 分组名

    private int costTime = 0; // 耗时

    private boolean isSuccess = true; // 是否成功标识

    private String requestId = MessageClientIDSetter.createUniqID(); // 请求Id

    private int contextCode = 0; // 消费结果返回状态码

    private List<TraceBean> traceBeans; // 轨迹数据集合

消息追踪类型:

public enum TraceType {

    Pub, // 消息发送

    SubBefore, // 消息拉取到客户端,执行业务定义的消费逻辑之前

    SubAfter, // 消费后

}

从上面的数据结构得知,消息轨迹的基本数据结构是TraceBean,定义了关注的数据属性。TraceContext是为了上下文进行处理时进行承载的主题。

二、发送消息轨迹流程

生产者demo:

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        // 指定生产者组testMessageTrace,第二个参数enableMsgTrace是启用消息轨迹功能

        DefaultMQProducer producer = new DefaultMQProducer("testMessageTrace", true);

        producer.setNamesrvAddr("localhost:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {

            try {

                Message msg = new Message("TopicTest" /* Topic */,

                    "TagA" /* Tag */,

                    "VV",

                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */

                );

                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);

            } catch (Exception e) {

                e.printStackTrace();

                Thread.sleep(1000);

            }

        }

        producer.shutdown();

    }

}

由上面的生产者demo,接下来看DefaultMQProducer构造方法

//DefaultMQProducer.java

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {

    this(null, producerGroup, null, enableMsgTrace, null);

}

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,

    boolean enableMsgTrace, final String customizedTraceTopic) {

    this.namespace = namespace;

    this.producerGroup = producerGroup;

    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

    //是否开启消息轨迹

    if (enableMsgTrace) {

        try {

            // 构造异步追踪消息处理分发器

            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);

            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());

            traceDispatcher = dispatcher;

            // 注入钩子,SendMessageTraceHookImpl实现了消息生产者发送消息的钩子实现

            this.getDefaultMQProducerImpl().registerSendMessageHook(

                new SendMessageTraceHookImpl(traceDispatcher));

        } catch (Throwable e) {

            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");

        }

    }

}

@Override

public void start() throws MQClientException {

    this.setProducerGroup(withNamespace(this.producerGroup));

    // 启动生产者

    this.defaultMQProducerImpl.start();

    // 如果消息追踪分发器不为空,则启动

    if (null != traceDispatcher) {

        try {

            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());

        } catch (MQClientException e) {

            log.warn("trace dispatcher start failed ", e);

        }

    }

}

生产者轨迹消息初始化:

异步追踪消息处理分发器接口定义:

public interface TraceDispatcher {

    // 初始化异步转换数据模块

    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    // 追加数据

    boolean append(Object ctx);

    // 数据写入flush

    void flush() throws IOException;

    // 关闭钩子

    void shutdown();

}

异步追踪消息处理分发器接口实现:

public class AsyncTraceDispatcher implements TraceDispatcher {

    private final int queueSize; // 队列大小,默认2048

    private final int batchSize; // 批次大小,默认100

    private final int maxMsgSize; // 最大消息大小,默认128000=128k

    private final DefaultMQProducer traceProducer; // 生产者

    private final ThreadPoolExecutor traceExecutor; // 线程池

    private AtomicLong discardCount; // 最新log丢弃数量

    private Thread worker; // 工作线程

    private ArrayBlockingQueue<TraceContext> traceContextQueue; // 队列:存放消息追踪上下文TraceContext

    private ArrayBlockingQueue<Runnable> appenderQueue; // 追加队列?

    private volatile Thread shutDownHook; // 关闭构造线程

    private volatile boolean stopped = false; // 是否停止

    private DefaultMQProducerImpl hostProducer; // 生产者实现类

    private DefaultMQPushConsumerImpl hostConsumer; // 消费者实现类

    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 存放发送到哪个Queue的数据

    private String dispatcherId = UUID.randomUUID().toString(); // 分发处理器ID,随机UUID生成

    private String traceTopicName; // 跟踪主题名称

    private AtomicBoolean isStarted = new AtomicBoolean(false); // 是否启动

    private AccessChannel accessChannel = AccessChannel.LOCAL; // 可以指定区分是Cloud或者Local方式

    // 构造初始化参数

    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {

        this.queueSize = 2048;

        this.batchSize = 100;

        this.maxMsgSize = 128000;

        this.discardCount = new AtomicLong(0L);

        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);

        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);

        // 如果没有指定消息轨迹主题,那么默认是RMQ_SYS_TRACE_TOPIC

        if (!UtilAll.isBlank(traceTopicName)) {

            this.traceTopicName = traceTopicName;

        } else {

            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;

        }

        // 构造线程池,最小线程数10,最大20

        this.traceExecutor = new ThreadPoolExecutor(//

            10, //

            20, //

            1000 * 60, //

            TimeUnit.MILLISECONDS, //

            this.appenderQueue, //

            new ThreadFactoryImpl("MQTraceSendThread_"));

        // 构造追踪生产者

        traceProducer = getAndCreateTraceProducer(rpcHook);

    }

    // 异步消息追踪分发器启动,由前面知道生产者启动时会判断分发器不为空就启动当前方法

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {

        // 进行CAS防止并发启动多次,启动traceProducer,将消息发往Broker,也就是相当于正常的生产者发送消息

        if (isStarted.compareAndSet(false, true)) {

            traceProducer.setNamesrvAddr(nameSrvAddr);

            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);

            traceProducer.start();

        }

        this.accessChannel = accessChannel;

        // 启动工作线程,并且设置为守护线程。AsyncRunnable是工作线程主要的逻辑处理实现

        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);

        this.worker.setDaemon(true);

        this.worker.start();

        // 注入ShutDown钩子,分发器关时需要进行相关的清理工作

        this.registerShutDownHook();

    }

    // 构建追踪Producer

    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {

        DefaultMQProducer traceProducerInstance = this.traceProducer;

        if (traceProducerInstance == null) {

            traceProducerInstance = new DefaultMQProducer(rpcHook);

            // 生产者组 _INNER_TRACE_PRODUCER

            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);

            // 设置发送超时5秒

            traceProducerInstance.setSendMsgTimeout(5000);

            traceProducerInstance.setVipChannelEnabled(false);

            // 最大消息大小是 128K-1k = 118k,不知道RocketMQ的开发者为啥是这样设计的?

            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);

        }

        return traceProducerInstance;

    }

    // 将追踪上下文存放到队列里

    @Override

    public boolean append(final Object ctx) {

        boolean result = traceContextQueue.offer((TraceContext) ctx);

        if (!result) {

            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);

        }

        return result;

    }

    // flush操作,如果队列还有未处理完成的,那么让当前线程sleep 1毫秒,直到处理完毕。

    @Override

    public void flush() throws IOException {

        // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.

        long end = System.currentTimeMillis() + 500;

        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {

            try {

                Thread.sleep(1);

            } catch (InterruptedException e) {

                break;

            }

        }

        log.info("------end trace send " + traceContextQueue.size() + "  " + appenderQueue.size());

    }

    // 关闭操作:停止线程池,停止追踪Producer,移除SHUTDOWN钩子

    @Override

    public void shutdown() {

        this.stopped = true;

        this.traceExecutor.shutdown();

        if (isStarted.get()) {

            traceProducer.shutdown();

        }

        this.removeShutdownHook();

    }

    // 注册SHUTDOWN钩子:调用flush方法,主要是等待队列的数据处理完毕。

    public void registerShutDownHook() {

        if (shutDownHook == null) {

            shutDownHook = new Thread(new Runnable() {

                private volatile boolean hasShutdown = false;

                @Override

                public void run() {

                    synchronized (this) {

                        if (!this.hasShutdown) {

                            try {

                                flush();

                            } catch (IOException e) {

                                log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");

                            }

                        }

                    }

                }

            }, "ShutdownHookMQTrace");

            Runtime.getRuntime().addShutdownHook(shutDownHook);

        }

    }

    public void removeShutdownHook() {

        if (shutDownHook != null) {

            Runtime.getRuntime().removeShutdownHook(shutDownHook);

        }

    }

    // 异步线程

    class AsyncRunnable implements Runnable {

        private boolean stopped;

        @Override

        public void run() {

            while (!stopped) {

                // 分批从队列traceContextQueue弹出数据

                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);

                for (int i = 0; i < batchSize; i++) {

                    TraceContext context = null;

                    try {

                        //get trace data element from blocking Queue — traceContextQueue

                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);

                    } catch (InterruptedException e) {

                    }

                    if (context != null) {

                        contexts.add(context);

                    } else {

                        break;

                    }

                }

                if (contexts.size() > 0) {

                    // 将上下文数据分装成AsyncAppenderRequest来处理

                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);

                    // 提交到线程池进行处理

                    traceExecutor.submit(request);

                } else if (AsyncTraceDispatcher.this.stopped) {

                    this.stopped = true;

                }

            }

        }

    }

    // 异步发送轨迹消息线程

    class AsyncAppenderRequest implements Runnable {

        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {

            if (contextList != null) {

                this.contextList = contextList;

            } else {

                this.contextList = new ArrayList<TraceContext>(1);

            }

        }

        @Override

        public void run() {

            sendTraceData(contextList);

        }

        public void sendTraceData(List<TraceContext> contextList) {

            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();

            for (TraceContext context : contextList) {

                if (context.getTraceBeans().isEmpty()) {

                    continue;

                }

                // Topic value corresponding to original message entity content

                String topic = context.getTraceBeans().get(0).getTopic();

                String regionId = context.getRegionId();

                // Use  original message entity's topic as key

                String key = topic;

                // regionId不为空,将其追加到key

                if (!StringUtils.isBlank(regionId)) {

                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;

                }

                List<TraceTransferBean> transBeanList = transBeanMap.get(key);

                if (transBeanList == null) {

                    transBeanList = new ArrayList<TraceTransferBean>();

                    transBeanMap.put(key, transBeanList);

                }

                // 将上下文数据进行转化组装

                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);

                transBeanList.add(traceData);

            }

            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {

                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));

                String dataTopic = entry.getKey();

                String regionId = null;

                if (key.length > 1) {

                    dataTopic = key[0];

                    regionId = key[1];

                }

                flushData(entry.getValue(), dataTopic, regionId);

            }

        }

        /**

        * 批量发数据

        */

        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {

            if (transBeanList.size() == 0) {

                return;

            }

            // Temporary buffer

            StringBuilder buffer = new StringBuilder(1024);

            int count = 0;

            Set<String> keySet = new HashSet<String>();

            for (TraceTransferBean bean : transBeanList) {

                // Keyset of message trace includes msgId of or original message

                keySet.addAll(bean.getTransKey());

                buffer.append(bean.getTransData());

                count++;

                // Ensure that the size of the package should not exceed the upper limit.

                if (buffer.length() >= traceProducer.getMaxMessageSize()) {

                    sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);

                    // Clear temporary buffer after finishing

                    buffer.delete(0, buffer.length());

                    keySet.clear();

                    count = 0;

                }

            }

            // 可能还剩下最后一条数据,需要将其发送

            if (count > 0) {

                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);

            }

            transBeanList.clear();

        }

        // 发送轨迹数据

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {

            String traceTopic = traceTopicName;

            if (AccessChannel.CLOUD == accessChannel) {

                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;

            }

            final Message message = new Message(traceTopic, data.getBytes());

            // Keyset of message trace includes msgId of or original message

            message.setKeys(keySet);

            try {

                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);

                // 发送回调,关注发送异常,打印异常日志

                SendCallback callback = new SendCallback() {

                    @Override

                    public void onSuccess(SendResult sendResult) {

                    }

                    @Override

                    public void onException(Throwable e) {

                        log.info("send trace data ,the traceData is " + data);

                    }

                };

                if (traceBrokerSet.isEmpty()) {

                    // 没有轨迹Broker,则直接使用默认方式发送

                    traceProducer.send(message, callback, 5000);

                } else {

                    // 根据轨迹Broker将其发送到对应的MessageQueue

                    traceProducer.send(message, new MessageQueueSelector() {

                        @Override

                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                            Set<String> brokerSet = (Set<String>) arg;

                            List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();

                            for (MessageQueue queue : mqs) {

                                if (brokerSet.contains(queue.getBrokerName())) {

                                    filterMqs.add(queue);

                                }

                            }

                            int index = sendWhichQueue.getAndIncrement();

                            int pos = Math.abs(index) % filterMqs.size();

                            if (pos < 0) {

                                pos = 0;

                            }

                            return filterMqs.get(pos);

                        }

                    }, traceBrokerSet, callback);

                }

            } catch (Exception e) {

                log.info("send trace data,the traceData is" + data);

            }

        }

        // 获得消息队列Broker集合:获得topic路由信息,并且将Broker集合返回

        private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {

            Set<String> brokerSet = new HashSet<String>();

            TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);

            if (null == topicPublishInfo || !topicPublishInfo.ok()) {

                producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());

                producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

                topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);

            }

            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {

                for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {

                    brokerSet.add(queue.getBrokerName());

                }

            }

            return brokerSet;

        }

    }

}

生产者消息轨迹启动时序图

生产者轨迹消息发送使用了异步追踪分发器,将轨迹消息上下文TraceContext(包含轨迹数据),扔到分发器里的队列,启动分发器时也会启动发送轨迹消息线程,一直从队列里poll出数据进行发送。

发送消息钩子接口:

public interface SendMessageHook {

    String hookName();// 钩子名称

    void sendMessageBefore(final SendMessageContext context);// 发送消息之前处理

    void sendMessageAfter(final SendMessageContext context);// 发送消息之后处理

}

发送消息上下文Context:

public class SendMessageContext {

    private String producerGroup; // 生产者组

    private Message message; // 消息

    private MessageQueue mq; // 消息队列

    private String brokerAddr; // Broker地址

    private String bornHost; // 发生主机,即客户端

    private CommunicationMode communicationMode; // 通信模式

    private SendResult sendResult; // 发送结果

    private Exception exception; // 异常

    private Object mqTraceContext; // 轨迹上下文

    private Map<String, String> props; // 额外属性

    private DefaultMQProducerImpl producer; // 生产者实现类

    private MessageType msgType = MessageType.Normal_Msg; // 消息类型

    private String namespace; // 命名空间

    ...

}

发送消息钩子实现:

public class SendMessageTraceHookImpl implements SendMessageHook {

    private TraceDispatcher localDispatcher;

    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {

        this.localDispatcher = localDispatcher;

    }

    @Override

    public String hookName() {

        return "SendMessageTraceHook";

    }

    @Override

    public void sendMessageBefore(SendMessageContext context) {

        //如果是轨迹消息本身,那么直接返回无需处理

        if (context == null ||

            context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {

            return;

        }

        //钩子轨迹上下文TraceContext

        TraceContext tuxeContext = new TraceContext();

        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));

        context.setMqTraceContext(tuxeContext);

        tuxeContext.setTraceType(TraceType.Pub);

        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));

        //构建轨迹数据实体

        TraceBean traceBean = new TraceBean();

        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));

        traceBean.setTags(context.getMessage().getTags());

        traceBean.setKeys(context.getMessage().getKeys());

        traceBean.setStoreHost(context.getBrokerAddr());

        traceBean.setBodyLength(context.getMessage().getBody().length);

        traceBean.setMsgType(context.getMsgType());

        traceBean.setSendTime(System.currentTimeMillis());

        tuxeContext.getTraceBeans().add(traceBean);

    }

    @Override

    public void sendMessageAfter(SendMessageContext context) {

        //如果是轨迹消息本身,那么直接返回无需处理

        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())

            || context.getMqTraceContext() == null) {

            return;

        }

        if (context.getSendResult() == null) {

            return;

        }

        if (context.getSendResult().getRegionId() == null

            || !context.getSendResult().isTraceOn()) {

            // if switch is false,skip it

            return;

        }

        // 通过Context获取sendMessageBefore生成的轨迹上下文TraceContext

        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();

        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);

        // 计算耗时

        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());

        tuxeContext.setCostTime(costTime);

        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {

            tuxeContext.setSuccess(true);

        } else {

            tuxeContext.setSuccess(false);

        }

        tuxeContext.setRegionId(context.getSendResult().getRegionId());

        traceBean.setMsgId(context.getSendResult().getMsgId());

        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());

        // 设置存储时间storeTime,这个数值是估值=时间戳+耗时/2

        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);

        // 将上下文数据追加到分发器中的队列里

        localDispatcher.append(tuxeContext);

    }

通过上面的钩子接口,那我们知道发生消息钩子有before、after两个方法,进行将消息发送出去之后进行前后的处理逻辑,计算耗时、获得发送结果等操作。

从上面我们大致了解了生产者发送消息钩子的实现机制,但是似乎我们还没看到在哪里调用钩子进行相应的代码逻辑处理,下面继续看源码:

由Producer消息发送流程得知,我们这里只要关注sendKernelImpl这个核心方法即可。

// DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,

                                  final MessageQueue mq,

                                  final CommunicationMode communicationMode,

                                  final SendCallback sendCallback,

                                  final TopicPublishInfo topicPublishInfo,

                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

            ...

            // 判断是否有发送消息钩子,是则进行发送消息钩子Before处理

            if (this.hasSendMessageHook()) {

                context = new SendMessageContext();

                context.setProducer(this);

                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());

                context.setCommunicationMode(communicationMode);

                context.setBornHost(this.defaultMQProducer.getClientIP());

                context.setBrokerAddr(brokerAddr);

                context.setMessage(msg);

                context.setMq(mq);

                context.setNamespace(this.defaultMQProducer.getNamespace());

                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

                if (isTrans != null && isTrans.equals("true")) {

                    context.setMsgType(MessageType.Trans_Msg_Half);

                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {

                    context.setMsgType(MessageType.Delay_Msg);

                }

                this.executeSendMessageHookBefore(context);

            }

            // 判断是否有发送消息钩子,是则进行发送消息钩子After处理

            if (this.hasSendMessageHook()) {

                context.setSendResult(sendResult);

                this.executeSendMessageHookAfter(context);

            }

      ...           

}

注入生产者发送消息钩子:

//DefaultMQProducerImpl.java

private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

// 将发送消息钩子存放在sendMessageHookList列表

public void registerSendMessageHook(final SendMessageHook hook) {

    this.sendMessageHookList.add(hook);

    log.info("register sendMessage Hook, {}", hook.hookName());

}

public boolean hasSendMessageHook() {

    return !this.sendMessageHookList.isEmpty();

}

public void executeSendMessageHookBefore(final SendMessageContext context) {

    if (!this.sendMessageHookList.isEmpty()) {

        for (SendMessageHook hook : this.sendMessageHookList) {

            try {

                hook.sendMessageBefore(context);

            } catch (Throwable e) {

                log.warn("failed to executeSendMessageHookBefore", e);

            }

        }

    }

}

public void executeSendMessageHookAfter(final SendMessageContext context) {

    if (!this.sendMessageHookList.isEmpty()) {

        for (SendMessageHook hook : this.sendMessageHookList) {

            try {

                hook.sendMessageAfter(context);

            } catch (Throwable e) {

                log.warn("failed to executeSendMessageHookAfter", e);

            }

        }

    }

}

上面就是调用钩子的入口,其实也是很简单的,只要了解了基本原理即可。

三、消息轨迹存储

由上面的源码分析过程我们也知道,轨迹消息topic默认是RMQ_SYS_TRACE_TOPIC,当然也是可以自己指定的。

那么这个Topic是怎么创建的呢?

// TopicConfigManager.java

public TopicConfigManager(BrokerController brokerController) {

    ...

    {

        // Broker配置开启消息轨迹,。那么会自动创建消息轨迹topic,读写队列数都是1。

        if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {

            String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();

            TopicConfig topicConfig = new TopicConfig(topic);

            this.systemTopicList.add(topic);

            topicConfig.setReadQueueNums(1);

            topicConfig.setWriteQueueNums(1);

            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

        }

    }

    ...

}

一般生产环境都不会自动创建Topic,可以让运维同事帮忙提前创建好消息轨迹topic。

消息轨迹主要就是创建一个Topic来存放我们的消息轨迹数据,其它数据存储方式是跟原来的一致,所以我们关注此次核心的地方即可。

如果我们引进消息轨迹这个特性,那么是否需要将我们的Broker集群都开启消息轨迹呢?如果我们没指定Broker开启消息轨迹又会怎么样呢?从上面的源码我们知道,如果没有找到对应Topic的Broker信息,那么我们会往全部Broker发送轨迹消息数据。但是,这样就会增加全部Broker的压力,因为消息轨迹数据跟业务消息数据是没有什么强关联的,一般也只是辅助我们查看消息的发送情况等。所以,为了降低我们业务处理的Broker压力,官方建议我们,升级使用消息轨迹时,可以增加一台Broker机器,而且只有它开启消息轨迹功能traceTopicEnable即可,这样只有这台新机器会存放轨迹消息,而不会原有业务Broker造成太大的压力。

四、消费消息轨迹流程

消费者在消息轨迹的初始化和启动流程完全跟生产者消息轨迹的流程一模一样,在此就不重复记录了。

我们都知道,消费消息有两种方式:pull和push,那么相应的处理也是有处理钩子的地方,这里也暂不去探究了,只要知道有两个地方会触发钩子的执行,有兴趣的可以去自己去看下吧。

但是,在具体的钩子Hook实现是不一样的,那么现在我们看下消费端是如何做的呢?

public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {

    private TraceDispatcher localDispatcher;

    public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {

        this.localDispatcher = localDispatcher;

    }

    @Override

    public String hookName() {

        return "ConsumeMessageTraceHook";

    }

    // 消息拉在客户端,并且在消费业务消息之前所处理

    @Override

    public void consumeMessageBefore(ConsumeMessageContext context) {

        // 消息列表为空则不处理

        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {

            return;

        }

        TraceContext traceContext = new TraceContext();

        context.setMqTraceContext(traceContext);

        traceContext.setTraceType(TraceType.SubBefore);// 轨迹类型:SubBefore

        traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//

        List<TraceBean> beans = new ArrayList<TraceBean>();

        for (MessageExt msg : context.getMsgList()) {

            if (msg == null) {

                continue;

            }

            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);

            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);

            // 从消息属性中获取是否禁用轨迹消息开关,如果设置false,则不处理

            if (traceOn != null && traceOn.equals("false")) {

                continue;

            }

            TraceBean traceBean = new TraceBean();

            traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//

            traceBean.setMsgId(msg.getMsgId());//

            traceBean.setTags(msg.getTags());//

            traceBean.setKeys(msg.getKeys());//

            traceBean.setStoreTime(msg.getStoreTimestamp());//

            traceBean.setBodyLength(msg.getStoreSize());//

            traceBean.setRetryTimes(msg.getReconsumeTimes());//

            traceContext.setRegionId(regionId);//

            beans.add(traceBean);

        }

        if (beans.size() > 0) {

            traceContext.setTraceBeans(beans);

            traceContext.setTimeStamp(System.currentTimeMillis());

            localDispatcher.append(traceContext);

        }

    }

    // 业务消息处理完之后进行处理

    @Override

    public void consumeMessageAfter(ConsumeMessageContext context) {

        // 消息列表为空则不处理

        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {

            return;

        }

        TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();

        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {

            // subbefore为空或其TraceBean列表为空,则不处理

            return;

        }

        TraceContext subAfterContext = new TraceContext();

        subAfterContext.setTraceType(TraceType.SubAfter);// 轨迹类型为:SubAfter

        subAfterContext.setRegionId(subBeforeContext.getRegionId());//

        subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//

        subAfterContext.setRequestId(subBeforeContext.getRequestId());//

        subAfterContext.setSuccess(context.isSuccess());//

        // 计算耗时:(当前时间-subBefore时间戳)/ 消息列表大小,所以这个值也只是平均值,不是绝对的

        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());

        subAfterContext.setCostTime(costTime);//

        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());

        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);

        if (contextType != null) {

            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());

        }

        localDispatcher.append(subAfterContext);

    }

}

消费端消费轨迹流程大致跟生产者消息轨迹流程基本一致,不同的是消费端包含SubBefore和SubAfter两种轨迹类型,这是因为消息到消费端还需要等待业务逻辑进行处理,这样处理完成之后,就可以计算消费耗时了。

下图是关于消息轨迹的核心图,来自github上zongtanghu在issue上提供的。

五、总结

RocketMQ消息轨迹实现起来并不是十分复杂,通过源码我们也了解了大致的实现原理和流程处理。

1)RocketMQ消息轨迹是基于Hook钩子机制实现

2)消息轨迹类型包含:Pub、SubBefore、SubAfter。

3)使用一台Broker来启用消息轨迹存储,减少其它业务Broker消息处理压力。

参考资料:

源码分析RocketMQ消息轨迹 https://blog.csdn.net/prestigeding/article/details/98376981

RocketMQ消息轨迹-设计篇 https://blog.csdn.net/prestigeding/article/details/95922489

上一篇下一篇

猜你喜欢

热点阅读