RocketMQ源码阅读(十)-Consumer消费消息

2018-04-10  本文已影响450人  _呆瓜_

1.消费方式和消费者组

1.消费方式: 拉取和推送两种(事实上所有从远程获取数据都是这两种方式).
2.消费者组与消费模式
多个消费者组成一个消费组, 两种模式: 集群(消息被其中任何一个消息者消费), 广播模式(全部消费者消费).

2.Consumer消费消息的基本流程

RocketMQ 分别使用 DefaultMQPullConsumer 和 DefaultMQPushConsumer 实现了拉取和推送两种方式. 下面主要以DefaultMQPullConsumer为例进行分析.

先看源码中给出的Demo:

public class PullConsumerTest {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //@1
        consumer.start(); //@2

        try {
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(0);
            mq.setTopic("TopicTest3");
            mq.setBrokerName("vivedeMacBook-Pro.local");

            long offset = 26;

            long beginTime = System.currentTimeMillis();
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); //@3
            System.out.printf("%s%n", System.currentTimeMillis() - beginTime);
            System.out.printf("%s%n", pullResult);
        } catch (Exception e) {
            e.printStackTrace();
        }

        consumer.shutdown();
    }
}

首先在@1处构建Consumer并且制定其所属的消费者组. 在@2处启动Consumer, 并且在@3处拉取消息.

Consumer启动

事实上DefaultMQPullConsumer将所有操作都委托给DefaultMQPullConsumerImpl, 下面看DefaultMQPullConsumerImpl#start.

public void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();  //@1 

            this.copySubscription(); //@2

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID(); //@3
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); //@4

            //@5            
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(//
                mQClientFactory, //
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            
            //@6            
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            //@7
            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
            }

            this.offsetStore.load();

            //@8
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            //@9
            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            //@10
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}

首先判断当前consumer的状态, 除了CREATE_JUST之外, 全部是非法状态.(这个容易理解, 因为时刚刚启动, 不应该处于其他状态).
状态合法后, 大体过程如下:

Consumer获取消息

Consumer获取消息使用pullBlockIfNotFound方法, 方法签名如下:

PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) 
      throws MQClientException, RemotingException, MQBrokerException, InterruptedException

该方法一共有4个参数.

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();     //@1
    //@2
    if (null == mq) {
        throw new MQClientException("mq is null", null);

    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic()); //@3

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //@4
    //@5
    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;   //@6

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
        mq, 
        subscriptionData.getSubString(), 
        0L, 
        offset, 
        maxNums, 
        sysFlag, 
        0, 
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), 
        timeoutMillis, 
        CommunicationMode.SYNC, 
        null
    );   //@7
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);  //@8
    //@9
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

过程概括如下:

上一篇下一篇

猜你喜欢

热点阅读