rocketmq之消息存储学习笔记

2019-07-24  本文已影响0人  heyong

一、存储总体结构

image.png

从上面的图中可以看出,Broker都是通过DefaultMessageStore实现数据的存储和读取。消息的存储主要是通过调用DefaultMessageStore.putMessage(),实现消息的存储。

二、消息存储过程

本节将以消息发送存储为突破点,一点一点揭开RocketMQ 存储设计的神秘面纱。消息存储入口: org.apache.rocketmq.store.DefaultMessageStore#putMessage 。

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
                // 如果当前节点是从节点,拒绝写
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // 当前broker状态,是否可写
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }
                // 消息主题长度限制
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
                // 消息属性限制
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
                // 是否os cache刷新繁忙
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }
                // 写数据
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }

获取到对应的MappedFile写数据

  MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

在写入CommitLog 之前,先申请putMessageLock,也就是将消息存储到CornrnitLog 文件中是串行的。

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                        // 写数据
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
   }

调用appendMessagesInner写数据

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            // 调用slice共享缓冲区
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            // 写入位置
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
                // 追加消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 更新写入位置
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

创建全局唯一id,消息ID 有16 字节,消息ID 组成如图4-4 所示

image.png
            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            this.resetByteBuffer(hostHolder, 8);
            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

消息存储格式

RocketMQ 消息存储格式如下。

  • TOTALSIZE : 该消息条目总长度,4 字节。
  • MAGICCODE : 魔数, 4 字节。固定值Oxdaa320a7 。
  • BODYCRC : 消息体crc校验码, 4 字节。
  • QUEUEID : 消息消费队列ID , 4 字节。
  • FLAG : 消息FLAG , RocketMQ 不做处理, 供应用程序使用,默认4 字节。
  • QUEUEOFFSET :消息在消息消费队列的偏移量, 8 字节。
  • PHYSICALOFFSET : 消息在CommitLog 文件中的偏移量, 8 字节。
  • SYSFLAG : 消息系统Flag ,例如是否压缩、是否是事务消息等, 4 字节。
  • BORNTIMESTAMP : 消息生产者调用消息发送API 的时间戳, 8 字节。
  • BORNHOST :消息发送者IP 、端口号, 8 字节。
  • STORETIMESTAMP : 消息存储时间戳, 8 字节。
  • STOREHOSTADDRESS: Broker 服务器IP+端口号, 8 字节。
  • RECONSUMETIMES : 消息重试次数, 4 字节。
  • Prepared Transaction Offset : 事务消息物理偏移量, 8 字节。
  • BodyLength :消息体长度, 4 字节。
  • Body : 消息体内容,长度为bodyLen th 中存储的值。
  • TopieLength : 主题存储长度, 1 字节,表示主题名称不能超过255 个字符。
  • Topic : 主题,长度为TopieL e n g th 中存储的值。
  • PropertiesLength : 消息属性长度, 2 字节, 表示消息属性长度不能超过6 553 6 个字符。
  • Properties : 消息属性,长度为PropertiesLength 中存储的值。

上述表示CommitLog 条目是不定长的,每一个条目的长度存储在前4 个字节中。

如果消息长度+END_FILE_MIN_BLANK_LENGTH 大于CommitLog 文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE, Broker 会重新创建一个新的CommitLog 文件来存储该消息。

从这里可以看出,每个CommitLog 文件最少会空闲8个字节,高4 字节存储当前文件剩余空间,低4 字节存储魔数: CommitLog.BLANK_MAGIC_CODE 。

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

将消息内容存储到ByteBuffer 中,然后创建AppendMessageResult 。这里只是将消息存储在MappedFile 对应的内存映射Buffer 中,并没有刷写到磁盘。

DefaultAppendMessageCallback#doAppend 只是将消息追加在内存中, 需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。

AppendMessageResult 的属性:

上一篇下一篇

猜你喜欢

热点阅读