RocketMQ源码分析(三)生产者的启动
2021-04-03 本文已影响0人
甘_
RocketMQ 支持 3 种消息发送方式 :同 步(sync )、异步(async)、单向(oneway )。这些大家应该都比较了解了,我们从生产者的启动开始聊起。
DefaultMQProducer 是默认的消息生产者实现,他实现了MQAdmin接口。
生产者的启动,常规是用 DefaultMQProducerlmpl的start()方法进行追踪,以下为主要步骤:
- 检查生产者组是否符合要求,并改变生产者的实例名为进程ID。
- 创建 MQClientlnstance 实例 。 整个JVM实例中只存在一个MQClientManager 实
例,同时维护了一个MQClientlnstance缓存表,最终通过clientId(ip+pid)生成一个实例。 - 向MQClientInstance注册,并加入其管理。
- 启动实例。
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1.检查是否符合要求
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
//改变生产者名字为进程id
this.defaultMQProducer.changeInstanceNameToPID();
}
// 通过MQClientManager获取mqclient工厂(同时创建生产者实例)
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 向mqclient工厂注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
省略很多代码……