RocketMQ

RocketMQ源码分析(三)生产者的启动

2021-04-03  本文已影响0人  甘_

RocketMQ 支持 3 种消息发送方式 :同 步(sync )、异步(async)、单向(oneway )。这些大家应该都比较了解了,我们从生产者的启动开始聊起。
DefaultMQProducer 是默认的消息生产者实现,他实现了MQAdmin接口。

生产者的启动,常规是用 DefaultMQProducerlmpl的start()方法进行追踪,以下为主要步骤:

  1. 检查生产者组是否符合要求,并改变生产者的实例名为进程ID。
  2. 创建 MQClientlnstance 实例 。 整个JVM实例中只存在一个MQClientManager 实
    ,同时维护了一个MQClientlnstance缓存表,最终通过clientId(ip+pid)生成一个实例。
  3. 向MQClientInstance注册,并加入其管理。
  4. 启动实例。
public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 1.检查是否符合要求
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    //改变生产者名字为进程id
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 通过MQClientManager获取mqclient工厂(同时创建生产者实例)
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 向mqclient工厂注册生产者
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
省略很多代码……
上一篇下一篇

猜你喜欢

热点阅读