消息中间件

RocketMQ源码-Index索引介绍

2019-07-22  本文已影响3人  persisting_


1 概述
2 入口方法介绍
3 索引结构介绍
4 索引操作
5 索引查询

1 概述

RocketMQ中Broker在收到生产者发送的消息时,会将消息存储下来,写入CommitLog,但是此时消息是不可消费也不可查询的。需要等待专门的服务对刚写入的消息进行Reput操作,将消息信息记录到ConsumeQueueIndex中,消息记录到ConsumeQueue完成后该消息就可被消费,消息完成索引到Index中之后就可以根据时间戳和Key进行查询了。

DefaultMessageStore中用于进行Reput操作的服务实现类为ReputMessageService,该类扩展自ServiceThread,自身就是一个线程,其run方法定义如下:

//ReputMessageService
//@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //sleep(1)则保证基本上时刻在尝试数据构建,
            //如果速度够快,即消息刚完成写入就进行reput操作
            //则消息还在缓冲中,可避免从硬盘读取数据
            Thread.sleep(1);
            //进行reput操作,完成ConsumeQueue和Index
            //数据的构建
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

ReputMessageService.doReput方法具体实现不展开介绍,就是将还没有进行ConsumeQueueIndex构建的消息提取出来,进行ConsumeQueueIndex构建。

构建ConsumeQueue则入口类为CommitLogDispatcherBuildConsumeQueue,而Index的构建入口类为CommitLogDispatcherBuildIndex

本文我们主要介绍IndexService的实现。

2 入口方法介绍

第一节概述已经提到,Index构建的口类为CommitLogDispatcherBuildIndex,其源码如下:

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        //如果启用了索引,则调用indexService.buildIndex
        //进行索引构建
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

3 索引结构介绍

IndexFile是索引结构的具体实现,因为索引也会持久化到硬盘中,所以IndexFile也通过MappedFile进行文件写入操作,关于MappedFile的介绍可以参考笔者文章RocketMQ源码-MappedFile介绍

RocketMQ中的索引文件分为三个部分,分别为头部、SlotTable和index,如下图所示:

索引文件结构.jpg

默认的Slot数量为5000000,默认的index数量为4*5000000个,可配置。

因为可能消息数目较多,一个索引文件不能保存所有的消息索引信息,所以会使用多个索引文件,索引文件的头部保存了偏移等信息,其结构如下图所示:

index header.jpg

各字段含义如下:

  1. beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
  2. endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
  3. beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
  4. beginPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
  5. hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
  6. indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes

每个消息在索引文件尾部的占用一个节点,保存key的hash,还保存了该消息在消息文件中的物理位置,插入时间,解决hash冲突用的上一个冲突索引的位置,具体结构如下图所示:

index结构.jpg
  1. key hash value: message key的hash值
  2. phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
  3. timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
  4. prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。

上面截图和部分说明引用自文章rocketMq - index介绍

根据加入索引的时间依次放置,第一个加入索引的放在索引的第一个位置,第二个则在索引的第二个位置,以此类推。

每个消息根据key的hash值被映射到slotTable节点上,在对应的slot节点上保存的是其在索引的位置,如果发生冲突,即该slot上的值不为0,则表示已经有其他消息索引占用了该slot,那么使用链表方法处理冲突,该slot更新为最新索引的消息在索引中的位置,先前加入的冲突索引位置则记录在该索引的prevIndex字段中。

使用的映射方法如下:

//keyHash为key的hash,hashSlotNum默认500w
//也就是hash算法为求余法
int slotPos = keyHash % this.hashSlotNum;

4 索引操作

在介绍索引操作之前,我们先看下建立索引依赖的消息key字段到底是什么,其实根据源码发现就是消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX属性,该属性可在生产者生产消息时自己指定,如果不指定则会在发送之前调用MessageClientIDSetter.setUniqID(msg);进行初始化,具体如何产生唯一的ID算法这里不做介绍。

索引操作主要实现在IndexFile.putKey方法中:

//IndexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //如果该indexFile还没有达到最大的索引数目,则
    //可以继续写入,否则发挥失败
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //计算key的hash值
        int keyHash = indexKeyHashMethod(key);
        //根据hash值计算在slotTable中的位置
        int slotPos = keyHash % this.hashSlotNum;
        //根据在slotTable中的位置、索引头部大小(40b)、
        //每个slot的大小(4b)计算该slot在文件中的物理位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            //获取slotTable该slot位置上的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            //如果值小于invalidIndex(该值为0)或者值大于
            //当天存在的索引个数,则置为0
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            //根据消息的存储时间和该索引文件头部记录的
            //第一个消息存储时间计算时间差
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            //计算该索引在索引链表中的位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            //先存key的hash
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            //记录该消息的在消息文件中的物理位置
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            //记录落地时间差
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            //记录上一个落在该位置上的冲突索引位置
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            //在slotTable中记录该消息在索引中的位置
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            //如果是第一次索引消息,则记录开始物理偏移和
            //开始时间
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            //每次追加一个新的索引,递增slotTable中slot
            //数量、索引数量
            //同时每次新增的索引也就是最后一个索引,记录
            //最后一个索引物理偏移以及最后落地时间
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}

5 索引查询

索引查询方法为IndexFile.selectPhyOffset:

//IndexFile
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        //根据key计算hash
        int keyHash = indexKeyHashMethod(key);
        //根据hash值计算在slotTable中的位置
        int slotPos = keyHash % this.hashSlotNum;
        //根据在slotTable中的位置、索引头部大小(40b)、
        //每个slot的大小(4b)计算该slot在文件中的物理位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            if (lock) {
                // fileLock = this.fileChannel.lock(absSlotPos,
                // hashSlotSize, true);
            }

            //获取该slot位置上的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // if (fileLock != null) {
            // fileLock.release();
            // fileLock = null;
            // }
            //如果slot上的值为无效值或者该值大于最大索引
            //数量,则表示没有符合条件的索引数据,不作任何
            //操作
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                //下面的实现也比较简单,因为使用链地址法
                //解决hash冲突,所以这里读取链表上的每个
                //数据,如果时间满足要求并且key的hash
                //一致,则加入到返回列表中
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }

                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;

                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;

                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    //消息落地时间符合要求
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                    //key的hash一致且消息落地时间符合要求
                    if (keyHash == keyHashRead && timeMatched) {
                        //加入到返回列表中
                        phyOffsets.add(phyOffsetRead);
                    }

                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }
                    //读取链表中的下一个冲突索引
                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }

            this.mappedFile.release();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读