Kafka/RocketMQ存储结构对比

2021-06-21  本文已影响0人  进击的蚂蚁zzzliu

一、Kafka

存储结构.png

Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)(如果没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的)。图中的一串数字 0 是该日志段的起始位移值,也就是该日志段中所存的第一条消息的位移值。

一般情况下,一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端(Broker),这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

文件内容

# 1、执行下面命令即可将日志数据文件内容dump出来
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log --print-data-log > 00000000000022372103_txt.log

#2、dump出来的具体日志数据内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log
Starting offset: 22372103
offset: 22372103 position: 0 CreateTime: 1532433067157 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5d2697c5-d04a-4018-941d-881ac72ed9fd
offset: 22372104 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 0ecaae7d-aba5-4dd5-90df-597c8b426b47
offset: 22372105 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 87709dd9-596b-4cf4-80fa-d1609d1f2087
......
......
offset: 22372444 position: 16365 CreateTime: 1532433067166 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 8d52ec65-88cf-4afd-adf1-e940ed9a8ff9
offset: 22372445 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5f5f6646-d0f5-4ad1-a257-4e3c38c74a92
offset: 22372446 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51dd1da4-053e-4507-9ef8-68ef09d18cca
offset: 22372447 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 80d50a8e-0098-4748-8171-fd22d6af3c9b
......
......
offset: 22372785 position: 32730 CreateTime: 1532433067174 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: db80eb79-8250-42e2-ad26-1b6cfccb5c00
offset: 22372786 position: 32730 CreateTime: 1532433067176 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51d95ab0-ab0d-4530-b1d1-05eeb9a6ff00
......
......
#3、同样地,dump出来的具体偏移量索引内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.index
offset: 22372444 position: 16365
offset: 22372785 position: 32730
offset: 22373467 position: 65460
offset: 22373808 position: 81825
offset: 22374149 position: 98190
offset: 22374490 position: 114555
......
......
#4、dump出来的时间戳索引文件内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.timeindex
timestamp: 1532433067174 offset: 22372784
timestamp: 1532433067191 offset: 22373466
timestamp: 1532433067206 offset: 22373807
timestamp: 1532433067214 offset: 22374148
timestamp: 1532433067222 offset: 22374489
timestamp: 1532433067230 offset: 22374830
......
......
1. 消息日志文件(.log)

消息日志文件就是存储具体的消息内容,默认1G;其主要内容为:

2. 偏移量索引文件(.index)

偏移量索引文件中存储的 offset -> position 的映射关系;

为了解决传统二分查找算法导致的不必要的Page Fault的问题,Kafka进行了改进,把所有索引项分成两部分:热区和冷区,然后分别在这两个区域内执行二分查找算法;

3. 时间戳索引文件(.timeindex)

时间戳索引文件存储 时间戳 -> offset映射关系;

Kafka零拷贝机制:
1.Kafka索引都是基于 MappedByteBuffer 的,也就是让用户态和内核态共享内核态的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap 虽然避免了不必要的拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap 的创建和销毁成本可能是不一样的。很高的创建和销毁开销会抵消 Zero Copy 带来的性能优势。由于这种不确定性,在 Kafka 中,只有索引应用了 mmap,最核心的日志并未使用 mmap 机制。
2.TransportLayer 是 Kafka 传输层的接口。它的某个实现类使用了
FileChannel 的 transferTo 方法。该方法底层使用 sendfile 实现了 Zero Copy。对 Kafka而言,如果 I/O 通道使用普通的 PLAINTEXT,那么,Kafka 就可以利用 Zero Copy 特性,直接将页缓存中的数据发送到网卡的 Buffer 中,避免中间的多次拷贝。相反,如果I/O 通道启用了 SSL,那么,Kafka 便无法利用 Zero Copy 特性了。

二、RocketMQ

1. 文件结构

|-- store
    |-- commitlog
        |-- 00000003384434229248
        |-- 00000003385507971072
        |-- 00000003386581712896 
    |-- consumequeue
        |-- TopicA
            |-- 0
                |-- 00000000002604000000
                |-- 00000000002610000000
            |-- 1
                |-- 00000000002610000000
                |-- 00000000002616000000
        |-- TopicB
            |-- 0
                |-- 00000000000732000000
            |-- 1
                |-- 00000000004610000000
            |-- 3
                |-- 00000000005610000000
            |-- 4
                |-- 00000000006610000000
    |-- index
        |-- 20210620170423012

RocketMQ主要有三中日志文件:

2. 消息存储

消息存储.png

RocketMQ的混合型存储结构采用了数据(commitlog)和索引(index/consumequeue)部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失,因此Consumer也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。
这里,RocketMQ的具体做法是,使用Broker端的后台服务线程ReputMessageService不停地分发请求并异步构建ConsumeQueue和IndexFile。

2.1 commitlog
commitlog.png

RocketMQ的commitLog是WAL的一种,通过对文件的顺序追加写入,提高了文件的写入性能;

2.2 consumequeue
ConsumeQueue.png
2.3 index
IndexFile.png

为了满足根据msgId以及消息key查询消息的需求,每个Broker对应一组indexFile;

三、对比

上一篇下一篇

猜你喜欢

热点阅读