RocketMQ

RocketMQ源码分析(四)消息的发送

2021-04-03  本文已影响0人  甘_

消息发送的主要步骤为:验证消息,查找路由,消息发送(包含异常机制处理)

我们以DefaultMQProducer#send为切入口:

  1. 以下代码为验证消息
public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验消息(主要是长度是否超出)
        Validators.checkMessage(msg, this);
        // 设置topic
        msg.setTopic(withNamespace(msg.getTopic()));
        // 发送消息
        return this.defaultMQProducerImpl.send(msg);
    }
  1. 经过一些嵌套,我们发现真正处理逻辑的位置位于DefaultMQProducerlmpl#sendDefaultImpl,里面包含了查找路由和消息发送的逻辑。
  2. 它会调用tryToFindTopicPublishInfo去查找Topic,先从本地缓存找,找不到再去nameServer找,如果从nameserver找到了则同时更新本地。都找不到,会根据autoCreateTopicEnable(broker里配置,生产环境一般为false)这个参数判断,为true则创建一个topic返回,为false就爆异常;最终对比本地路由缓存表,更新本地路由缓存。
  3. 选择消息队列并且发送,成功则返回,失败则重试。
  4. 选择消息队列,建议启用broker故障延迟机制,防止broker宕机难以感知到。
  5. 发送所调用的是sendKernelImpl,然后调用this.mQClientFactory.getMQClientAPIImpl().sendMessage(...),接着封装成一个request请求,最终通过底层的netty进行发送。

三种类型消息的区别(同步,异步,单向)

从上面我们可以知道,最终会调用MQClientAPIlmpl#sendMessage,从其中的switch包裹体可以看出不同的处理方式:

switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }
  1. 同步消息,下面详细分析。
  2. 异步发送,重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试 。
  3. 单向发送就是异步发送,只是没有回调函数,不关心结果如何。

关于同步消息和异步消息。我们知道在rocketmq底层,client和broker是用netty进行通信的。在broker端,都是用nio的形式一视同仁地处理;而client这端,同步消息的方法会阻塞住,等待结果的返回再进行下一步处理;而异步消息,则会挂一个回调函数,结果回来了,进行回调。性能来说肯定是异步更好的,但是也得分场景使用。

但是同样是使用netty,为什么一个可以实现同步,一个是异步呢,这个又得往下翻了,以下皆为netty层面的个人分析:

同步消息做了一件很鸡贼的事情,它在底层使用了一个countDownLatch拦住了执行流程,到timeoutMillis就一定返回;要是有值返回,那就没事;如果没有值返回,那就爆一个超时异常……超时默认是3000ms,但是中间经过了很多层处理,我就没算了。
异步消息就比较良心了,常规的nio处理手段,挂一个回调函数,结果回来就完事儿。

上一篇下一篇

猜你喜欢

热点阅读