RocketMQ阅读笔记之路由中心NameServer

2019-10-31  本文已影响0人  九点半的马拉

为什么会有NameServer

消息中间件一般基于主题的订阅发布机制,消息生产者会发送某一主体(Topic)的消息到消息服务器(Broker),消息服务器负责该消息的持久化存储,消息消费者订阅感兴趣的主题。通常情况下,为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息生产者如何 知道消息要发往哪台消息服务器呢?如果某一台消息服务器宕机了,那么生产者如何在不重启服务的情况下感知。NameServer可以 解决上述问题。

Broker消息服务器在启动的时候向所有NameServer注册,消息生产者在发送消息之前先从NameServer获取Broker服务器地址列表,然后 根据负载均衡算法从列表中选择一台消息服务器进行消息发送,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息生产者。

NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同 。

NameServer作用及重要变量

NameServer存储路由的基础信息,还能够管理Broker节点,包括路由注册、路由删除等功能。

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; 
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; 
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; 
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

从上面可以看出数据类型都是HashMap, 其中,QueueData记录的是消息队列的信息。

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
 # 读队列数量
 private int readQueueNums;
 # 写队列数量
 private int writeQueueNums;
 # 读写权限,具体含义参考PermName
 private int perm;
 # topic同步标记,具体含义参考TopicSysFlag
 private int topicSynFlag;
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
 private String brokerName;
 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
class BrokerLiveInfo {
  private long lastUpdateTimestamp;
 private DataVersion dataVersion;
 private Channel channel;
 private String haServerAddr;

路由注册

Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。

NameServer处理心跳包

public RegisterBrokerResult registerBroker(
    final String clusterName,
 final String brokerAddr,
 final String brokerName,
 final long brokerId,
 final String haServerAddr,
 final TopicConfigSerializeWrapper topicConfigWrapper,
 final List<String> filterServerList,
 final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
 try {
        try {
            this.lock.writeLock().lockInterruptibly();    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
 if (null == brokerNames) {
                brokerNames = new HashSet<String>();
 this.clusterAddrTable.put(clusterName, brokerNames);
  }
            brokerNames.add(brokerName);   boolean registerFirst = false;    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
 if (null == brokerData) {
                registerFirst = true;
  brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
 this.brokerAddrTable.put(brokerName, brokerData);
  }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
  //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
 //The same IP:PORT must only have one record in brokerAddrTable  Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
 while (it.hasNext()) {
                Entry<Long, String> item = it.next();
 if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
  }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
  registerFirst = registerFirst || (null == oldAddr);   if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
 if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
  }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
 new BrokerLiveInfo(
                    System.currentTimeMillis(),
  topicConfigWrapper.getDataVersion(),
  channel,
  haServerAddr));
 if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
  }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
  } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
  }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
 if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
 if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
  result.setMasterAddr(masterAddr);
  }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
  }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
  }

    return result; }

路由删除

NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

上一篇下一篇

猜你喜欢

热点阅读