消息中间件程序员RocketMQ源码解读

RocketMQ存储文件与内存映射

2020-12-03  本文已影响0人  93张先生

概览

RocketMQ的消息存储主要是在${ROCKETMQ_HOME}/store文件夹下,message消息主要存储在commitlog文件夹下,RocketMQ消息存储和索引是分开隔离的,已Topic为主题的消息索引存储在consumequeue文件夹下,通过MessageQueue映射为ConsumeQueue的文件就存储在这个文件夹下,然后index主要是以消息key和offset的对应关系,以类似HashMap的方式存储,方便消息查询。

image.png

本片文章主要介绍消息存储组织结构、Message是如何快速存储都MappedFile文件中的。MappedFile文件就是一个个以首条消息的offset为名称的存储文件,如上图commitlog文件夹下展示的00000000000000000000、00000000001073741824等,每一个mappedFile文件的大小约为102410241024=1G。

image.png

DefaultMessageStore

DefaultMessageStore是消息相关操作的主要服务,包括消息的存储、查询、定时清除等等。这里主要介绍其中消息存储相关的事物,包括是否开启TransientStorePool临时消息存储池,一次创建2个MappedFile文件的AllocateMappedFileService消息存储预创建服务,还有历史存储文件mappedFile加载加载到直接内存MappedByteBuffer和对应的mmap文件映射等。

# org.apache.rocketmq.store.DefaultMessageStore

// MappedFile 分配服务
private final AllocateMappedFileService allocateMappedFileService;

// 是否开启
// 消息临时存储
private final TransientStorePool transientStorePool;

this.transientStorePool = new TransientStorePool(messageStoreConfig);
// 根据是否开启 transientStorePoolEnable,存在两种初始化情况。
// transientStorePoolEnable 为 true 表示内容先存储在堆外内存(直接内存),然后通过 Commit 线程将数据提交到FileChannel中,再通过 Flush 线程将内存映射 Buffer 中的数据持久化到磁盘中。
if (messageStoreConfig.isTransientStorePoolEnable()) {
    this.transientStorePool.init();
}
//加载历史mappedFile文件,进行便于文件查询和消费
// load Commit Log
result = result && this.commitLog.load();

TransientStorePool

TransientStorePool是短暂的消息存储池。这里先进行简单介绍,具体作用到应用的时候详细介绍。这里直接开辟默认5个1G的直接内存ByteBuffer,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的Swap分区中,提高系统存储性能,使RocketMQ消息低延迟、高吞吐量。

public class TransientStorePool {
    // availableBuffers 个数,可通过在broker中配置文件中设置 transientStorePool,默认值为 5
    private final int poolSize;
    // 每个 ByteBuffer 大小,默认为 mappedFileSizeCommitLog,表明 TransientStorePool 为 commitlog 文件服务
    private final int fileSize;
    // 直接内存,ByteBuffer 容器,双端队列
    private final Deque<ByteBuffer> availableBuffers;

    /**
     * 创建默认的堆外内存
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            // 利用 NIO 直接直接分配,堆外内存(直接内存),在系统中的内存,非 JVM 内存
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            // 内存地址
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            // 内存锁定
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }
}   

CommitLog

CommitLog主要有消息的刷盘的存储服务、消息的刷盘服务,存储消息的回调等等,这里主要介绍MappedFileQueue,它是对${ROCKET_HOME}/store/commitlog目录的封装,主要用来管理多个MappedFile。

public class CommitLog {
    // 映射文件队列,ROCKETMQ_HOME/commitlog 文件夹下的文件对应
    protected final MappedFileQueue mappedFileQueue;
    // 默认消息存储服务
    protected final DefaultMessageStore defaultMessageStore;
    // commitLog 刷盘操作
    private final FlushCommitLogService flushCommitLogService;

    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    // 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
    private final FlushCommitLogService commitLogService;
    // 存储消息到 mappedFile 的回调映射
    private final AppendMessageCallback appendMessageCallback;
    // 消息解码服务线程
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;

    // topic-queue-id,offset;消息的key,和在 commitlog 中的 offset,方便消息存储时的索引
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        // 在这里组织 commitlog 的对应的 MappedFile 文件,然后进行相应的文件操作,文件映射,刷线到磁盘文件
        this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());

        this.defaultMessageStore = defaultMessageStore;
        // 异步、同步刷盘服务初始化
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 同步刷盘服务为 GroupCommitService
            this.flushCommitLogService = new GroupCommitService();
        } else {
            // 异步刷盘服务为 FlushRealTimeService
            this.flushCommitLogService = new FlushRealTimeService();
        }
        // 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
        this.commitLogService = new CommitRealTimeService();
        // 存储消息到 mappedFile 的回调映射
        this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
            @Override
            protected MessageExtBatchEncoder initialValue() {
                return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            }
        };
        // putMessage 到 mappedFile 时是否使用可重入锁,默认使用自旋锁
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

    }
}

MappedFileQueue

MappedFile 的存储集合和管理器,是对 ${ROCKET_HOME}/store/commitlog 文件夹的封装。主要用来管理MappedFile文件,包括消息的查询、提交、落盘的刷新,历史MappedFile文件的预热加载和直接内存映射mmap操作,过期文件的删除、追加消息的最后一个MappedFile文件的获取和创建等。

public class MappedFileQueue {
    // 存储路径${ROCKET_HOME}/store/commitlog,该目录下会存在多个内存映射文件
    private final String storePath;
    // 单个文件的存储大小
    private final int mappedFileSize;
    // mappedFile 文件集合
    // 一个线程安全的 ArrayList 的变种,通过可 reentrantLock 可重入锁实现数组的新建和数组旧有内容的 copy 到新建的数组,然后返回新建的数组
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    // 创建 MappedFile 服务类
    private final AllocateMappedFileService allocateMappedFileService;
    // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
    // MappedFile 中的 MappedByteBuffer 中数据写入磁盘的指针,该指针之前的所有数据全部持久化到磁盘
    private long flushedWhere = 0;

    // Java 应用程序态数据要写入nio内存映射的ByteBuffer的提交了位置的指针
    // commitWhere 只有开启 transientStorePool 的前提下才有作用;
    // commitWhere 代表着 transientStorePool 中直接内存 ByteBuffer 需要提交数据到 MappedByteBuffer 直接内存的,位置为已经提交了数据的位置。下次要提交的开始位置,上次提交的结尾位置。
    private long committedWhere = 0;


    /**
     * 项目启动,加载 commitlog 文件夹下对应的文件
     * @return
     */
    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            // 根据文件名(offset)排序
            Arrays.sort(files);
            for (File file : files) {
                // 如果物理文件大小 != mappedFileSize,说明文件被破坏了,返回false
                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    // 更新 mappedFile 文件指针
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    // 加入映射文件集合
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

    /**
     * 获取最后存储消息的映射mappedFile
     *
     * @param startOffset mappedFile 开始写文件的offset
     * @param needCreate 是否需要创建新的 mappedFile 文件
     * @return
     */
    //
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 创建映射文件的起始偏移量,也就是即将的mappedfile文件名称
        long createOffset = -1;
        // 获取最后一个映射文件,如果为null或者写满则会执行创建逻辑
        MappedFile mappedFileLast = getLastMappedFile();
        // mappedFileLast == null,表示需要创建新的 mappedFile 文件,创建新文件的offset值;
        if (mappedFileLast == null) {
            // 计算将要创建的映射文件的起始偏移量
            // 如果startOffset<=mappedFileSize则起始偏移量为0
            // 如果startOffset>mappedFileSize则起始偏移量为是mappedFileSize的倍数
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }
        // 映射文件满了,创建新的映射文件
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            // 创建的映射文件的偏移量等于最后一个映射文件的起始偏移量  + 映射文件的大小(commitlog文件大小)
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
        // 创建新的映射文件
        if (createOffset != -1 && needCreate) {
            // 构造commitlog 文件名称
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            // nextNextFilePath,预先创建下一个 mappedFile 文件,通过 allocateMappedFileService 服务,一起创建两个文件,预先创建下一个文件
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;
            // 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
            // 如果上述方式失败则通过new创建映射文件
            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                // 是否是 MappedFileQueue 队列中第一个文件
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }
}

MappedFile

CommitLog、MappedFileQueue、MappedFile的关系如图:


image.png

MappedFile是RocketMQ消息存储的终极Boss,重中之重。涉及MapedFile的预创建和映射、历史数据MappedFile的磁盘文件预热。MappedByteBuffer是通过NIO方式创建的内存映射对象。ByteBuffer writeBuffer是直接内存从TransientStorePool中借来的,他们两个是在内存中用来存放消息的,其中区别下面详细介绍。这里先从CommitLog文件存放消息说起。

public class MappedFile extends ReferenceResource {
    // 当前JVM实例中 MappedFile 虚拟内存
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    // 当前JVM实例中MappedFile对象个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 即将写入消息的mappedFile 的位置
    // 当前 MappedFile 文件的写指针,从 0 开始(内存映射文件的写指针)
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 当前文件的提交到 MappedBuffer的指针,如果开启 transientStorePoolEnable,则数据会存储在 TransientStorePool 中,然后提交到内存映射 ByteBuffer 中,再刷写到磁盘
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 刷写到磁盘指针,该指针之前的数据持久化到磁盘中
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    // 堆外内存 ByteBuffer,如果不为空,数据首先将存储在该 Buffer 中,然后提交到 MappedFile 对应的内存映射文件 Buffer。
    // transientStorePoolEnable 为true时不为空。
    protected ByteBuffer writeBuffer = null;
    // 堆内存池,transientStorePoolEnable 为true 时启用
    protected TransientStorePool transientStorePool = null;
    // 文件名称
    private String fileName;
    // mappedFile 文件的开始偏移量地址
    private long fileFromOffset;
    // 物理文件
    private File file;
    // NIO 物理文件对应的内存映射Buffer
    private MappedByteBuffer mappedByteBuffer;
    // 文件最后一次内容写入的时间
    private volatile long storeTimestamp = 0;
    // 是否是 MappedFileQueue 队列中第一个文件
    private boolean firstCreateInQueue = false;

}

CommitLog#putMessage方法是来存放消息的,存放消息到系统内存映射中,并没有落入磁盘中,等待同步刷盘、或者异步刷盘,然后是消息存储的高可用。

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    // 消息存储时间
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    // 循环冗余校验码,检测数据在网络中传输是否发生变化
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    // 存储服务统计功能服务
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    // topic
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    // 事务回滚消息标志
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 如果是非事务消息,或者事务消息的 commit 操作;进而判断是不是延迟消息,存储到特殊的延迟消息队列;然后事务消息存储也进行了同样的消息 topic 的转换,从而实现了消息的事务;事务消息非提交阶段,先进行另一个 topic 的储存,如果事务提交了,才进行,存储到消息的真正的topic 中去。
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // 如果是延迟级别消息
        if (msg.getDelayTimeLevel() > 0) {
            // 设置消息延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 延迟消息topic
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 延迟消息消息队列Id
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            // 将真实的 topic 放入 message 属性中
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 替换为延迟消息topic
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // 消息诞生地址 ipv6 设置
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }
    // 消息存储地址 ipv6 设置
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    long elapsedTimeInLock = 0;

    MappedFile unlockMappedFile = null;
    // 最后一个 消息 存储 commitlog 消息映射文件
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // 自旋锁 或 可重入锁,上锁;消息写入 commitlog 的映射文件是串行的
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        // 开始上锁时间
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        // 消息存储时间,确定消息全局有序
        msg.setStoreTimestamp(beginLockTimestamp);
        //mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件
        //mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise;新文件,造成脏数据
        }
        // mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // 追加消息到 mappedFile 文件中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            // 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
            case END_OF_FILE:
                // 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
                unlockMappedFile = mappedFile;
                // broker 重新开辟,新的 commitlog 文件
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                // 开辟成功,再将消息写入
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
        // 存储消息花费时间
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // 最后释放存储消息的锁
        putMessageLock.unlock();
    }
    // 存储消息花费时间 > 500
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }
    // 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // 解锁 mappedFile 的内存锁定
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    // topic 下存放消息次数
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    // topic 下存放消息字节数
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // handle 硬盘刷新
    handleDiskFlush(result, putMessageResult, msg);
    // handle 高可用
    handleHA(result, putMessageResult, msg);
    // 返回存储消息的结果
    return putMessageResult;
}
PutMessage重要步骤
  1. 获取上次最后一个写入消息的存储文件MappedFile,MappedFile文件的获取在后面会详细接受。
// 最后一个 消息 存储 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
  1. 向MappedFile文件追加消息,如果返回END_OF_FILE代表这个整备追加消息的MappedFile文件写满了,不够存储本条消息,然后再去获取这最后下一个MappedFile文件。
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
case END_OF_FILE:
    // 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
    unlockMappedFile = mappedFile;
    // broker 重新开辟,新的 commitlog 文件
    // Create a new file, re-write the message
    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
    if (null == mappedFile) {
        // XXX: warn and notify me
        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
    }
    // 开辟成功,再将消息写入
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
  1. 写MappedFile文件是会被mlock内存锁定,防止被交换到Swap分区中,写满的MappedFile文件进行锁定解除。
// 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    // 解锁 mappedFile 的内存锁定
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
  1. 写入内存的消息进行刷盘,然后是HA消息存储的高可用,Broker存储消息的复制,这两部分内容也很重要,下次在介绍,在本章不是重点内容。
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
最后一个MappedFile的获取

这是MappedFile设计的经典,现在重点介绍。创建MappedFile对象有两种方式。
第一种:通过构造方法,new MappedFile()一个对象。然后进行MapepdFile对象MappedByteBuffer的内存映射。

mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);

public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}
/**
 * MappedFile 初始化,并做好 mappedFile 和 mappedByteBuffer 的NIO 直接内存映射关系
 *
 * @param fileName 物理文件路径
 * @param fileSize mappedFileSize 文件大小
 * @throws IOException
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    // 物理文件名称
    this.file = new File(fileName);
    // 文件开始位置
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    // 确保文件路径存在,不存在,进行路径文件创建
    ensureDirOK(this.file.getParent());

    try {
        // 通过 RandomAccessFile 创建读写文件通道,并将文件内容使用NIO 的内存映射 Buffer 将文件映射到内存中
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // 物理文件对应的内存映射Buffer
        // 通过 NIO 文件通道和mappedFileSize 大小,创建内存映射文件 mappedByteBuffer
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        // 当前JVM实例中 MappedFile 虚拟内存
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        // 当前JVM实例中MappedFile对象个数
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

第二种:预创建MappedFile,通过allocateMappedFileService服务一次创建两个MappedFile对象。

// 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
// 如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
        nextNextFilePath, this.mappedFileSize);
} else {
    try {
        mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
    } catch (IOException e) {
        log.error("create mappedFile exception", e);
    }
}

AllocateMappedFileService

AllocateMappedFileService是预创建MappedFile文件的服务,通过一次构造两个创建MappedFile的AllocateRequest然后放入队列requestQueue中,通过CountDownLatch线程同步协调器等待mmapOperation()方法,创建MappedFile对象,并返回。RocketMQ中预分配MappedFile的设计非常巧妙,下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟。

CountDownLatch协调两个线程之间的通信
image.png
/**
 * 预先创建 MappedFile 文件,只是先创建2个创建mappedFile 文件的请求,放入队列中,具体 mappedFile 文件的创建和文件内存直接映射由 mmapOperation() 方法来实现。
 * @param nextFilePath 创建 mappedFile 文件的全路径名称
 * @param nextNextFilePath 创建下一个 mappedFile 文件的全路径名称
 * @param fileSize 文件大小
 * @return
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // 默认提交两个请求
    int canSubmitRequests = 2;
    // 是否启用 transientStorePool
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // SLAVE 节点中 transientByteBuffer 即使没有足够的 ByteBuffer,也不支持快速失败
        // 启动快速失败策略时,计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
            // 可用的 ByteBuffer - requestQueue,还剩余可用的 ByteBuffer 数量
            canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
        }
    }
    // 创建分配请求
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    // 判断requestTable中是否存在该路径的分配请求,如果存在则说明该请求已经在排队中
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    //该路径没有在排队
    if (nextPutOK) {
        // byteBuffer 数量不够,则快速失败
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        // 数量充足的话,将指定的元素插入到此优先级队列中
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        // 请求数量 -1
        canSubmitRequests--;
    }
    // 下下个请求的处理
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }
    // 报错,日志
    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }
    // 下一个分配请求,获取当前请求,然后通过线程协调器CountDownLatch,协调另一个线程进行完mmpOperation操作后,返回创建好的MappedFile文件
    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            // 默认等待5s,等待 mmapOperation 操作创建 mappedFile
            // 调用此方法的线程会被阻塞,直到 CountDownLatch 的 count 为 0;等到 mmapOperation() finally countDownLatch 为 0
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                // 成功从 requestTable 中移除请求,并返回 mappedFile 文件
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}
具体创建MappedFile对象

AllocateMappedFileService服务开启了一个线程,不停地从创建MappedFile对象的请求队列requestQueue中获取AllocateRequest,并实时创建MappedFile对象,并通过CountDownLatch通知putRequestAndReturnMappedFile() 方法已经创建了MappedFile对象,从而获取返回。

/**
 * 开始 mappedFile 文件分配服务,从 requestQueue 中获取创建 mappedFile 的文件请求
 */
public void run() {
    log.info(this.getServiceName() + " service started");
    // 除非停止,否则一直在进行 mmap 映射操作
    while (!this.isStopped() && this.mmapOperation()) {

    }
    log.info(this.getServiceName() + " service end");
}
MmapOperation具体创建MappedFile对象

在这里创建MappedFile对象,也有两种情况,区别在于是否启用TransientStorePool消息暂存池,它里面有默认5个1G的直接内存,可以通过直接内存赋值给MappedFile的writerBuffer对象,省去了开辟内存的时间;还有一种是通过MappedFile的NIO创建的MappedByteBuffer直接内存映射来存储消息,需要进行文件的map映射操作,开辟内存空间。这两种方式的对比会在下面介绍。

第一种:不启用transientStorePool对象,通过构造方法创建。
第二种:通过ServerLoader.load的方式创建,如果失败了,再去尝试构造方法的方式。

if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    try {
        // 为每一个 mappedFile 文件,进行init中的mmap 映射操作
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } catch (RuntimeException e) {
        log.warn("Use default implementation.");
        // spi 加载失败,使用构造方法创建 mappedFile
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    }
} else {
    // 不开启 transientStorePool,直接内存映射
    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
MappedFile文件预热

通过 mmap 建立内存映射仅是将文件磁盘地址和虚拟地址通过映射对应起来,此时物理内存并没有填充磁盘文件内容。
当实际发生文件读写时,产生缺页中断并陷入内核,然后才会将磁盘文件内容读取至物理内存。针对上述场景,RocketMQ 设计了 MappedFile 预热机制。
当 RocketMQ 开启 MappedFile 内存预热(warmMapedFileEnable),且 MappedFile 文件映射空间大小大于等于 mapedFileSizeCommitLog(1 GB) 时,调用 warmMappedFile 方法对 MappedFile 进行预热。

if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
    .getMappedFileSizeCommitLog()
    &&
    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    // 对 mappedFile 进行预热
    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}

MappedFile创建后,需要对 MappedFile 文件进行预热,将内存和磁盘映射起来,然后每页写入占位数据0,然后将这些0数据,刷新到磁盘,进行磁盘预热。

当调用Mmap进行内存映射后,OS只是建立了虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。
程序要访问数据时,OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次 缺页中断。X86的Linux中一个标准页面大小是4KB,
那么1G的CommitLog需要发生 1024KB/4KB=256次 缺页中断,才能使得对应的数据完全加载至物理内存中。

为什么每个页都需要写入数据呢?

RocketMQ在创建并分配MappedFile的过程中预先写入了一些随机值到Mmap映射出的内存空间里。原因在于:
仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些分配的内存,原因在于其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。
锁定的内存可能是写时复制的,这个时候,这个内存空间可能会改变。这个时候,写入假的临时值,这样就可以针对每一个内存分页的写入操做会强制 Linux 为当前进程分配一个独立、私有的内存页。

写时复制
写时复制:子进程依赖使用父进程开创的物理空间。
内核只为新生成的子进程创建虚拟空间结构,它们来复制于父进程的虚拟究竟结构,但是不为这些段分配物理内存,它们共享父进程的物理空间,当父子进程中有更改相应段的行为发生时,再为子进程相应的段分配物理空间。
https:www.cnblogs.com/biyeymyhjob/archive/2012/07/20/2601655.html

为了避免OS检查分页是否在内存中的过程出现大量缺页中断,RocketMQ在做Mmap内存映射的同时进行了madvise系统调用,
目的是使OS做一次内存映射后,使对应的文件数据尽可能多的预加载至内存中,降低缺页中断次数,从而达到内存预热的效果。
RocketMQ通过map+madvise映射后预热机制,将磁盘中的数据尽可能多的加载到PageCache中,保证后续对ConsumeQueue和CommitLog的读取过程中,能够尽可能从内存中读取数据,提升读写性能。

public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    // 创建一个新的字节缓冲区,其内容是此缓冲区内容的共享子序列
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    // warmMappedFile 每间隔 OS_PAGE_SIZE 向 mappedByteBuffer 写入一个 0,此时对应页恰好产生一个缺页中断,操作系统为对应页分配物理内存
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // 刷盘方式是同步策略时,进行刷盘操作
        // 每修改 pages 个分页刷一次盘,相当于 4096 * 4k = 16M,每 16 M刷一次盘,1G 文件 1024M/16M = 64 次
        // force flush when flush disk type is sync
        // 如果刷盘策略为同步刷盘,需要对每个页进行刷盘
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        // 防止垃圾回收
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    // 前面对每个页,写入了数据(0 占位用,防止被内存交互),进行了刷盘,然后这个操作是对所有的内存进行刷盘。
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        // 刷盘,强制将此缓冲区内容的任何更改写入包含映射文件的存储设备
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);
    //通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
    this.mlock();
}

通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    // RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。
    {
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

MappedByteBuffer VS WriterBuffer

MappedByteBuffer和WriterBuffer都是MappedFile对象的成员属性,都是用来存放消息的,只有开启了TransientStorePool,才会向writerBuffer直接内存写入消息,然后commit消息到FileChannle中,然后再flush到磁盘。否则就是存储在NIO创建的MappedByteBuffer直接内存中,然后刷新到磁盘。

image.png

从TransientStorePool中借取的MappedFile中的writerBuffer与MappedFile的MappedByteBuffer在数据处理上的差异在什么地方呢?

分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

那么与直接使用MappedByteBuffer相比差别在什么地方呢?

MappedByteBuffer 和 WriteBuffer 都会经过,PageCache 这个操作进行写入磁盘。
MappedByteBuffer写入数据,写入的是MappedByteBuffer映射的磁盘文件对应的Page Cache,可能会慢一点。而TransientStorePool方案下写入的则为纯粹的内存,并不是PageCache,因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。然后再经过刷盘将直接内存中的数据经过Page Cache 写入磁盘。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

Mmap的写入操作是:Mmap的MappedByteBuffer映射直接内存,直接内存映射文件,然后文件会对应Page Cache,也就是 MmapedByteBuffer的直接内存可能是Page Cache的东西,然后通过写Page Cache,然后再写入磁盘。

FileChannle:是写直接内存,这个效率比较高,然后直接内存满了,在落盘的时候,再去经过Page Cache,落入磁盘。WriterBuffer的写入方式实际也就是FileChannel的写入方式,Mmap在写入4k一下的文件比较快,然后FileChannel写入文件大于4k时,比Mmap方式的要快,可能是因为PageCache 是4k,然后写着就可能去落盘了。而FileChannel 是写满了直接内存,才去经过PageCache,这样写入直接内存的效率更高,然后再经过Page Cache,当大于4k的时候,大于Page Cache的内存的时候,就是FileChannel快了。大概因为FileChannel是基于Block(块)的。

Mmap VS FileChannle参考https://juejin.cn/post/6844903842472001550

上一篇下一篇

猜你喜欢

热点阅读