rocketMq broker对consumer的处理

2018-12-11  本文已影响0人  圣村的希望

  在broker中是会把消息存放在commitLog中的,在后台还会把消息的逻辑位置(类似索引)存放到consumeQueue中。所以在获取消息的时候,broker端是先读取consumeQueue中的消息逻辑位置,拿到offset后再去commitLog中获取消息返回给client端。

  在broker启动初始化的时候回去注册PullMessageProcessor来处理获取消息的请求,接下来从这个类来查看对拉取消息请求的处理:

final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

  在PullMessageProcessor中的processRequest可到,获取消息的请求处理也是交给了MessageStore来进行获取消息:

//根据offset去ConsumeQueue中获取消息在commitLog中的逻辑位置
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);

//从commitLog中获取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

  从DefaultMessageStore中很简单地就可以看出获取消息的大致逻辑,线程consumeQueue中获取逻辑消息位置然后再去commitLog中获取实际消息。

  ConsumeQueue的维护:在rocketMq中,每个MessageQueue都对应有一个consumeQueue,他类似于是消息的逻辑队列。

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    ······
    //这个是一个后台线程,轮询把新写入的消息写入到consumeQueue中
    this.reputMessageService = new ReputMessageService();
}

class ReputMessageService extends ServiceThread {
    @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    //新写入的消息添加到consumeQueue
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }
}

private void doReput() {
    //从mappedFile中获取到新写入的消息
    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

    DefaultMessageStore.this.doDispatch(dispatchRequest);
}

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            //prepared消息和rollback消息是不会往consumeQueue中存放的
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        //通过topic和queueId获取到对应的consumeQueue,如果没有就创建对应queueId的consumeQueue
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
}
  1. 在DefaultMessageStore初始化的时候实例化了ReputMessageService,这是一个后台线程,主要靠它来进行后台轮询吧新写入到mappedFile中的消息添加到consumeQueue中。
  2. 后台线程不停滴从mappedFile中获取到消息,然后再把消息添加到consumeQueue中,但是也不是所有的消息都会添加到consumeQueue中,事务消息的prepared消息和rollback消息是不会被写入到consumeQueue中,所以事务消息再没有被commit的时候是不会被consumer消费的。
  3. 存放消息到consumeQueue中时,也是先去查看当前内存是否已经有对应的consumeQueue,没有就进行新建messageQueue对应的consumeQueue,用messageQueue的id作为key,consumeQueue作为value放到map中,然后把这个map添加到对应的topic下。所以这里可以看出MessageQueue和ConsumeQueue是一一对应的。
//这个是DefaultMessageStore中的一个属性
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

上一篇 下一篇

猜你喜欢

热点阅读