RocketMq IndexService介绍
系列
- RocketMq broker 配置文件
- RocketMq broker 启动流程
- RocketMq broker CommitLog介绍
- RocketMq broker consumeQueue介绍
- RocketMq broker 重试和死信队列
- RocketMq broker 延迟消息
- RocketMq IndexService介绍
开篇
-
这个系列的主要目的是介绍RocketMq broker的原理和用法,在这个系列当中会介绍 broker 配置文件、broker 启动流程、broker延迟消息、broker消息存储。
-
这篇文章主要介绍broker IndexService,主要介绍IndexService的数据结构和对应的建索引过程。
IndexFile 介绍

-
IndexFile文件的存储位置是:\store\index${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。
-
其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。202000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。
-
IndexFile在解决hash冲突的过程中会采用头插法,即所有的冲突数据都往链表的头部进行插入,然后每个新添加的元素都会包含后一个元素的位置,hash对应的slot Table会指向第一个索引元素。在实际元素存储的数据的顺序和查询的顺序是逆向映射的,这点需要理解。
IndexFile创建
public class IndexService {
private static final int MAX_TRY_IDX_CREATE = 3;
private final DefaultMessageStore defaultMessageStore;
private final int hashSlotNum;
private final int indexNum;
private final String storePath;
private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
// private int maxHashSlotNum = 5000000;
// private int maxIndexNum = 5000000 * 4;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;
{
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
if (!tmp.isWriteFull()) {
indexFile = tmp;
} else {
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
prevIndexFile = tmp;
}
}
this.readWriteLock.readLock().unlock();
}
if (indexFile == null) {
try {
// 创建的文件名以时间戳作为文件名
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
// 文件的hashSlotNum=5000000,indexNum=5000000 * 4
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock();
}
if (indexFile != null) {
// 前置文件刷盘
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
flushThread.setDaemon(true);
flushThread.start();
}
}
return indexFile;
}
}
- IndexService在不存在或者当前文件已满的情况下会创建新的indexFile文件呢。
- indexFile文件的名为当前时间戳、hashSlotNum=5000000,indexNum=5000000 * 4。
Index存储
public class IndexFile {
private static int hashSlotSize = 4;
private static int indexSize = 20;
private static int invalidIndex = 0;
private final int hashSlotNum;
private final int indexNum;
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 在Index的文件没有满的情况下放置索引数据
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 1、针对key计算hash值
int keyHash = indexKeyHashMethod(key);
// 2、记录hash值应该保存的slot的位置
int slotPos = keyHash % this.hashSlotNum;
// 3、计算Index文件当中slotPos对应的实际物理偏移
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// 获取absSlotPos位置记录当前存储的index的位移
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 4、计算index实际的存储的偏移
// 实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的偏移(已有index的个数*indexSize)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 5、生成Index的对象放置 keyHash、phyOffset、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象)、
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 6、记录SlotPos的当前的index的个数,即逻辑位移。
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// 设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
}
-
IndexFile#putKey实现了整个index文件存储过程,由于IndexFile实现的是类似hash的结果,所以存储过程也跟hash的存储流程比较相似。
-
1、针对key计算hash值,记录hash值应该保存的slot的位置,计算Index文件当中slotPos对应的实际物理偏移。
-
2、根据slotPos对应的实际物理偏移获取该slot下最新的index文件的逻辑位移,即index linked list的第几个。
-
3、计算index实际的存储的偏移,实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的物理偏移(已有index的个数*indexSize)。
-
4、生成当前Index的对象放置 keyHash、phyOffset(commitLog的实际偏移量)、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象),slotValue起到了链表链接的作用。
-
5、设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。