RocketMQ源码解析

2025-11-30  本文已影响0人  shark没有辣椒

本文基于 Apache RocketMQ release-4.9.4 版本源码,深入剖析其核心架构设计思想

RocketMQ 作为阿里巴巴开源的消息中间件,历经双11等极端场景考验。其源码中蕴含了大量精妙的设计思想,本文将带你深入核心模块,揭秘高性能消息队列的实现原理。

一、CommitLog 存储引擎

1.1 MappedFileQueue 文件管理

文件位置:store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

RocketMQ 的 CommitLog/ConsumeQueue 是 按固定大小分段写文件 的,每个 segment 就是一个 MappedFile。
例如 commitlog 默认大小:1GB 一个文件段。每个 MappedFile 文件名 = 该文件中第一条消息的物理 offset

MappedFileQueue 就是管理这一串文件的队列。

/**
 * MappedFileQueue - manages the lifecycle of the fixed-size physical file
 * segments (MappedFile) that back a CommitLog or ConsumeQueue.
 */
public class MappedFileQueue {
    // Directory holding the segment files.
    private final String storePath;
    // Fixed size of each segment (commitlog default: 1 GB).
    protected final int mappedFileSize;
    // Segment list; CopyOnWriteArrayList lets readers iterate lock-free while
    // background services add/remove segments.
    protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    
    /**
     * Returns the last (newest) MappedFile, or null when the queue is empty.
     * Design note: lazy - new segments are only created on demand elsewhere.
     */
    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;
        
        while (!this.mappedFiles.isEmpty()) {
            try {
                // Read the tail element of the list.
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                // A concurrent removal shrank the list between isEmpty() and
                // get(); loop and retry. Deliberate lock-free retry, not a bug.
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }
        
        return mappedFileLast;
    }
    
    /**
     * Returns the last MappedFile, creating one if necessary so that writes
     * at {@code startOffset} have a segment to land in.
     */
    public MappedFile getLastMappedFile(final long startOffset) {
        return getLastMappedFile(startOffset, true);
    }
    
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // Offset at which a new segment must be created; -1 means none needed.
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();
        
        if (mappedFileLast == null) {
            // Even if startOffset falls inside the first segment, creation is
            // aligned down to the segment boundary: every MappedFile's starting
            // offset must be a multiple of mappedFileSize, e.g.
            // offset:0 -> 00000000000000000000, offset:1GB -> 00000000001073741824, ...
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        // Last file exists but is full -> next segment starts right after it.
        if (mappedFileLast != null && mappedFileLast.isFull()) {
             // e.g. last file 00000000000000000000 of size 1G -> offset = 0 + 1GB = 00000000001073741824
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
        
        // Create when needed; otherwise hand back the current last file.
        if (createOffset != -1 && needCreate) {
            return tryCreateMappedFile(createOffset);
        }
        
        return mappedFileLast;
    }
    
    protected MappedFile tryCreateMappedFile(long createOffset) {
        // Path of the segment to create now (file name = its starting offset).
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        // Path of the segment after it, pre-allocated asynchronously as warm-up.
        String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
                + this.mappedFileSize);
        return doCreateMappedFile(nextFilePath, nextNextFilePath);
    }

    protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            // Asynchronous allocation via the allocator service (RocketMQ default).
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                // Synchronous fallback allocation.
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        // Register the newly created segment.
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                // First file of the queue is flagged for recovery-related logic.
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }
}

1.2 MappedFile 内存映射实现

文件位置:store/src/main/java/org/apache/rocketmq/store/MappedFile.java

MappedFile 是 CommitLog 文件的「文件片段」:

/**
 * MappedFile - a single memory-mapped segment file; the core I/O component.
 */
public class MappedFile extends ReferenceResource {
    // Bytes already written into this file (next write position).
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // Bytes written into writeBuffer but not yet committed to the file.
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // Fixed segment size (e.g. 1 GB for commitlog).
    protected int fileSize; // file size
    protected FileChannel fileChannel;
    private MappedByteBuffer mappedByteBuffer; // memory-mapped region of the file
    
    /**
     * Entry point for appending a single message.
     */
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        return appendMessagesInner(msg, cb, putMessageContext);
    }
    
    /**
     * Actual append implementation, shared by single and batch messages.
     */
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        assert messageExt != null;
        assert cb != null;
        
        // Current write pointer. AtomicInteger supports cross-thread access:
        // the broker normally appends to the commitlog from a single thread,
        // but the flush service reads this position from another thread.
        int currentPos = this.wrotePosition.get();
        // Only append while the file still has room.
        if (currentPos < this.fileSize) {
            // Buffer used for this write. The mapped buffer's position is
            // shared (not thread-safe); slice() yields a buffer that shares
            // the memory but has an independent position/limit, so each writer
            // can move its own position without disturbing the original.
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;

            // Dispatch on message type. doAppend serializes the message
            // (header + body + properties) into the buffer and reports the
            // bytes written plus the store timestamp.
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBrokerInner) messageExt, putMessageContext);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBatch) messageExt, putMessageContext);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            
            // Advance the write pointer and record the store timestamp.
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        
        // File is full; the caller is expected to roll to the next segment.
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
}
1.3 CommitLog 消息追加及消息序列化

文件位置:store/src/main/java/org/apache/rocketmq/store/CommitLog.java

// 这里代码较多,仅展示部分核心代码
public class CommitLog {
    /**
     * 追加消息
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 设置消息存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 设置消息体CRC
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;
        try {
            // 从 MappedFileQueue 获取可写文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            // 写入数据
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        // 提交刷盘请求
        // 刷盘方式有同步和异步两种,会返回一个 future,用于后面判断 flush 是否成功
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // 提交主从同步请求
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        // 让两个 Future 全部完成后再汇总结果
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
            // 计算消息写入的物理偏移
            // fileFromOffset:当前 MappedFile 的起始 offset,例如 1GB * n
            // byteBuffer.position():当前文件内偏移
            long wroteOffset = fileFromOffset + byteBuffer.position();
            // 生成 msgId
            // msgId 是一个两个字段拼接的唯一值;broker IP+port -> 唯一标识这条消息是在哪个 broker 写入;wroteOffset -> 唯一定位这条消息的物理偏移
            Supplier<String> msgIdSupplier = () -> {
                MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
                msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
                return UtilAll.bytes2string(msgIdBuffer.array());
            };
            // 查找并维护 queueOffset;
            // 对于 Topic-QueueId 的队列递增编号:queueOffset = 第几条消息
            String key = putMessageContext.getTopicQueueTableKey();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }
            // 事务消息处理
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // 事务半消息不会写入 ConsumeQueue,因此 queueOffset 设为 0,这是 RocketMQ 事务消息的关键机制
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
            }
            // 检查 MappedFile 剩余空间,如果当前文件剩余空间不足容纳一整条消息,RocketMQ 会在末尾写入 BLANK_MAGIC_CODE,代表此文件结束
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                        maxBlank, 
                        msgIdSupplier, msgInner.getStoreTimestamp(),
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }
            // 把 queueOffset 和 phyOffset 写入序列化后的消息中
            int pos = 4 + 4 + 4 + 4 + 4;
            preEncodeBuffer.putLong(pos, queueOffset);
            pos += 8;
            preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
            // 刷新 storeTimestamp
            preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
            // 将序列化好的消息写入 MappedByteBuffer,清空编码缓存
            // 因为使用的是 顺序写, 所以非常快,MappedFile + OS PageCache = 几乎是零拷贝写,RocketMQ 的高性能主要来源于这里
            byteBuffer.put(preEncodeBuffer);
            msgInner.setEncodedBuff(null);
            
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // 写成功后更新 queueOffset
                    // 如果消息是普通消息、事务提交消息则 queueOffset 自增;如果是Prepared、Rollback则不更新
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
                    break;
            }
            return result;
        }
}

这里简单介绍下零拷贝:零拷贝是一种减少 CPU 在内核空间与用户空间之间复制数据的技术。以 mmap 为例,用户态程序直接读写操作系统页缓存(Page Cache),省去了传统 read/write 路径中内核缓冲区与用户缓冲区之间的那一次拷贝;而 sendfile 则让数据在内核空间内直接送往网卡,全程无需经过用户空间。RocketMQ 的 MappedFile 正是基于 mmap 实现的。

二、ConsumeQueue:消费队列索引机制

2.1 索引文件结构设计

文件位置:store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

每个 Topic + QueueId 一个 ConsumeQueue,作用是Consumer 只顺序读 ConsumeQueue,拿到 offset + size,再去 CommitLog 精确读取,避免在大文件中扫描。

/**
 * ConsumeQueue - per Topic+QueueId index over the CommitLog.
 * Design: one independent index file per queue with fixed-length entries, so
 * any entry can be located directly by (logical offset * entry size).
 * Entry layout: commitlog offset (8) + message size (4) + tag hash code (8).
 */
public class ConsumeQueue {
    public static final int CQ_STORE_UNIT_SIZE = 20; // fixed 20 bytes per index entry
    
    /**
     * Appends one index entry.
     * @param offset   physical offset of the message in the CommitLog
     * @param size     total message length
     * @param tagsCode hash of the message tag (used for server-side filtering)
     * @param cqOffset logical offset (entry number) within this ConsumeQueue
     */
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
        // flip() resets position to 0; with the limit set to one unit below,
        // this reuses the scratch buffer for a fresh 20-byte entry.
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        
        // Entry = commitlog offset + message size + tag hash.
        this.byteBufferIndex.putLong(offset);   // commitlog physical offset (8 bytes)
        this.byteBufferIndex.putInt(size);      // message length (4 bytes)
        this.byteBufferIndex.putLong(tagsCode); // tag hash code (8 bytes)
        
        // Byte position of this entry within the index files.
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        
        // Find - or create - the segment that should hold this entry.
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            // Fresh queue starting at a non-zero logical offset: initialize
            // the min offset and the flush/commit positions accordingly.
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            }
            
            // Append the 20-byte entry to the segment.
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        
        return false;
    }
    
    /**
     * Reads index entries starting at a logical offset.
     * @param startIndex logical offset (message sequence number)
     */
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        // Convert the logical offset to a byte offset in the index files.
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        
        // Reject offsets below the minimum retained logical offset.
        if (offset >= this.getMinLogicOffset()) {
            // Locate the segment containing this byte offset.
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                // Read from the in-file relative offset.
                return mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            }
        }
        
        return null;
    }
}
2.2 索引构建服务

文件位置:store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
主要功能为:
① 顺序读取 CommitLog 中新增的数据
② 解析出消息结构(DispatchRequest)
③ 将消息写入 ConsumeQueue / IndexFile / MultiDispatch
④ 触发长轮询(唤醒消费者)
⑤ 更新 reputFromOffset,持续循环处理

public class DefaultMessageStore implements MessageStore {
    /**
     * ReputMessageService - asynchronously builds ConsumeQueue/IndexFile
     * entries from newly written CommitLog data, so index construction never
     * blocks the main write path.
     */
    class ReputMessageService extends ServiceThread {
        private volatile long reputFromOffset = 0; // commitlog offset dispatched so far
        
        @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");
            
            while (!this.isStopped()) {
                try {
                    // Sleep briefly to avoid a busy spin when idle.
                    Thread.sleep(1);
                    // Dispatch any newly written commitlog data.
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            
            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }
        
        /**
         * Core dispatch loop: read the commitlog from reputFromOffset, parse
         * each message, feed it to the dispatchers (ConsumeQueue / IndexFile /
         * MultiDispatch), wake long-polling consumers, advance the offset.
         */
        private void doReput() {
            // Clamp the start offset: if it fell behind the retained commitlog
            // range, dispatching lagged so far that old files already expired.
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            // Keep going while the commitlog still has unprocessed data.
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
                // Read a chunk of commitlog starting at reputFromOffset.
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
                        
                        // Walk the chunk message by message.
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // Parse one message header into a DispatchRequest.
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            // Bytes consumed from the stream by this message.
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                                
                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    // Dispatch to ConsumeQueue/IndexFile builders.
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    
                                    // On a master with long polling enabled,
                                    // notify the listener so suspended pull
                                    // requests can be woken.
                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                            && DefaultMessageStore.this.messageArrivingListener != null) {
                                        // Wake long-polling consumers.
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }
                                    
                                    // Advance to the next message.
                                    this.reputFromOffset += size;
                                    readSize += size;
                                }  else if (size == 0) {
                                    // Hit the BLANK padding at the end of a
                                    // commitlog file: jump to the next file.
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {
                                // Parse failed.
                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    // Most likely a half-written message; stop
                                    // and wait for the writer to finish it.
                                    doNext = false;
                                }
                            }
                        }
                    } finally {
                        // Always release the mapped buffer reference.
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
            
        }
    }
}

三、长轮询机制实现

3.1 PullRequestHoldService 长轮询服务

文件位置:broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java

PullRequestHoldService 是 Broker 长轮询实现的核心线程,用于处理消费者的 long-polling 拉取请求。
消费者拉取消息时,如果队列没有新消息,Broker 会“挂起”请求并加入 PullRequestHoldService 中。
一旦有新消息写入,ReputMessageService 会通知 PullRequestHoldService 唤醒这些请求返回数据。

/**
 * PullRequestHoldService - long-polling service implementing a push/pull
 * hybrid: when a queue has no new messages, the consumer's pull request is
 * suspended here; as soon as new messages arrive it is woken and answered.
 */
public class PullRequestHoldService extends ServiceThread {
    // Suspended pull requests, grouped by "topic@queueId".
    protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);
    
    @Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    // Long polling: re-check suspended requests every 5 seconds.
                    this.waitForRunning(5 * 1000);
                } else {
                    // Short polling: re-check every shortPollingTimeMills ms.
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
                // Wake any suspended request whose queue has new messages.
                this.checkHoldRequest();
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        
        log.info("{} service end", this.getServiceName());
    }
    
    /**
     * Scans all suspended requests and notifies those whose queue advanced.
     */
    protected void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                // Current max offset of that queue.
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    
                try {
                    // Check whether new messages can wake any held request.
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Exception e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
        notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
    }
    
    /**
     * Notifies that new messages may have arrived; wakes matching suspended
     * requests, re-queues the rest until their suspend timeout expires.
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        // All requests held for this queue.
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        
        if (mpr != null) {
            // Clone-and-clear to avoid concurrent modification of the hold list.
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                // FIX: 'replayList' was used but never declared in the original
                // excerpt; it collects requests that cannot be answered yet.
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    // If the cached max offset does not exceed the requested
                    // offset, re-read the latest max offset once more.
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    
                    // New messages beyond the requested offset -> try to answer.
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter()
                            .isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));

                        // Apply property-based (commitlog) filtering if requested.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }
                        if (match) {
                            try {
                                // Wake the request and return messages immediately.
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Exception e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
                    // Suspend timeout reached -> answer (possibly empty) anyway.
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                    // Not ready and not timed out -> keep holding.
                    replayList.add(request);
                }
                // Re-queue the requests that were not woken.
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }
}
3.2 消息拉取请求处理器

文件位置:broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    /**
     * Wakes a previously suspended long-polling pull request and re-runs the
     * pull logic on the broker's pull thread pool.
     * @param channel the client's Netty connection to the broker
     * @param request the client's original (suspended) PullMessage request
     */
    public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
        // The actual pull work, executed asynchronously on the pull executor.
        Runnable run = () -> {
            try {
                // processRequest() is the same logic a fresh client pull would
                // run: validate topic/queue/offset, look up the ConsumeQueue,
                // read from the CommitLog and build the response.
                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
                if (response == null) {
                    return;
                }
                response.setOpaque(request.getOpaque());
                response.markResponseType();
                try {
                    // writeAndFlush is asynchronous; the listener only logs
                    // transport failures.
                    channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
                        if (!future.isSuccess()) {
                            log.error("processRequestWrapper response to {} failed",
                                future.channel().remoteAddress(), future.cause());
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    });
                } catch (Throwable e) {
                    log.error("processRequestWrapper process request over, but response failed", e);
                    log.error(request.toString());
                    log.error(response.toString());
                }
            } catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1);
            }
        };
        // Hand the task to the broker's dedicated pull-message thread pool.
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }
}

四、高可用主从同步机制

文件位置:store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java

public class HAConnection {
    // Master-side writer thread: each loop iteration is one heartbeat /
    // data-transfer scheduling cycle toward a single connected slave.
    class WriteSocketService extends ServiceThread {
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    // Wait (up to 1s) on the NIO Selector for the socket
                    // channel to become writable.
                    this.selector.select(1000);
                    // The slave sends an 8-byte report (its max fetched
                    // offset) right after connecting; until it arrives the
                    // master cannot start transferring.
                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        Thread.sleep(10);
                        continue;
                    }
                    // Initialize nextTransferFromWhere on first use.
                    if (-1 == this.nextTransferFromWhere) {
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            // First sync for a fresh slave: start from the
                            // master's current max offset.
                            // NOTE(review): upstream additionally aligns this
                            // down to the start of the last commitlog segment
                            // (masterOffset -= masterOffset % mappedFileSizeCommitLog);
                            // that line is omitted in this excerpt - confirm
                            // against release-4.9.4 before relying on it.
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            // Slave already has data: resume from its reported offset.
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }
                    // Previous write finished -> maybe send a heartbeat.
                    if (this.lastWriteOver) {

                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                            // Heartbeat header: current offset + size=0 (no body).
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);

                            this.lastWriteOver = this.transferData();
                        }
                    } else {
                        // Previous write incomplete -> keep draining it first.
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }
                    // Read the next chunk of commitlog data to replicate.
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        // Cap each transfer at the configured batch size.
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }
                        // Advance the transfer offset past this chunk.
                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;

                        // Data header: starting offset + payload size.
                        this.byteBufferHeader.putLong(thisOffset);
                        this.byteBufferHeader.putInt(size);
                        // Write header + payload to the socket.
                        this.lastWriteOver = this.transferData();
                    } else {
                        // Nothing new to replicate -> wait (100ms) for a wakeup.
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {

                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            // Shutdown path: unregister from the wait table, release buffers,
            // stop both reader and writer services, drop the connection and
            // close the channel/selector.
            HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

            if (this.selectMappedBufferResult != null) {
                this.selectMappedBufferResult.release();
            }

            this.makeStop();

            readSocketService.makeStop();

            haService.removeConnection(HAConnection.this);

            HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
        }
    }
}

五、延迟消息实现

文件位置:store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java

public class ScheduleMessageService extends ConfigManager {
    /**
     * Timer task bound to one delay level. Scans that level's ConsumeQueue and
     * re-delivers every message whose deliver timestamp has been reached.
     */
    class DeliverDelayedMessageTimerTask implements Runnable {

        // Delay level this task serves; each level maps to exactly one queueId.
        private final int delayLevel;
        // ConsumeQueue (logical) offset from which to resume scanning.
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // On unexpected failure, back off for DELAY_FOR_A_PERIOD before
                // retrying, to avoid a tight retry loop.
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
            }
        }

        /**
         * Scans the ConsumeQueue of this delay level, delivers all due messages
         * back to their real topics, then schedules the next scan.
         */
        public void executeOnTimeup() {
            // Locate the ConsumeQueue for this delay level (one queueId per level).
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // Queue not created yet: retry after DELAY_FOR_A_WHILE.
            if (cq == null) {
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }
            // Fetch the index buffer starting at the current offset (20 bytes per entry).
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ == null) {
                // FIX: 'resetOffset' was used without a declaration (compile error).
                // Declare it, then reschedule from the current offset after a short wait.
                long resetOffset = this.offset;
                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }

            long nextOffset = this.offset;
            try {
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // Walk the ConsumeQueue entries contained in this buffer.
                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // Physical offset of the message in the CommitLog.
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // Physical size of the message in the CommitLog.
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // For the schedule topic, tagsCode carries the deliver timestamp.
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    // Decide whether this message is due for delivery.
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    if (countdown > 0) {
                        // Entries within one delay level are time-ordered: if this one
                        // is not due yet, none of the following are. Re-check later.
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                    // Due: load the full message body from the CommitLog.
                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }
                    // messageTimeup() rebuilds the message:
                    // 1. restore the original topic; 2. restore the original queueId;
                    // 3. remove the delayLevel property (no longer a delayed message).
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }
                    // Re-deliver to the real topic, synchronously or asynchronously.
                    boolean deliverSuc;
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
                    } else {
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
                    }
                    // Delivery failed: retry the same entry after a short delay.
                    if (!deliverSuc) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }
                // All entries processed: advance past the scanned range.
                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                // Always release the reference to the mapped-buffer slice.
                bufferCQ.release();
            }
            // Schedule the next scan from the updated offset.
            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }
    }
}

六、消费进度管理

文件位置:broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

这段代码是管理消费者在每个 Topic 的每个队列上的消费进度(消费 offset):

public class ConsumerOffsetManager extends ConfigManager {
    protected static final String TOPIC_GROUP_SEPARATOR = "@";
    // Outer key: "topic@group"; inner map: queueId -> committed consume offset.
    // Each topic has multiple queues (partitions); every queueId tracks its own progress.
    protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    /**
     * Commits the consume offset of one queue for the given consumer group and topic.
     *
     * @param clientHost client address, used only for the warning log
     * @param group      consumer group name
     * @param topic      topic name
     * @param queueId    queue (partition) id within the topic
     * @param offset     new consume offset to record
     */
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // Build the table key: topic@group.
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        // FIX: the original check-then-put initialization was racy — two concurrent
        // first-time commits for the same key could each install a fresh inner map
        // and one offset would be silently lost. putIfAbsent makes it atomic.
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            ConcurrentMap<Integer, Long> newMap = new ConcurrentHashMap<Integer, Long>(32);
            ConcurrentMap<Integer, Long> existing = this.offsetTable.putIfAbsent(key, newMap);
            map = (existing != null) ? existing : newMap;
        }
        Long storeOffset = map.put(queueId, offset);
        // An offset moving backwards usually indicates a duplicate or late commit.
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }

    /**
     * Returns the committed consume offset for the queue, or -1 if none was recorded.
     */
    public long queryOffset(final String group, final String topic, final int queueId) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null != map) {
            Long offset = map.get(queueId);
            if (offset != null) {
                return offset;
            }
        }

        return -1;
    }
}

设计要点总结

1. 存储设计精华:CommitLog 按 1GB 定长分段,文件名即该段起始物理 offset,MappedFileQueue 统一管理文件段生命周期
2. 性能优化关键:文件段懒加载创建、mmap 内存映射顺序写、HA 同步按 haTransferBatchSize 限制单批传输大小
3. 高可用保障:HAConnection 主从增量同步 CommitLog,延迟消息到期可靠重投,ConsumerOffsetManager 集中管理消费进度

通过这些精妙的设计,RocketMQ能够在保证数据可靠性的同时,实现极高的吞吐量和低延迟,成为企业级消息中间件的优秀选择。

上一篇 下一篇

猜你喜欢

热点阅读