RocketMQ-普通消息

2020-07-20  本文已影响0人  Travis_Wu

一、摘要

  1. 默认消息发送超时时间为3s
  2. 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3次
  3. 消息内容不能为0,也不能超过4M
  4. 同步消息发送才会有重试机制,异步发送和oneway发送模式都只有一次发送机会。同步发送 1+重试次数(默认2)
  5. pull模式、push模式启动的时候都不会检查nameserv,pull模式在fetchqueue时没有nameserv时会报错,push模式没有nameserv不会报错

二、简单实例(官方实例)

  1. 发送实例
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}
  1. pull接收实例
public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        
      DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ProducerGroupName");
      consumer.setNamesrvAddr("localhost:9876");
      
      consumer.start();
      
      // 从nameserv上拉取的topic的队列信息
      Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC_TEST");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    // 远程拉取消息,拉取32个
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND://找到消息
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println("msgId:"+m.getMsgId());
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

//        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}
  1. push接收实例
public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TOPIC_TEST", "*");// topic  tag
        /**
         * 全新的消费组 才适用这些策略
         * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
         * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
         * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
         */
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for(MessageExt msg:msgs){
                    System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody()));
                }
                /**
                 * CONSUME_SUCCESS 消费成功
                 * RECONSUME_LATER 重试消费,重试次数可以设置,默认是16次
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

三、RocketMQ发送普通消息的全流程解读

① DefaultMQProducer的启动流程

② send发送方法的核心流程

public class TopicPublishInfo {
    //是否是顺序消息
    private boolean orderTopic = false;
    //是否存在路由信息
    private boolean haveTopicRouterInfo = false;
    //改topic对应的逻辑队列,每一个逻辑队列就对应一个MessageQueue
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //用于选择消息队列的值,每选择一次消息队列,该值会自增1。
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // topic路由元数据,如borker的元信息和队列的元信息
    private TopicRouteData topicRouteData;
    //....
    //....
}

//数据结构如下:
TopicPublishInfo [
orderTopic=false, 
messageQueueList=[MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=0], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=1], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=2], MessageQueue [topic=TopicTest, brokerName=Silence.local, queueId=3]], 
sendWhichQueue=ThreadLocalIndex{threadLocalIndex=null},
haveTopicRouterInfo=true]
    /**
     * 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
     * 如果没有则更新路由信息,从nameserver端拉取最新路由信息
     *
     * topicPublishInfo
     * 
     * @param topic
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //step1.先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //step1.2 然后从nameServer上更新topic路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            //step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            /**
             *  第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
             */
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
/**
     * 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
     *
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
       //这里由于是同步方式发送,所以直接return response的响应
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            //如果NameServer中不存在待发送消息的Topic
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            //如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
image.png
    /**
     * 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {

                //1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //2.采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

③ Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:


四、总结

消息发送的核心逻辑在sendKernelImpl方法,这里简单归纳下,主要做了以下几件事:

  1. 根据对应的messageQuene获取broker网络地址。
  2. 为消息分配全局的唯一id
  3. 压缩消息,如果消息体大小超过compressMsgBodyOverHowmuch配置的值(默认4K),则进行压缩
  4. 是否存在发送消息钩子sendMessageHookList,存在则执行钩子
  5. 构建消息发送的请求头SendMessageRequestHeader
  6. 根据不同发送方式进行消息的发送。如果失败进入循环重试
    • 同步发送(SYNC):同步阻塞等待broker处理完消息后返回结果
    • 异步发送(ASYNC):
      不阻塞等待broker处理消息的结果,通过提供回调方法,响应消息发送结果。
      这种方式的发送,RocketMQ做了并发控制,通过clientAsyncSemaphoreValue参数控制,默认值是65535。
      异步发送消息的消息重试次数是通过retryTimesWhenSendAsyncFailed控制的,但如果网络出现异常是无法发生重试的
    • 单向发送(ONEWAY):不关心消息发送是否成功,只管发送
  7. 继续判断发送消息钩子,有则执行钩子的after逻辑

上一篇 下一篇

猜你喜欢

热点阅读