RocketMQ源码解析(二)-nameserv
NameServ是rocketMQ的注册中心,保存所有Broker、Topic的元数据。Broker启动后会向nameserv发送心跳,nameserv也会定时检测broker的可用性,并移除不可用的broker。
Nameserv的启动过程
启动脚本
> nohup sh bin/mqnamesrv &
nameserv启动过程会将所有初始化和启动工作交给NamesrvController
来完成。
NamesrvController
nameserv的主要控制类,负责初始化和后台任务启动,Controller包含的主要组件都在构造函数中做了初始化
Controller构造函数
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
//nameserv参数配置
this.namesrvConfig = namesrvConfig;
//netty的参数配置
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
//初始化RouteInfoManager
this.routeInfoManager = new RouteInfoManager();
//监听客户端连接(Channel)的变化,通知RouteInfoManager检查broker是否有变化
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
//Nameserv的配置参数会保存到磁盘文件中
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
构造函数中初始化了RouteInfoManager
,这个最重要的类,负责缓存整个集群的broker信息,以及topic和queue的配置信息。
RouteInfoManager数据结构
RouteInfoManager
的所有数据通过HashMap缓存在内存中,通过读写锁来控制并发更新。这样可最大程度的提高客户端查询数据的速度。数据更新时会将数据保存到文件中,重启后可恢复数据。
//1、Topic和broker的Map,保存了topic在每个broker上的读写Queue的个数以及读写权限
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//2、注册到nameserv上的所有Broker,按照brokername分组
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//3、broker的集群对应关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//4、broker最新的心跳时间和配置版本号
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//5、broker和FilterServer的对应关系
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
从以上的信息可以看出:
1)Broker使用brokerName来标识主从关系,同一个brokerName下只能由一个master。
2)使用clusterName来判断多个broker是不是属于同一个集群。对于同一个cluster下的broker,producer在发送消息时只会选择发送给其中一个。
3)nameserv会记录brokerAddr的最后活跃时间,如果超过一定没有心跳或其他数据交互,会认为broker已下线。
4)nameserv和broker上都会保存DataVersion字段,当broker配置有变更时,DataVersion会+1。下次心跳时nameserv通过这个字段来判断配置是否有变更。
【注意】因为nameserv是用brokername来区分broker,所以注册到同一个nameserv上的多个集群,brokerName和topic也是不能重复的。
Controller initialize
启动过程中新建了一个Controller的实例后会调用它的initialize()方法:
public boolean initialize() {
//1、初始化KVConfigManager
this.kvConfigManager.load();
//2、初始化netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//3、客户端请求处理的线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//4、注册DefaultRequestProcessor,所有的客户端请求都会转给这个Processor来处理
this.registerProcessor();
//5、启动定时调度,每10秒钟扫描所有Broker,检查存活状态
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//6、日志打印的调度器,定时打印kvConfigManager的内容
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//7、监听ssl证书文件变化,
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
...
}
return true;
}
以上最重要的就是第2,4步,初始化nameserv的Server,用来接收客户端请求。所有的客户端请求都会转给第4步中注册的DefaultRequestProcessor
来处理。
第5步中,启动了一个定时器来扫描RouteInfoManager中缓存的broker信息,如果broker已经长时间没有心跳信息,则认为broker已经down掉了,将其移除。
Controller启动:
public void start() throws Exception {
this.remotingServer.start();
//监听ssl文件变化,可以实时更新证书
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
启动的过程比较简单,就是启动netty server开始接收客户端请求。
DefaultRequestProcessor请求处理
前面讲过nameserv最重要的两个作用,一个是路由管理,通过RouteInfoManager
管理路由信息供客户端查询。一个是Broker管理,接收broker注册并通过心跳机制检查broker的可用性。
NameServer通过DefaultRequestProcessor
的processRequest方法来提供请求处理。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
...
switch (request.getCode()) {
...
//broker注册请求
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
//Broker注销请求
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
//根据topic获取broker路由信息
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
//获取broker集群信息
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
//获取所有topic信息
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
//删除topic
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
...
}
return null;
}
查询路由信息
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//从RouteInfoManager中获取topic的路由信息
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//如果支持顺序消息,则填充KVConfig信息
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
路由信息就是直接到RouteInfoManager
查询,我们看下具体实现:
public TopicRouteData pickupTopicRouteData(final String topic) {
...
try {
//获取读锁
this.lock.readLock().lockInterruptibly();
//获取所有支持该topic的broker的queue配置
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
//获取brokerName
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
//根据brokerName获取broker主从地址信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
...
}
获取topic路由的过程就是直接从HashMap中获取缓存的broker配置。
Broker注册
Broker在启动的时候会将topic和queue的配置同步给nameserv。
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
...
//checksum,检查CRC32是否相等
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
//decode request body,如果body已压缩,则先解压。如果body为空,会将topic的版本号默认置为0.
if (request.getBody() != null) {
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
//使用broker上报的信息更新nameserv的RouteInfo
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
//如果broker是slave的话,会将master address和ha server address通过response返回给broker
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
//将Order topic的KV配置信息通过response返回
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
nameserv收到broker注册后,更新routeInfo过程
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
//更新cluster和broker对应关系
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//更新brokername和brokerdata的map
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//如果是master broker,第一次注册或者是topic信息发生变化了,更新topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
//更新broker的心跳时间
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
//更新filter server table
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//如果是slave broker注册,如果master存在,则返回master broker信息
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
总结
1、nameserv通过clusterName来判断broker是不是属于同一个集群
2、nameserv通过brokerName来判断两个broker是不是主从关系
3、对于相同的brokerName,只有一个master(id=0),不同的slave必须使用不同的Id (id>0)
4、NameServ只会保存master的topic配置信息,因为理论上slave的topic信息是从master同步过去的
5、所有的topic信息以broker上报为准,broker在启动的时候是不会去nameserv获取topic配置的,只会从自己持久化文件中加载。所以,一个新的broker启动后默认只有System topic信息。如果broker是新的,或者broker在挂掉一段时间重启topic不是最新的话,只能通过客户端更新topic来使broker能加入到正常的消息收发中。