程序员RocketMQ源码解读

ConsumeQueue索引文件及构建

2020-12-06  本文已影响0人  93张先生

ConsumeQueue概览

RocketMQ是基于主题订阅模式实现消息消费,消费者关心的是主题Topic下的所有消息,同一主题的消息不连续地存储在commitlog文件中,如果直接从commitlog文件中去遍历查找订阅主题下的消息,效率极其低下,为了适应消息消费的检索需求,设计了消息消费队列文件ConsumeQueue,该文件可以看成是Commitlog关于消息消费的索引文件,consumequeue的一级目录为主题Topic,二级目录为Topic的消息队列。主要是针对每一个Topic建立的索引,方便消费者消费某个主题下的消息。


image.png
ConsumeQueue条目

ConsumeQueue的每一条都是一条消息的索引,一共20字节。


image.png

单个ConsumeQueue文件默认包含30万个条目,每个条目20byte,单个文件的长度为30W20byte,约5.7M。与ConsumeQueue对等的是CommitLog对象。他们都有自己MappedFileQueue及MappedFile对象,他们都是使用MappedFileQueue和MappedFile对象实现消息字节数组和消息索引字节数组的落盘。ConsumeQueue没有使用AllocateMappedFileService服务来创建MappedFile文件,而是使用了MappedFile的构造方法来创建MappedFile文件。ConsumeQueue每一个文件的名称是以第一个消息条数20byte字节的大小为命名的。

ConsumeQueue异步构造

构建consumequeue、indexFile索引文件,通过一个ReputMessageService异步线程进行处理,构建consumequeue、indexFile索引文件的数据从commitLog的MappedFile中的ByteBuffer中获取,一条消息消息构造一个构建索引服务的DispatchRequest请求,再由ConsumeQueue服务处理DispatchRequest请求构建consumequeue的mappedFile文件。由IndexService处理请求构建indexFile索引文件,然后将各自的文件进行刷盘。

消息消费队列ConsumeQueue索引文件是基于CommitLog文件构建的,当消息生产者提交消息存储在CommitLog的MappedFile文件中,ConsumeQueue需要及时更新,否则消息无非被及时消费,根据消息属性查找消息也会出现较大的延迟。构建ConsumeQueue的数据来源为CommitLog的MappedFile中的ByteBuffer,此时消息未必被Commit、Flush等。获取一定数量的消息后,RocketMQ根据每条消息构造一个DispatchRequest请求,开启一个新的线程处理请求,并构造ConsumeQueue的MappedFile文件,将消息写入MappedFile的FileChannel中,等待异步刷盘操作。

构建过程

DefaultMessageStore是消息存储服务的入口和关键API,包含消息分发构建ConsumeQueue和Index索引文件的ReputMessageService的服务。它会开启一个线程进行实时消息分发和ConsumeQueue和Index索引文件构建。

// CommitLog  消息分发,根据 CommitLog 文件,异步构建 ConsumeQueue、IndexFile 文件
private final ReputMessageService reputMessageService;

// 开启异步构建服务
this.reputMessageService.start();

@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    // 异步构建ConsumeQueue、Index服务线程是否停止,一直调用doReput()方法,推送一次构建服务,线程休息1毫秒
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            // 进行消息ConsumeQueue、Index文件异步构建
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

doReput()实时从CommitLog的MappedFile文件中获取需要构建的消息,然后每条消息包装成一个DispatchRequest,进行消息分发。

/**
 * 异步构建ConsumeQueue、Index文件
 * doReput()方法在没有需要构建的offset时会停止,但调用它的地方会一直不停的调用doReput()方法,进行再次构建ConsumeQueue
 */
private void doReput() {
    // reputFromOffset小于commitlog中mappedFile文件开始的offset,进行reputFromOffset值调整为mappedFile文件的开始offset
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    //无限循环构建,commitlog文件剩余offset需要构建
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        // 开始构建的值
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        //根据需要构建的offset从MappedFile
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                // 开始构建的offset
                this.reputFromOffset = result.getStartOffset();
                // 一次读取ByteBuffer中一条消息,根据每条消息的大小获取一条消息,然后取下一条消息,构建一个DispatchRequest
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 创造异步构建ConsumeQueue的分发请求
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                    // 构建dispatchRequest成功
                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            DefaultMessageStore.this.doDispatch(dispatchRequest);

                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }

                            this.reputFromOffset += size;
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // 重新获取构建的offset偏移量
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    // 构建失败
                    } else if (!dispatchRequest.isSuccess()) {
                        // 构建失败,这条数据略过,进行构建位置更新,进行下一条ConsumeQueue条目的构建
                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // 获得需要构建的数据的释放
                result.release();
            }
        // result为null不需要构建
        } else {
            doNext = false;
        }
    }
}

CommitLogDispatcherBuildConsumeQueue是构建ConsumeQueue请求的处理类。

/**
 * 构建ConsumeQueue文件分发服务
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // 没有事务、事务提交
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                //处理从commit log 异步构建ConsumeQueue请求
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

putMessagePositionInfo处理具体构建请求,并创建或选择一个ConsumeQueue对象。

// 处理从commit log 异步构建ConsumeQueue请求
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // ConsumeQueue 处理从commit log 异步构建ConsumeQueue请求
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

// 根据topic和queueId获取ConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        // 新建ConsumeQueue
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}

putMessagePositionInfo()将消息索引信息存放到consumequeue的byteBufferIndex中,并追加到consumequeue的内存映射文件中(本操作只追加并不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘模式。

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    // 将commitlog的偏移量、消息长度、tag hash code存入byteBufferIndex
    this.byteBufferIndex.flip();
    //一条消息消费索引大小20byte
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //commitlog的偏移量
    this.byteBufferIndex.putLong(offset);
    //消息长度
    this.byteBufferIndex.putInt(size);
    // tag hash code
    this.byteBufferIndex.putLong(tagsCode);
    //开始存储consumequeue条目的物理偏移量
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    // 通过构造函数获取ConsumeQueue的MappedFile对象,不是预分配的
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // 如果是第一次创建,赋值一些变量
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }
        // 并根据consumeQueueOffset计算ConsumeQueue中物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加并不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘
        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        // 将内容追加到ConsumeQueue的内存映射文件中
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

消息消费查询

AdminBrokerProcessor#getIndexBuffer()根据consumequeue的消息下标,进行消息索引条目的返回。

// 根据consumequeue进行消息消费
SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());

ConsumeQueue#getIndexBuffer()确定consumequeue的MappedFile,然后从MappedFile中查找索引条目。

/**
 * 根据offset通过consumequeue查找消息
 * @param startIndex 为查找的offset值
 * @return
 */
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    // consumequeue物理offset,消息条数*20字节(消息大小)
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // 确定mappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // 根据消息余数偏移量,进行ByteBuffer消息查找
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

MappedFile#findMappedFileByOffset()方法根据 offset 定位 MappedFile 的算法为 (int)((offset/this.mappedFileSize) - (mappedFile.getFileFromOffset()/this.MappedFileSize)),获取这个 MappedFile 在 mappedFiles 的下标,然后获取 MappedFile 文件。

RocketMQ commitlog 日志文件有定时删除功能,所以 commitlog 文件夹下的文件个数是会发生改变的,所以下标的起始位置也会发生改变,动态确定 offset 所在文件的下标为:总文件的个数 - 现有文件个数 = 这个 offset 所在 MappedFile 文件集合中的下标值。

/**
 *
 * Finds a mapped file by offset.
 *
 * @param offset Offset.
 * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
 * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // mappedFile 文件下标
                // (offset / this.mappedFileSize) 为这个 offset 所在 mappedFile 文件中的第几个个数,定义为:sum
                //  (firstMappedFile.getFileFromOffset() / this.mappedFileSize)) 为第一个文件所在的文件个数, 定义为:first
                // sum - first 为这个 offset,在现有的 mappedFiles 集合文件的下标。
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}

MappedFile#selectMappedBuffer()方法根据数据所在的pos位置,从ByteBuffer中查询数据。

/**
 * 获取 mappedBuffer 中的数据
 * @param pos mappedBuffer 中的一个位置,必须小于可读数据的位置
 * @return
 */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}
上一篇下一篇

猜你喜欢

热点阅读