Java学习资料收集

RocketMQ之producer发送消息源码分析

2019-07-04  本文已影响0人  nhhnhh

RocketMQ主要由NameServer、producer端的消息发送、broker端的消息存储以及consumer端的消息消费几部分组成。首先我们来看一下producer的发送。
以下是producer的发送逻辑:
1.DefaultMQProducerImpl#sendDefaultImpl(入口)
2.通过DefaultMQProducerImpl#tryToFindTopicPublishInfo获取topic的路由信息并判断是否可用,为空或者不可用则不再发送,直接抛出MQClientException
3.如果topic可用,则根据发送模式计算发送总次数(同步模式为1+重试次数,其他模式为1)
4.for循环发送次数,选择发送队列,如果队列为空,则直接跳出循环DefaultMQProducerImpl#selectOneMessageQueue
5.如果选择队列的耗时太长,那也不再进行处理。
6.调用发送主逻辑DefaultMQProducerImpl#sendKernelImpl
7.更新一下broker的可用时间,对broker的可用性进行更新DefaultMQProducerImpl#updateFaultItem
8.根据发送结果来判断是否继续循环:异步和单向模式直接返回null;同步模式下,如果发送状态不是SEND_OK并且开启了retryAnotherBrokerWhenNotStoreOK,则继续for循环重试,否则直接返回发送结果。

/**
 * Sends {@code msg} to one queue of its topic, trying further queues until an attempt yields a
 * result that can be returned or the attempt budget is used up.
 *
 * <p>The attempt budget is {@code 1 + getRetryTimesWhenSendFailed()} for
 * {@code CommunicationMode.SYNC} and exactly 1 for every other mode. Each attempt picks a queue
 * with {@code selectOneMessageQueue}, delegates to {@code sendKernelImpl} with the time that is
 * left of {@code timeout}, and reports the observed latency through {@code updateFaultItem}.
 *
 * @param msg message to send; validated with {@code Validators.checkMessage} before anything else
 * @param communicationMode decides the attempt budget and what is returned after a send
 * @param sendCallback handed unchanged to {@code sendKernelImpl}
 * @param timeout overall budget in milliseconds, measured from the moment this method is entered
 * @return the {@code SendResult} in SYNC mode; {@code null} in ASYNC and ONEWAY mode once
 *         {@code sendKernelImpl} has returned
 * @throws MQClientException when no usable route exists for the topic, when no name server
 *         address is configured, or when every attempt failed (the last caught exception is
 *         attached as cause and mapped to a response code)
 * @throws RemotingException declared by the signature; the visible code throws a
 *         {@code RemotingTooMuchRequestException} when the budget ran out before an attempt
 *         (presumably a {@code RemotingException} subtype — confirm)
 * @throws MQBrokerException rethrown when its response code is not one of the retried codes and
 *         no earlier {@code SendResult} is available
 * @throws InterruptedException rethrown immediately, without further attempts
 */
private SendResult sendDefaultImpl(
       Message msg,
       final CommunicationMode communicationMode,
       final SendCallback sendCallback,
       final long timeout
   ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       this.makeSureStateOK();
       Validators.checkMessage(msg, this.defaultMQProducer);

       // invokeID only appears in the warning logs below, to correlate attempts of one call.
       final long invokeID = random.nextLong();
       long beginTimestampFirst = System.currentTimeMillis();
       long beginTimestampPrev = beginTimestampFirst;
       long endTimestamp = beginTimestampFirst;
       // Look up the publish (route) information of the message's topic.
       TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
       if (topicPublishInfo != null && topicPublishInfo.ok()) {
           boolean callTimeout = false;
           MessageQueue mq = null;
           Exception exception = null;
           SendResult sendResult = null;
           // SYNC mode gets one attempt plus the configured retries; every other mode gets a single attempt.
           int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
           int times = 0;
           // Broker name used by each attempt, reported in the final failure message.
           String[] brokersSent = new String[timesTotal];
           for (; times < timesTotal; times++) {
               // On the first attempt there is no previous queue, so no previous broker name.
               String lastBrokerName = null == mq ? null : mq.getBrokerName();
               // Pick a queue for this attempt; a null result ends the loop (see the else branch below).
               MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               if (mqSelected != null) {
                   mq = mqSelected;
                   brokersSent[times] = mq.getBrokerName();
                   try {
                       // Time spent since method entry (queue selection and earlier attempts included).
                       // If it already exceeds the timeout, flag the call as timed out and leave the loop.
                       beginTimestampPrev = System.currentTimeMillis();
                       long costTime = beginTimestampPrev - beginTimestampFirst;
                       if (timeout < costTime) {
                           callTimeout = true;
                           break;
                       }
                       // The actual send; it only gets the remaining part of the timeout.
                       sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                       endTimestamp = System.currentTimeMillis();
                       // Report this attempt's latency for the broker; the last argument is false on this success path.
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                       switch (communicationMode) {
                           case ASYNC:
                               return null;
                           case ONEWAY:
                               return null;
                           case SYNC:
                               // A status other than SEND_OK is retried on the next loop iteration only when
                               // isRetryAnotherBrokerWhenNotStoreOK() is true; otherwise the result is returned as is.
                               // (The original author notes the flag defaults to false — not verifiable from here.)
                               if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                   if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                       continue;
                                   }
                               }

                               return sendResult;
                           default:
                               break;
                       }
                   } catch (RemotingException e) {
                       // Report the failed attempt (last argument true), remember the exception, try the next queue.
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       continue;
                   } catch (MQClientException e) {
                       // Same handling as RemotingException: report, remember, try the next queue.
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       continue;
                   } catch (MQBrokerException e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       // Only the response codes listed here lead to another attempt. Any other code ends the
                       // call: with an earlier SendResult if one exists, otherwise by rethrowing.
                       switch (e.getResponseCode()) {
                           case ResponseCode.TOPIC_NOT_EXIST:
                           case ResponseCode.SERVICE_NOT_AVAILABLE:
                           case ResponseCode.SYSTEM_ERROR:
                           case ResponseCode.NO_PERMISSION:
                           case ResponseCode.NO_BUYER_ID:
                           case ResponseCode.NOT_IN_CURRENT_UNIT:
                               continue;
                           default:
                               if (sendResult != null) {
                                   return sendResult;
                               }

                               throw e;
                       }
                   } catch (InterruptedException e) {
                       // Interruption is never retried. Note the last argument to updateFaultItem is false here.
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                       log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());

                       // NOTE(review): the exception and the message are logged a second time below — looks redundant.
                       log.warn("sendKernelImpl exception", e);
                       log.warn(msg.toString());
                       throw e;
                   }
               } else {
                   // No queue could be selected: stop trying.
                   break;
               }
           }

           // A result from an earlier attempt (for example a SYNC send that was not SEND_OK) is still returned.
           if (sendResult != null) {
               return sendResult;
           }

           String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
               times,
               System.currentTimeMillis() - beginTimestampFirst,
               msg.getTopic(),
               Arrays.toString(brokersSent));

           info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

           MQClientException mqClientException = new MQClientException(info, exception);
           // A timeout detected inside the loop takes precedence over the aggregated failure.
           if (callTimeout) {
               throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
           }

           // Map the last caught exception to the response code carried by the thrown MQClientException.
           if (exception instanceof MQBrokerException) {
               mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
           } else if (exception instanceof RemotingConnectException) {
               mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
           } else if (exception instanceof RemotingTimeoutException) {
               mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
           } else if (exception instanceof MQClientException) {
               mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
           }

           throw mqClientException;
       }

       // Reached only when no usable publish info was found: tell "no name server configured"
       // apart from "topic has no route".
       List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
       if (null == nsList || nsList.isEmpty()) {
           throw new MQClientException(
               "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
       }

       throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
           null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
   }

那么我们来看一下tryToFindTopicPublishInfo这个方法

    /**
     * Returns the publish info cached for {@code topic}, refreshing it from the name server first
     * when the cache has no usable entry.
     *
     * <p>A second refresh, made through the three-argument overload with {@code isDefault = true}
     * and this producer, is attempted when the entry still has neither router info nor an
     * {@code ok()} state after the first step.
     *
     * @param topic topic whose publish info is wanted
     * @return whatever the cache holds for {@code topic} after the refresh steps; callers still
     *         check it with {@code ok()}
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Start from the local cache.
        TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);

        final boolean usable = cached != null && cached.ok();
        if (!usable) {
            // Make sure an entry exists, refresh the route from the name server, then re-read the cache.
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            cached = this.topicPublishInfoTable.get(topic);
        }

        if (!cached.isHaveTopicRouterInfo() && !cached.ok()) {
            // Still nothing usable: refresh once more in "default topic" mode and hand back the cache content.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            return this.topicPublishInfoTable.get(topic);
        }

        return cached;
    }

updateTopicRouteInfoFromNameServer方法

/**
 * Fetches the route data of {@code topic} from the name server and, when it differs from the
 * cached copy (or {@code isNeedUpdateTopicRouteInfo} asks for it), pushes the new publish and
 * subscribe info to every registered producer and consumer and stores a clone in
 * {@code topicRouteTable}.
 *
 * <p>The whole operation runs under {@code lockNamesrv}, acquired with a bounded
 * {@code tryLock}; the lock is released in a {@code finally} block.
 *
 * @param topic topic whose route is refreshed
 * @param isDefault when {@code true} and {@code defaultMQProducer} is not {@code null}, the route
 *        of the producer's create-topic key is fetched instead, and each queue data's read/write
 *        queue counts are capped at the producer's default topic queue number
 * @param defaultMQProducer producer supplying the create-topic key and queue number; may be
 *        {@code null}
 * @return {@code true} only when new route data was applied and stored; {@code false} when the
 *         lock timed out, the name server returned nothing, nothing needed updating, an
 *         exception was caught, or the thread was interrupted while waiting for the lock
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            // Give up without touching any state when the name-server lock cannot be taken in time.
            if (!this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
                return false;
            }

            try {
                final TopicRouteData fetched;
                if (isDefault && defaultMQProducer != null) {
                    // Default mode: ask for the route of the producer's create-topic key.
                    fetched = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                        defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (fetched != null) {
                        // Cap read and write queue counts at the producer's configured default.
                        for (QueueData queueData : fetched.getQueueDatas()) {
                            final int cappedQueueNums =
                                Math.min(defaultMQProducer.getDefaultTopicQueueNums(), queueData.getReadQueueNums());
                            queueData.setReadQueueNums(cappedQueueNums);
                            queueData.setWriteQueueNums(cappedQueueNums);
                        }
                    }
                } else {
                    // Normal mode: ask for the route of the topic itself.
                    fetched = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }

                if (fetched == null) {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    return false;
                }

                // Compare with the cached route; if it looks unchanged, still ask whether an update is needed.
                final TopicRouteData previous = this.topicRouteTable.get(topic);
                boolean needsUpdate = topicRouteDataIsChange(previous, fetched);
                if (needsUpdate) {
                    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, previous, fetched);
                } else {
                    needsUpdate = this.isNeedUpdateTopicRouteInfo(topic);
                }
                if (!needsUpdate) {
                    return false;
                }

                // Clone first; the clone is what ends up in topicRouteTable.
                final TopicRouteData snapshot = fetched.cloneTopicRouteData();

                // Refresh the broker-name -> addresses cache with the brokers serving this topic.
                for (BrokerData brokerData : fetched.getBrokerDatas()) {
                    this.brokerAddrTable.put(brokerData.getBrokerName(), brokerData.getBrokerAddrs());
                }

                // Update publish info on every registered producer.
                final TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, fetched);
                publishInfo.setHaveTopicRouterInfo(true);
                for (Entry<String, MQProducerInner> producerEntry : this.producerTable.entrySet()) {
                    final MQProducerInner producer = producerEntry.getValue();
                    if (producer != null) {
                        producer.updateTopicPublishInfo(topic, publishInfo);
                    }
                }

                // Update subscribe info on every registered consumer.
                final Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, fetched);
                for (Entry<String, MQConsumerInner> consumerEntry : this.consumerTable.entrySet()) {
                    final MQConsumerInner consumer = consumerEntry.getValue();
                    if (consumer != null) {
                        consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
                    }
                }

                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, snapshot);
                this.topicRouteTable.put(topic, snapshot);
                return true;
            } catch (Exception e) {
                // Failures for retry-group topics and the auto-create key topic are not logged.
                final boolean quietTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
                    || topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
                if (!quietTopic) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }

选择队列selectOneMessageQueue

/**
 * Chooses the queue the next send attempt should use.
 *
 * <p>With {@code sendLatencyFaultEnable} off, the choice is delegated to
 * {@code tpInfo.selectOneMessageQueue(lastBrokerName)}. With it on, the queue list is walked
 * once, starting at the topic's send index, and the first queue is returned whose broker is
 * reported available and which either has no previous broker to compare with or sits on the
 * same broker as {@code lastBrokerName}. If none qualifies, a broker obtained from
 * {@code pickOneAtLeast()} is tried; as a last resort, or after any exception, the plain
 * {@code tpInfo.selectOneMessageQueue()} result is returned.
 *
 * @param tpInfo publish info of the topic, providing the queue list and the send index
 * @param lastBrokerName broker used by the previous attempt, or {@code null} on the first one
 * @return the selected queue
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
       // Fault tolerance switched off: plain selection that takes the previous broker into account.
       if (!this.sendLatencyFaultEnable) {
           return tpInfo.selectOneMessageQueue(lastBrokerName);
       }

       try {
           // Starting position comes from the topic's send index.
           int cursor = tpInfo.getSendWhichQueue().getAndIncrement();
           for (int attempt = 0; attempt < tpInfo.getMessageQueueList().size(); attempt++) {
               // Clamp to 0 in case Math.abs yields a negative value.
               final int slot = Math.max(Math.abs(cursor++) % tpInfo.getMessageQueueList().size(), 0);
               final MessageQueue candidate = tpInfo.getMessageQueueList().get(slot);
               // Accept an available broker when there is no previous broker (first attempt)
               // or when the candidate is on that same broker.
               if (latencyFaultTolerance.isAvailable(candidate.getBrokerName())
                   && (lastBrokerName == null || candidate.getBrokerName().equals(lastBrokerName))) {
                   return candidate;
               }
           }

           // No queue qualified: ask the fault-tolerance component for a fallback broker.
           final String fallbackBroker = latencyFaultTolerance.pickOneAtLeast();
           final int writeQueueNums = tpInfo.getQueueIdByBroker(fallbackBroker);
           if (writeQueueNums <= 0) {
               // The fallback broker has no write queues for this topic; drop it from the fault records.
               latencyFaultTolerance.remove(fallbackBroker);
           } else {
               // Take the next queue and re-point it at the fallback broker.
               // NOTE(review): the setters mutate the object returned by tpInfo — confirm it is not shared state.
               final MessageQueue redirected = tpInfo.selectOneMessageQueue();
               if (fallbackBroker != null) {
                   redirected.setBrokerName(fallbackBroker);
                   redirected.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
               }
               return redirected;
           }
       } catch (Exception e) {
           log.error("Error occurred when selecting message queue", e);
       }

       // Last resort: plain selection without any broker preference.
       return tpInfo.selectOneMessageQueue();
   }

发送主逻辑

 /**
  * Performs one send attempt of {@code msg} to the broker that owns {@code mq}.
  *
  * <p>Steps visible here: resolve the broker address (refreshing the topic route once if it is
  * unknown), assign a unique id to non-batch messages, try to compress the body, derive the
  * system flag, run the check-forbidden and send-message hooks when registered, build the
  * {@code SendMessageRequestHeader}, and call {@code MQClientAPIImpl.sendMessage} with the
  * overload matching the communication mode. The message body is always reset to its original
  * value in the {@code finally} block.
  *
  * @param msg message to send; its body may be replaced during the call and is restored afterwards
  * @param mq target queue; supplies the broker name, topic and queue id
  * @param communicationMode ASYNC uses the callback overload; ONEWAY and SYNC share the other one
  * @param sendCallback forwarded to {@code sendMessage} in ASYNC mode only
  * @param topicPublishInfo forwarded to {@code sendMessage} in ASYNC mode only
  * @param timeout budget in milliseconds for this attempt; time already spent inside this method
  *        is subtracted before the remote call
  * @return the value returned by {@code sendMessage}
  * @throws MQClientException when no broker address can be found for {@code mq}'s broker name
  * @throws RemotingException propagated from the send after the "after" hook has run; a
  *         {@code RemotingTooMuchRequestException} is thrown here when the budget is already spent
  * @throws MQBrokerException propagated from the send after the "after" hook has run
  * @throws InterruptedException propagated from the send after the "after" hook has run
  */
 private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // Resolve the publish address for the queue's broker name.
        // (The original author notes this is the master broker's address — not verifiable from here.)
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // Address unknown: refresh the topic's publish info once and look it up again.
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // Switch to the VIP channel address when the producer enables it.
            // (The original author notes the VIP channel is the broker port minus 2 — not visible here.)
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            // Keep the original body so it can be restored after the send.
            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // Try to compress the message; when that reports true, record it in the system flag.
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                // Mark the message as a prepared transaction message when the property parses as true.
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                // Run the check-forbidden hooks, if any are registered, before sending.
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // Build the hook context and run the "before send" hooks, if any are registered.
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    // NOTE(review): this check is a case-sensitive equals("true"), while the sysFlag
                    // check above uses Boolean.parseBoolean — confirm the difference is intended.
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    // A start-deliver time or delay level overrides the type set just above.
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the request header for the send.
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                // For retry-group topics, move the reconsume counters from the message properties into the header.
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                // Send the message with the overload that matches the communication mode.
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        // Fail fast when the preparation above has already used up the budget.
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        // Same budget check as the ASYNC branch.
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                // Run the "after send" hooks with the result attached.
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                // Each handler below lets the "after send" hooks see the exception, then rethrows it.
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                // Always put the original body back on the caller's message.
                msg.setBody(prevBody);
            }
        }

        // No address even after the route refresh: the broker is treated as non-existent.
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

以上就是RocketMQ的producer发送消息的源码分析

上一篇下一篇

猜你喜欢

热点阅读