消息中间件选型

RocketMQ源码解析(六)-Broker#MessageSt

2019-01-10  本文已影响0人  空挡

Broker将消息存储抽象成MessageStore接口,默认实现类是DefaultMessageStore。主要提供如下方法:

数据结构图

MessageStore数据结构图
【注】以上图片转载自博客RocketMQ消息存储流程图及数据结构

数据结构

通过上面的图可以看到消息存储涉及到一下几个数据结构:
CommitLog,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量。举个例子,当前commitLog文件的大小是12413435字节,那下一条消息到来后它的offset就是12413436。这个说法不是非常准确,但是offset大概是这么计算来的。commitLog并不是一个文件,而是一系列文件(上图中的MappedFile)。每个文件的大小都是固定的(默认1G),写满一个会生成一个新的文件,新文件的文件名就是它存储的第一条消息的offset。
ConsumeQueue,既然所有消息都是存储在一个commitLog中,但是consumer是按照topic+queue的维度来消费消息的,没有办法直接从commitLog中读取,所以针对每个topic的每个queue都会生成consumeQueue文件。ConsumeQueue文件中存储的是消息在commitLog中的offset,可以理解成一个按queue建的索引,每条消息占用20字节(上图中的一个cq)。跟commitLog一样,每个Queue文件也是一系列连续的文件组成,每个文件默认放30w个offset。
IndexFile,CommitLog的另外一种形式的索引文件,只是索引的是messageKey,每个MsgKey经过hash后计算存储的slot,然后将offset存到IndexFile的相应slot上。根据msgKey来查询消息时,可以先到IndexFile中查询offset,然后根据offset去commitLog中查询消息详情。

线程服务

MessageStore除了上面的数据结构以外,还需要相应的服务来对数据做操作。
IndexService,负责读写IndexFile的服务
ReputMessageService,消息存储到commitLog后,MessageStore的接口调用就直接返回了,后续由ReputMessageService负责将消息分发到ConsumeQueueIndexService
HAService,负责将master-slave之间的消息数据同步
以上就是MessageStore的整体结构了,下面看下它的启动过程。

MessageStore启动

启动入口在DefaultMessageStore.start()方法:

public void start() throws Exception {
        //1、写lock 文件,尝试获取lock文件锁,保证磁盘上的文件只会被一个messageStore读写
        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        //2、启动FlushConsumeQueueService,是一个单线程的服务,定时将consumeQueue文件的数据刷新到磁盘,周期由参数flushIntervalConsumeQueue设置,默认1sec
        this.flushConsumeQueueService.start();
        //3、启动CommitLog
        this.commitLog.start();
        //4、消息存储指标统计服务,RT,TPS...
        this.storeStatsService.start();
        //5、针对master,启动延时消息调度服务
        if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }
        //6、启动reputMessageService,该服务负责将CommitLog中的消息offset记录到cosumeQueue文件中
        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();
        //7、启动haService,数据主从同步的服务
        this.haService.start();
        //8、对于新的broker,初始化文件存储的目录
        this.createTempFile();
        //9、启动定时任务
        this.addScheduleTask();
        this.shutdown = false;
    }

以上就是整个MessageStore服务启动的过程,其中有几项下面解释一下:

    public void start() {
        //加载刷盘服务
        this.flushCommitLogService.start();
        //storePool flush
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }

FlushCommitLogService,跟第2步类似的,该服务负责将CommitLog的数据flush到磁盘,针对同步刷盘和异步刷盘,有两种实现方式
CommitLogService,这个service只有在采用内存池缓存消息的时候才需要启动。在使用内存池的时候,这个服务会定时将内存池中的数据刷新到FileChannel中,这个我们后面讲CommitLog的文章中再详细讲。

  1. 定时清理过期的commitLog、cosumeQueue和Index数据文件, 默认文件写满后会保存72小时
  2. 定时自检commitLog和consumerQueue文件,校验文件是否完整。主要用于监控,不会做修复文件的动作。
  3. 定时检查commitLog的Lock时长(因为在write或者flush时侯会lock),如果lock的时间过长,则打印jvm堆栈,用于监控。

以上就是整个启动的过程了,后续的文章开始讲解Broker是怎样接收Producer消息,还有怎样将消息交给Consumer的。

上一篇 下一篇

猜你喜欢

热点阅读