RocketMQ系列1:元数据通信

2020-12-27  本文已影响0人  过去今天和未来

最近一段时间由于公司做一些LDC(阿里称为逻辑数据中心,)架构升级,所以有幸参与中间件RocketMQ改造,所以整这个机会会把RocketMQ整个流程梳理一份。

image

先简单概述一下RocketMQ中成员

以下则是本系列开篇,首先会梳理一下各个成员之间的路由信息通信


一、Broker上报元数据流程

启动过程

1. BrokerController创建

主要是加载类成员,包括配置类、管理类、服务类、任务类、策略类等。后续在分析这些。

2. 启动过程

主要包括消息存储服务、远程通信服务、心跳服务等启动过程,我们重点看一下定时线程池,启动后10s执行。时间间隔则是在10-60s期间,可以在conf文件中配置。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
     BrokerController.this.registerBrokerAll(true, false,brokerConfig.isForceRegister());
  }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

3. BrokerController#registerBrokerAll

List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

4. 真正进行Broker元信息上报
BrokerController#doRegisterBrokerAll

 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }

二、Nameserver接受Broker请求处理

DefaultRequestProcessor#processRequest

该方法采用策略分配请求处理,对于Broker上报请求Code则是Register_Broker,最终到DefaultRequestProcessor#registerBroker ,该方法主要更新Nameserver元数据基本都是Map对象结构。

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

三、客户端拉取Namesrv元数据

1. 生产端启动

MQClientInstance#start

 // 元数据
 public class TopicRouteData extends RemotingSerializable {
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
}

2. 发送消息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

四、NameServer处理客户端请求
DefaultRequestProcessor#processRequest
客户端请求获取路由信息,请求Code是Get_RouteInfo_By_Topic。接着处理客户端获取的请求,调用pickupTopicRouteData,核心流程主要根据topic获取QueueData获取BrokerNameSet,遍历Set后得到BrokerData最终返回最新的topicRouteData。

     try {
               this.lock.readLock().lockInterruptibly();
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;
                    Iterator<QueueData> it = queueDataList.iterator();
                    while (it.hasNext()) {
                        QueueData qd = it.next();
                        brokerNameSet.add(qd.getBrokerName());
                    }
                    for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }

上一篇 下一篇

猜你喜欢

热点阅读