Rocket MQ : 拒绝神化零拷贝

2022-11-25  本文已影响0人  在中国喝Java

笔者能力有限,理解偏差请大家多多指正

  不可否认零拷贝对于Rocket MQ的高性能表现有着积极正面的作用,但是笔者认为只是锦上添花,并非决定性因素。Rocket MQ性能卓越的原因绝非零拷贝就可以一言以蔽之。本文不会侧重点也不在这里,关于零拷贝已经有很多帖子:

预热机制

  Rocket MQ采用内存映射来提高文件I/O访问性能,MappedFile、MappedFileQueue管理存储文件。MappedFileQueue对存储文件进行封装可以理解为MappedFile的管理容器。譬如CommitLog文件存储位置:`${ROCKE_HOME}/store/commitlog/`该目录下存在多个MappedFile文件。

  MappedFile是内存映射的具体实现:构造方法包含文件名称、文件大小、以及一个transientStorePool标识位,如果开启`transientStorePoolEnable`机制则表示内容先存储在堆外内存,而后通过Commit线程将数据提交到FileChannel,而后Flush线程负责持久化。
public MappedFile(String fileName, int fileSize) throws IOException {
    init(fileName, fileSize);
}

public MappedFile(String fileName, int fileSize,
    TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize, transientStorePool);
}
复制代码
  两个构造方法不约而同的都会走init(),我们看两参数的那个即可。
private void init(String fileName, int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        /**
         * fileChannel.map(MapMode mode, long position, long size)
         * 将此 fileChannel 对应的一个区域直接映射到内存中
         */
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        /* 映射内存大小累加 */
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        /* 映射文件个数累加 */
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
复制代码
  Java中在尝试文件映射的时候提供三种模式:
public MappedFile getLastMappedFile(long startOffset) {
    return getLastMappedFile(startOffset, true);
}
复制代码
  getLastMappedFile方法会往**AllocateMappedFileService#requestQueue**阻塞队列提交**AllocateRequest**任务。AllocateMappedFileService服务线程此时会被唤醒执行**mmapOperation**方法。大致流程:
/* 是否开启内存读写分离 */
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    try {
        /* Rocket允许自己定制实现细节 */
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } catch (RuntimeException e) {
        /* 没有自定义实现,使用系统默认实现 */
        log.warn("Use default implementation.");
        /* 注意这里三参构造 */
        mappedFile = new MappedFile(
            req.getFilePath(),
            req.getFileSize(), 
            messageStore.getTransientStorePool()
        );
    }
}
else {
    /* 注意这里两参构造 */
    mappedFile = new MappedFile(
        req.getFilePath(),
        req.getFileSize()
    );
}
复制代码
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        /* force flush when flush disk type is sync */
        if (type == FlushDiskType.SYNC_FLUSH) {
            /* 每写入 pages 个内存页时刷盘一次 */
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        /* prevent gc */
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    /* force flush when prepare load finished */
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime
        );
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}",
        this.getFileName(), System.currentTimeMillis() - beginTime
    );

    /* !!! 这一行超级重要 !!! */
    this.mlock();
}
复制代码

mlock

#include <sys/mman.h>

int mlock(const void *addr, size_t len);
int mlock2(const void *addr, size_t len, int flags);
int munlock(const void *addr, size_t len);

int mlockall(int flags);
int munlockall(void);
复制代码
  **总结一下就是Rocket MQ为了自身的高性能拒绝内存被操作系统交换**

madvise

  为了防止剧透,刚刚一直没有带大家看看MappedFile#mlock其实该方法还有别的妙处。
public void mlock() {
    long beginTime = System.currentTimeMillis();
    long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        /**
         * MADV_WILLNEED 表示应用程序希望很快访问此地址范围
         */
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}
复制代码
  为了更加极致的性能体验,Linux操作系统暴露了`madvise sysytem call `,madvise()系统调用,用于向内核提供对于起始地址为`addr`,长度为`length`的**内存空间的操作建议或者指示**。在大多数情况下,此类建议的目标是提高系统或者应用程序的性能。
#include <sys/mman.h>

int madvise(void *addr, size_t length, int advice);
复制代码
  最初,此系统调用,仅仅支持一组常规的(conventional)建议值,这些建议值在各种系统中也有实现,(但是请注意,POSIX中并没有指定madvise()),后来又添加了许多特定于Linux的建议值。第三个参数advice其实就是一个标识,根据标识不同Linux内核采取的策略也有所区别。

文件系统设计

  针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。

  当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

  RocketMQ采用混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中),虽然说因为两种索引文件ConsumeQueue和IndexFile导致Rocket MQ其实不是全局的顺序写,但是这两种文件其实足够小,况且索引文件自身也是顺序写,可以说Rocket MQ已经尽最大努力保证全局顺序写了。

硬件加持

  页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

  在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

  另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

  上面提到的相邻文件预读、Mmap内存映射其实本质原因都是因为可以向内存借力,没有更快更好的内存硬件一切都是空谈。其实软件工程师所能做的相对有限,我们只是在最大限度的发挥硬件的能力。

总结

  笔者认为Rocket MQ高性能的关键是:

引用

上一篇下一篇

猜你喜欢

热点阅读