RocketMQ源码解析
本文基于 Apache RocketMQ release-4.9.4 版本源码,深入剖析其核心架构设计思想
RocketMQ 作为阿里巴巴开源的消息中间件,历经双11等极端场景考验。其源码中蕴含了大量精妙的设计思想,本文将带你深入核心模块,揭秘高性能消息队列的实现原理。
一、CommitLog 存储引擎
1.1 MappedFileQueue 文件管理
文件位置:store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
RocketMQ 的 CommitLog/ConsumeQueue 是 按固定大小分段写文件 的,每个 segment 就是一个 MappedFile。
例如 commitlog 默认大小:1GB 一个文件段。每个 MappedFile 文件名 = 该文件中第一条消息的物理 offset
MappedFileQueue 就是管理这一串文件的队列。
/**
* MappedFile队列管理器 - 负责物理文件的生命周期管理
*/
public class MappedFileQueue {
private final String storePath;
protected final int mappedFileSize;
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
/**
* 获取最后一个MappedFile - 核心方法
* 设计思想:懒加载,只在需要时才创建新文件
*/
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
// 获取列表中的最后一个文件
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
/**
* 根据起始偏移量获取或创建MappedFile
*/
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 计算创建偏移量
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
// 即使 startOffset 落在第一段文件内部,RocketMQ 仍然会从 0 开始创建第一段文件,而不是从 startOffset 对齐的位置创建文件
// 因为每个 MappedFile 的起始 offset 必须是文件大小(1GB)的整数倍,例如 offset:0 -> 00000000000000000000, offset:1GB -> 00000000001073741824, ...
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 有文件且满了 → 创建下一个文件
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 假设最后文件:00000000000000000000,大小是1G,offset = 0 + 1GB = 00000000001073741824
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 如果需要创建文件 → 创建;否则返回最后一个文件
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
// 当前文件路径
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 下一个文件路径(用于提前 warm-up,异步分配)
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// 异步创建(RocketMQ 默认)
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
// 普通创建(fallback)
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
// 将新建文件加入列表
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
// 首个文件被标记 firstCreateInQueue,用于一些恢复逻辑
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
}
1.2 MappedFile 内存映射实现
文件位置:store/src/main/java/org/apache/rocketmq/store/MappedFile.java
MappedFile 是 CommitLog 文件的「文件片段」:
- 每个 commitlog 文件大小固定(默认 1GB)
- 所有消息顺序写入 MappedFile 的 mappedByteBuffer 或 writeBuffer
- 负责维护当前写入偏移:
- wrotePosition(实际写入位置)
- committedPosition(flush 前已 commit 的位置)
/**
* 单个内存映射文件实现 - 核心IO组件
*/
public class MappedFile extends ReferenceResource {
// 当前文件已经写入的字节数(下一次写入的偏移)
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 写入 writeBuffer 但尚未 flush 到文件的偏移
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// MappedFile 固定大小(如 1GB)
protected int fileSize; // 文件大小
protected FileChannel fileChannel;
private MappedByteBuffer mappedByteBuffer; // 内存映射缓冲区
/**
* 消息追加入口方法
*/
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
return appendMessagesInner(msg, cb, putMessageContext);
}
/**
* 实际的消息追加实现
*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// 获取当前写指针
// 使用AtomicInteger是为了支持多线程并发写入(虽然正常情况 broker 写 commitlog 是单线程的,但 flush 是另一个线程)
int currentPos = this.wrotePosition.get();
// 校验是否还能写入
if (currentPos < this.fileSize) {
// 获取写入用的 ByteBuffer
// mappedByteBuffer 的 position 是共享(线程不安全),slice() 会生成一个「共享内存、但独立 position/limit」的新 ByteBuffer,所以不同线程可以独立操作 position,不影响原始 buffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 处理不同类型消息
// doAppend是序列化消息(header + body + properties),写入 ByteBuffer,返回写入字节数、存储时间戳等
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
// 文件已满
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
}
1.3 CommitLog 消息追加及消息序列化
文件位置:store/src/main/java/org/apache/rocketmq/store/CommitLog.java
// 这里代码较多,仅展示部分核心代码
public class CommitLog {
/**
* 追加消息
*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 设置消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// 设置消息体CRC
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
try {
// 从 MappedFileQueue 获取可写文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 写入数据
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘请求
// 刷盘方式有同步和异步两种,会返回一个 future,用于后面判断 flush 是否成功
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 提交主从同步请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// 让两个 Future 全部完成后再汇总结果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// 计算消息写入的物理偏移
// fileFromOffset:当前 MappedFile 的起始 offset,例如 1GB * n
// byteBuffer.position():当前文件内偏移
long wroteOffset = fileFromOffset + byteBuffer.position();
// 生成 msgId
// msgId 是一个两个字段拼接的唯一值;broker IP+port -> 唯一标识这条消息是在哪个 broker 写入;wroteOffset -> 唯一定位这条消息的物理偏移
Supplier<String> msgIdSupplier = () -> {
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// 查找并维护 queueOffset;
// 对于 Topic-QueueId 的队列递增编号:queueOffset = 第几条消息
String key = putMessageContext.getTopicQueueTableKey();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// 事务消息处理
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// 事务半消息不会写入 ConsumeQueue,因此 queueOffset 设为 0,这是 RocketMQ 事务消息的关键机制
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
}
// 检查 MappedFile 剩余空间,如果当前文件剩余空间不足容纳一整条消息,RocketMQ 会在末尾写入 BLANK_MAGIC_CODE,代表此文件结束
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank,
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// 把 queueOffset 和 phyOffset 写入序列化后的消息中
int pos = 4 + 4 + 4 + 4 + 4;
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
// 刷新 storeTimestamp
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
// 将序列化好的消息写入 MappedByteBuffer,清空编码缓存
// 因为使用的是 顺序写, 所以非常快,MappedFile + OS PageCache = 几乎是零拷贝写,RocketMQ 的高性能主要来源于这里
byteBuffer.put(preEncodeBuffer);
msgInner.setEncodedBuff(null);
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 写成功后更新 queueOffset
// 如果消息是普通消息、事务提交消息则 queueOffset 自增;如果是Prepared、Rollback则不更新
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
break;
}
return result;
}
}
这里简单介绍下零拷贝,零拷贝是一种避免CPU在内存间复制数据的技术,让数据直接从内核空间传输到用户空间(或设备),跳过CPU的拷贝环节。
- 传统IO:磁盘文件 → 内核缓冲区 → 用户缓冲区 → Socket缓冲区 → 网卡
- 零拷贝:磁盘文件 → 内核缓冲区 → Socket缓冲区 → 网卡
两种主要实现如下: - mmap(内存映射):把磁盘文件 映射到进程虚拟内存,用户态可以像访问内存一样访问文件,没有显式 read() / write() 拷贝
- sendfile(发送文件):网络传输专用,直接把文件从磁盘发送到网络,跳过用户空间
rocketmq中CommitLog、ConsumeQueue、IndexFile 使用了mmap,Broker 向 Consumer 发送消息时直接从内存映射区读取(mmap + 普通 socket write(Netty)),主从同步时使用 sendfile 方式传输 CommitLog。
二、ConsumeQueue:消费队列索引机制
2.1 索引文件结构设计
文件位置:store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
每个 Topic + QueueId 一个 ConsumeQueue,作用是Consumer 只顺序读 ConsumeQueue,拿到 offset + size,再去 CommitLog 精确读取,避免在大文件中扫描。
/**
* ConsumeQueue - 消费队列索引,加速消息查找
* 设计思想:为每个Topic的每个Queue建立独立索引文件,固定长度索引项
* 索引项结构:CommitLog偏移量(8) + 消息大小(4) + 消息Tag哈希码(8)
*/
public class ConsumeQueue {
public static final int CQ_STORE_UNIT_SIZE = 20; // 每个索引项固定20字节
/**
* 添加消息索引
* @param offset 在CommitLog中的物理偏移量
* @param size 消息总长度
* @param tagsCode 消息Tag的哈希值(用于过滤)
* @param cqOffset 在ConsumeQueue中的逻辑偏移量
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// 清空ByteBuffer准备写入
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// 写入索引项的三部分数据
this.byteBufferIndex.putLong(offset); // CommitLog物理偏移量(8字节)
this.byteBufferIndex.putInt(size); // 消息长度(4字节)
this.byteBufferIndex.putLong(tagsCode); // Tag哈希值(8字节)
// 计算在索引文件中的写入位置
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// 查找或创建对应的MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// 如果是新创建的文件,初始化一些元数据
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
}
// 将索引项追加到文件
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
/**
* 根据逻辑偏移量读取索引项
* @param startIndex 逻辑偏移量(消息序号)
*/
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
// 计算在文件中的字节偏移量
long offset = startIndex * CQ_STORE_UNIT_SIZE;
// 检查偏移量是否有效(不小于最小逻辑偏移量)
if (offset >= this.getMinLogicOffset()) {
// 根据偏移量找到对应的MappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
// 计算在文件内的相对偏移量并读取数据
return mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
}
}
return null;
}
}
2.2 索引构建服务
文件位置:store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
主要功能为:
① 顺序读取 CommitLog 中新增的数据
② 解析出消息结构(DispatchRequest)
③ 将消息写入 ConsumeQueue / IndexFile / MultiDispatch
④ 触发长轮询(唤醒消费者)
⑤ 更新 reputFromOffset,持续循环处理
public class DefaultMessageStore implements MessageStore {
/**
* ReputMessageService - 索引构建服务(异步构建ConsumeQueue索引)
* 设计思想:后台线程从CommitLog读取消息,异步构建索引,不影响主写入流程
*/
class ReputMessageService extends ServiceThread {
private volatile long reputFromOffset = 0; // 当前已处理到的CommitLog偏移量
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 短暂休眠,避免空转消耗CPU
Thread.sleep(1);
// 执行索引构建任务
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
/**
* 核心索引构建逻辑
*/
private void doReput() {
// 检查并修正起始偏移量(防止越界)
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// 只要 commitlog 有新数据就持续处理
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// 从CommitLog读取数据
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// 遍历读取到的数据,解析每条消息
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 解析消息头,获取消息大小和基本信息
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// 计算消息在字节流中的大小
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 分发消息到对应的ConsumeQueue
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 如果启用了长轮询,通知有新消息到达
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
// 长轮询唤醒消费者
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// offset 继续向前推进,准备处理下一条消息
this.reputFromOffset += size;
readSize += size;
} else if (size == 0) {
// 说明遇到了文件结尾的 BLANK 区域,reputFromOffset 跳到下一个 commitlog 文件的起点
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
// 解析失败
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
// 大概率是读到了半截消息,需要等待写入完成,所以停止处理
doNext = false;
}
}
}
} finally {
// 释放 result buffer
result.release();
}
} else {
doNext = false;
}
}
}
}
}
三、长轮询机制实现
3.1 PullRequestHoldService 长轮询服务
文件位置:broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
PullRequestHoldService 是 Broker 长轮询实现的核心线程,用于处理消费者的 long-polling 拉取请求。
消费者拉取消息时,如果队列没有新消息,Broker 会“挂起”请求并加入 PullRequestHoldService 中。
一旦有新消息写入,ReputMessageService 会通知 PullRequestHoldService 唤醒这些请求返回数据。
/**
* PullRequestHoldService - 长轮询服务,实现推拉结合模式
* 设计思想:当没有消息时挂起客户端请求,有新消息时立即响应
*/
public class PullRequestHoldService extends ServiceThread {
// 存储挂起的拉取请求,按Topic@QueueId分组
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 每5秒检查一次挂起的请求
this.waitForRunning(5 * 1000);
} else {
// 如果长轮询关闭,每 shortPollingTimeMills 毫秒检查
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
// 检查是否有新消息到达可以唤醒挂起的请求
this.checkHoldRequest();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
/**
* 检查所有挂起的请求,有新消息时立即唤醒
*/
protected void checkHoldRequest() {
// 遍历所有挂起的请求
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 获取该队列的最大偏移量
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 检查并通知有新消息到达
this.notifyMessageArriving(topic, queueId, offset);
} catch (Exception e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
* 通知有新消息到达,唤醒挂起的请求
*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 获取该队列的所有挂起请求
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 克隆列表避免并发修改
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
for (PullRequest request : requestList) {
// 检查是否有新消息到达
long newestOffset = maxOffset;
// 若第一次判断 offset <= 消费 offset,尝试再次获取最新 offset
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
// 如果有新消息,立即响应客户端
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter()
.isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// 若 consumer 还过滤属性,再继续过滤
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
//直接唤醒并返回消息
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Exception e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 超时(长轮询挂起时间到了,也要返回)
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
// 无法唤醒 → 重新放回列表
replayList.add(request);
}
// 未被唤醒的请求重新放回
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
}
3.2 消息拉取请求处理器
文件位置:broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
/**
* 唤醒长轮询请求并重新执行拉取逻辑
* @param channel 客户端与 Broker 的 Netty 连接
* @param request 客户端的原始 PullMessage 请求(之前挂起的)
*/
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
// 创建一个 Runnable:真正的业务执行逻辑
Runnable run = new Runnable() {
@Override
public void run() {
try {
// processRequest() 是 Pull 消息的真正逻辑,包括校验 topic、queue、offset;从 ConsumeQueue 查找消息;从 CommitLog 读取消息;封装返回结果。这里的过程几乎与客户端“主动拉取”一致
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
// 如果 processRequest() 返回结果,就写回客户端
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
// 通过 Netty 异步写回给客户端;writeAndFlush 是异步的
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
// 回调监控发送状态
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
// 提交到 Broker 的线程池执行
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
}
四、高可用主从同步机制
文件位置:store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
public class HAConnection {
class WriteSocketService extends ServiceThread {
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
// 每一次循环都是一次心跳+数据写入调度周期
while (!this.isStopped()) {
try {
// WriteSocketService 使用 NIO 的 Selector 对 SocketChannel 写就绪事件进行等待,最长 1s 超时
this.selector.select(1000);
// 没收到 Slave 的请求偏移量 → 等待
// Slave 在建立连接后会先发一个 8 字节消息(Slave 最大已拉取 offset),Master 若还没收到,就不能开始同步
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 如果 nextTransferFromWhere 未初始化
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
// slave 第一次同步,从 commitlog 文件头开始
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
this.nextTransferFromWhere = masterOffset;
} else {
// slave 已有部分 offset,从 slave 的请求 offset 开始
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 如果上次写入完成
if (this.lastWriteOver) {
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 构造心跳包 Header (offset + size=0)
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.lastWriteOver = this.transferData();
}
} else {
// 如果上次 write 未完成 → 继续写
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 从 CommitLog 读取要同步的数据
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
// 制每次同步的最大 batch 大小
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
// 设置同步 offset
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
// 构造 Header(真实数据包)
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
// 调用 transferData() 写入 socket
this.lastWriteOver = this.transferData();
} else {
// 若没有 commitlog 数据可同步 → 等待通知
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// 退出逻辑
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
}
}
}
五、延迟消息实现
文件位置:store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
public class ScheduleMessageService extends ConfigManager {
class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;
// 构造函数
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// 发生异常,暂停10秒后重试,避免频繁重试
log.error("ScheduleMessageService, executeOnTimeup exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
// 获取对应 delayLevel 的 ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 每个 delay level 对应一个 queueId,如果队列不存在,则等待100ms后重试
if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
// 从 ConsumeQueue 获取 index buffer(20 bytes 一条)
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
// 若未找到 buffer,则修正 offset 并等待100ms后重试
resetOffset = this.offset;
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 遍历 ConsumeQueue 内容
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// CommitLog 物理偏移
long offsetPy = bufferCQ.getByteBuffer().getLong();
// CommitLog 物理大小
int sizePy = bufferCQ.getByteBuffer().getInt();
// deliver timestamp
long tagsCode = bufferCQ.getByteBuffer().getLong();
// 判断是否到期投递
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
// 如果这条消息还没到期,则整个队列后续消息也肯定没到期,所以等待100ms后重试
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
// 到期后,从 CommitLog 读取真正的消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
// 调用 messageTimeup() 重新构建消息
// 1.恢复原始 Topic;2.恢复原始 queueId;3.移除 delayLevel 属性(不再是延迟消息)
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 使用同步/异步方式重新投递到真实 Topic
boolean deliverSuc;
if (ScheduleMessageService.this.enableAsyncDeliver) {
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
} else {
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
// 处理失败情况
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
// 处理完成后移动 offset
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
// 最终调度下一次任务
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
}
}
六、消费进度管理
文件位置:broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
这段代码是管理消费者在每个 Topic 的每个队列上的消费进度(消费 offset):
- ConsumerGroup + Topic + QueueId → offset
- Broker 负责持久化这个消费进度
- Consumer 提交消费进度时调用 commitOffset
- Consumer 查询消费进度时调用 queryOffset
public class ConsumerOffsetManager extends ConfigManager {
protected static final String TOPIC_GROUP_SEPARATOR = "@";
// key:topic@group,value:key是queueId,value是offset
// 每个 Topic 在 RocketMQ 中是 多队列(分区) 的,每个 queueId 都有自己的消费进度
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
// 组装key
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
// 不存在,说明是第一次提交,初始化并将数据放入
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
// 已存在,更新 offset
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
}
/**
* 查询offset
*/
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
}
设计要点总结
1 存储设计精华
- 顺序写入:所有消息顺序追加到CommitLog,最大化磁盘IO性能
- 内存映射:使用MappedByteBuffer实现零拷贝,提升读写效率
- 索引分离:ConsumeQueue只存储索引,数据与索引分离便于管理
- 固定长度索引:每个索引项20字节,支持O(1)随机访问
2 性能优化关键
- 异步构建:ReputMessageService后台构建索引,不阻塞主流程
- 批量刷盘:支持同步/异步刷盘策略,平衡性能与可靠性
- 长轮询:推拉结合模式,平衡实时性与服务端压力
- 线程隔离:网络IO、业务处理、刷盘操作线程分离
3 高可用保障
- 文件滚动:大文件拆分为固定大小文件,便于管理和恢复
- 偏移量管理:精确的偏移量计算,保证消息不丢失不重复
- 超时机制:长轮询请求超时控制,避免资源耗尽
通过这些精妙的设计,RocketMQ能够在保证数据可靠性的同时,实现极高的吞吐量和低延迟,成为企业级消息中间件的优秀选择。