(三)(1)broker是如何自动创建topic的
2021-07-22 本文已影响0人
guessguess
有一个问题困扰了我很久。
生产者在发送消息的时候,如果并没有提前在某个broker去创建topic,那么从注册中心拉取到的路由信息必然是没有该topic对应的相关信息的。
但是我在使用生产者随便使用一个未创建的topic去发消息的时候,消费者最后却知道从哪里去拉取该topic的消息。因此我断定,broker偷偷做了些什么。
每个broker都会定时将自己的topic信息同步到注册中心,注册中心随后会更新路由信息,给topic分配对应的队列以及broker,但是如果broker一开始并没有创建任何的自定义topic,那么同步到注册中心的就只有默认topic。
所以生产者一开始拉取到的路由信息其实只有默认的主题。当生产者发送的消息所对应的主题不存在路由信息的时候,便会选择默认主题,然后随便选一个队列,找到该队列的broker以及对应的broker地址,最后进行消息的发送。
那么在该Broker接收到消息之后做了什么。最后可以使得消费者可以拉取到对应的消息。
先贴一下debug路径。
![](https://img.haomeiwen.com/i22548080/62d8192e43a25540.png)
这里的话,从debug的路径来看,在broker接收到信息之后会先做一个检查。
代码如下
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
省略部分代码
这里是broker从自己的topic信息中查看是否有对应的topic信息。由于我们传的是自定义的topic,同时该broker也没有创建该topic,所以必然是null。
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
省略部分代码。
所以直接进行创建。
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
省略代码
if (null == topicConfig) {
创建失败则返回 错误的响应码。
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
}
那么看看是如何自动创建topic的。
下面这个方法很简单
public class TopicConfigManager extends ConfigManager {
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
一开始必然为空,没有对应topic的信息。
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
如果不允许自动创建,更改默认主题配置的权限。
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
如果允许自动创建
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
则直接创建主题。
topicConfig = new TopicConfig(topic);
int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
defaultTopic, remoteAddress);
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
defaultTopic, topicConfig, remoteAddress);
最后加入到该broker的topic配置中。
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
this.persist();
}
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
再次进行注册,进行数据同步。
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
}
接下来看看如何进行数据同步。
public class BrokerController {
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
获取到该broker的topic配置,此时topic配置中已经包含自定义的topic了。
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
进行同步
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
}
最后一路debug,代码如下
向所有的注册中心同步topic配置。
public class BrokerOuterAPI {
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
}
那么注册中心是如何处理broker的注册?
可以往前面看。broker的路由注册
注册中心生成新的路由信息。那么消费者从注册中心就可以拉取到该topic对应的路由数据,从而进行消息的拉取以及消费了。