RocketMQ底层原理之存储设计
塑造你生活的不是你偶尔做的一两件事,而是你一贯坚持做的事。
——安东尼.罗宾
大纲
图示分布式事务演进及RocketMQ方案
图示业务场景:用户 A 转账 100 元给用户 B,这个业务比较简单,具体的步骤:
1、用户 A 的账户先扣除 100 元。
2、再把用户 B 的账户加 100 元。
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中,因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
图示消息中间件的方式,把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到消息中间件;加钱业务订阅“扣款成功消息”,再对用 户 B 加钱(系统怎么知道给用户 B 加钱呢?是消息体里面包含了源账户和目标账户 ID,以及钱数)
场景一:先扣款后向 MQ 发消息
先扣款再发送消息,万一发送消息失败了,那用户 B 就没法加钱
场景二:先向 MQ 发像消息,后扣款
扣款成功消息发送成功,但用户 A 扣款失败,可加钱业务订阅到了消息,用户 B 加了钱
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
RocketMq 消息中间件把消息分为两个阶段:半事务阶段和确认阶段。
半事务阶段:
该阶段主要发一个消息到 rocketmq,但该消息只储存在 commitlog 中,但 consumeQueue 中不可见,也就是消费端(订阅端)无法看到此消息。
commit/rollback 阶段(确认阶段):
该阶段主要是把 prepared 消息保存到 consumeQueue 中,即让消费端可以看到此消息,也就是可以消费此消息。如果是 rollback 就不保存。
图示整个流程:
1、A 在扣款之前,先发送半事务消息。
2、发送预备消息成功后,执行本地扣款事务。
3、扣款成功后,再发送确认消息。
4、B 消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱。
注意:上面的确认消息可以为 commit 消息,可以被订阅者消费;也可以是 Rollback消息,即执行本地扣款事务失败后,提交 rollback 消息,即删除那个预备消息,订阅者无法消费。
异常 1:如果发送半事务消息失败,下面的流程不会走下去;这个是正常的。
异常 2:如果发送半事务消息成功,但执行本地事务失败;这个也没有问题,因为此预备消息不会被消费端订阅到,消费端不会执行业务。
异常 3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户 A 扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMQ如何解决上面的问题,核心思路就是【事务回查】,也就是 RocketMQ会定时遍历 commitlog 中的半事务消息。
异常 3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为 RocketMQ会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送 commit 确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常 2 也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送 Rollback确认消息,把消息进行删除。
同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把 A已经处理完的业务进行回退)
图示如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。当RocketMq事务回查时,只需要检查对应的 TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般 A 与 B 需要进行手动对账,手动补偿。
RocketMQ的存储设计
1.Domain Model
领域模型(Domain Model)是对领域内的概念类或现实世界中对象的可视化标识。又称概念模型、领域对象模型、分析对象模型。它专注于分析问题领域本身,发掘重要的业务领域概念,并建立业务领域概念之间的关系。
图示(1)Message
Message是RocketMQ消息引擎中的主体。messageId是全局唯一的。MessageKey是业务系统(生产者)生成的,所以如果要结合业务,可以使用MessageKey作为业务系统的唯一索引。
代码示例 SendResult源码图示另外Message中的equals方法和hashCode主要是为了完成消息只处理一次(Exactly-Once)。
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
(2)Topic
Tags是在同一Topic中对消息进行分类。
subTopics==Message Queue,其实在内存逻辑中,subTopics是对Topics的一个拓展,尤其是在MQTT这种协议下,在Topic底下会有很多subTopics。
(3)Queue
Queue是消息物理管理单位,比如在RocketMQ的控制台中,就可以看到每一个queue中的情况(比如消息的堆积情况、消息的TPS、QPS)。
(4)Offset
对于每一个Queue来说都有Offset,这个是消费位点。
(5)Group
业务场景中,如果有一堆发送者,一堆消费者,所以这里使用Group的概念进行管理。
2.对应关系
Message与Topic是多对一的关系,一个Topic可以有多个Message。
Topic到Queue是一对多的关系,这个也是方便横向拓展,也就是消费的时候,这里可以有很多很多的Queue。
一个Queue只有一个消费位点(Offset),所以Topic和Offset也是一对多的关系。
Topic和Group也是多对多的关系。
消费并发度、热点问题
1.消费并发度
从上面模型可以看出,要解决消费并发,就是要利用Queue,一个Topic可以分出更多的queue,每一个queue可以存放在不同的硬件上来提高并发。
2.热点问题(顺序、重复)
前面讲过要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)RocketMQ 不解决这个矛盾的问题。理由如下:
1、 乱序的应用实际大量存在。
2、 队列无序并不意味着消息无序。
另外还有消息重复,造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该是怎么处理?
RocketMQ 不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。
1、 消费端处理消息的业务逻辑保持幂等性。
2、 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现。
消息存储结构及运转机制
1.消息存储结构
RocketMQ 因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储。所以 RocketMQ 采用文件进行存储。
(1)存储文件
图示>commitLog:消息存储目录
>config:运行期间一些配置信息
>consumerqueue:消息消费队列存储目录
>index:消息索引文件存储目录
>abort:如果存在该文件则Broker非正常关闭
>checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
(2)消息存储结构
图示RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是 CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
>CommitLog:存储消息的元数据。
>ConsumerQueue:存储消息在CommitLog的索引。
>IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程。
图示CommitLog
CommitLog以物理文件的方式存放,每台Broker 上的CommitLog被本机器所有ConsumeQueue共享,文件地址:$ {user.home}\store\${ commitlog}\${ fileName}。在CommitLog中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog中顺序写 ,但是随机读。commitlog文件默认大小为1G ,可通过在 broker置文件中设置 mappedFileSizeCommitLog属性来改变默认大小。
图示Commitlog文件存储的逻辑视图如下,每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。
图示每个CommitLog文件的大小为 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为
1073741824(1G =1073741824byte)。
图示每台 Rocket 只会往一个 commitlog 文件中写,写完一个接着写下一个。
indexFile和ComsumerQueue中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个CommitLog文件上。
ConsumeQueue
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件, 文件地址在${$storeRoot}\consumequeue\${topicName}\${queueld}\$ {fileName}。
图示 图示ConsumeQueue中存储的是消息条目,为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,消息条目如下:
图示ConsumeQueue即为Commitlog文件的索引文件, 其构建机制是 当消息到达Commitlog文件后由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。
存储机制这样设计有以下几个好处:
1) CommitLog顺序写 ,可以大大提高写入效率。
(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!)
2)虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3)为了保证完全的顺序写,需要ConsumeQueue这个中间结构 ,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性, CommitLog 里存储了ConsumeQueues 、Message Key、 Tag 等所有信息,即使ConsumeQueue丢失,也可以通过 commitLog 完全恢复出来。
IndexFile
RocketMQ 还支持通过 MessageID 或者 MessageKey 来查询消息;使用ID查询时,因为ID 就是用 broker+offset 生成的(这里 msgId 指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。但是对于用MessageKey来查询消息,RocketMQ 则通过构建一个 index 来提高读取速度。
index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ专门为消息订阅构建的索引文件 ,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。
图示Config
config 文件夹中存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic配置属性。
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。
图示其他
abort :如果存在 abort 文件说明 Broker 非正常闭,该文件默认启动时创建,正常退出之前删除
checkpoint :文件检测点,存储commitlog文件最后一次刷盘时间戳、 consumequeue最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。
2.过期文件删除
由于 RocketMQ 操作 CommitLog,ConsumeQueue 文件是基于内存映射机制并在启动的时候会加载 commitlog,ConsumeQueue 目录下的所有文件,为 了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件( Commitlog )与消息消费 队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件( Commitlog)共用一套过期文件机制。
RocketMQ清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42 小时(不同版本的默认值不同,这里以 4.4.0 为例) ,通过在Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。
触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每 10s 执行一次。
(1)过期判断
文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件, 可以删除。
另外还有其他两个配置参数:
deletePhysicFilesInterval:删除物理文件的时间间隔(默认是 100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔 deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳 destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少 1000,直到引用 小于等于 0 为止,即可删除该文件。
(2)删除条件
1)指定删除文件的时间点, RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨 4 点。
2)磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是 85),会触发过期文件删除操作。
另外还有 RocketMQ 的磁盘配置参数:
1:物理使用率大于 diskSpaceWarningLevelRatio(默认 90%可通过参数设置),则会阻止新消息的插入。
2:物理磁盘使用率小于 diskMaxUsedSpaceRatio(默认 75%) 表示磁盘使用正常。
3.零拷贝与 MMAP
(1)什么是零拷贝?
零拷贝(英语: Zero-copy) 技术是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省 CPU 周期和内存带宽。
➢零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率
➢零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上:下文切换而带来的开销
可以看出没有说不需要拷贝,只是说减少冗余[不必要]的拷贝。
下面这些组件、框架中均使用了零拷贝技术:Kafka、Netty、Rocketmq、Nginx、Apache。
(2)传统数据传送机制
比如:读取文件,再用 socket 发送出去,实际经过四次 copy。
伪码实现如下:
buffer = File.read();
Socket.send(buffer);
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy 到应用程序的 buffer;
3、第三步:将 application 应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。
图示分析上述的过程,虽然引入 DMA 来接管 CPU 的中断请求,但四次 copy 是存在“不必要的拷贝”的。实际上并不需要第二个和第三个数据副本。应 用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。
显然,第二次和第三次数据 copy 其实在这种场景下没有什么帮助反而带来开销(DMA 拷贝速度一般比 CPU 拷贝速度快一个数量级),这也正是零拷贝出现的背景和意义。
打个比喻:200M 的数据,读取文件,再用 socket 发送出去,实际经过四次 copy(2 次 cpu 拷贝每次 100ms ,2 次 DMS 拷贝每次 10ms)。
传统网络传输的话:合计耗时将有 220ms。
同时,read 和 send 都属于系统调用,每次调用都牵涉到两次上下文切换:
图示总结下,传统的数据传送所消耗的成本:4 次拷贝,4 次上下文切换。
4 次拷贝,其中两次是 DMA copy,两次是 CPU copy。
(3)mmap 内存映射
硬盘上文件的位置和应用程序缓冲区(application buffers)进行映射(建立一种一一对应关系),由于 mmap()将文件直接映射到用户空间,所以实际文 件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。
mmap 内存映射将会经历:3 次拷贝: 1 次 cpu copy,2 次 DMA copy;
打个比喻:200M 的数据,读取文件,再用 socket 发送出去,如果是使用 MMAP 实际经过三次 copy(1 次 cpu 拷贝每次 100ms ,2 次 DMS 拷贝每次10ms)合计只需要 120ms。
从数据拷贝的角度上来看,就比传统的网络传输,性能提升了近一倍。
以及 4 次上下文切换
图示mmap()是在 <sys/mman.h> 中定义的一个函数,此函数的作用是创建一个新的 虚拟内存 区域,并将指定的对象映射到此区域。mmap其实就是通过内存映射 的机制来进行文件操作。
Windows 操作系统上也有虚拟机内存,如下图:
图示(4)代码
代码示例4.RocketMQ 中 MMAP 运用
如果按照传统的方式进行数据传送,那肯定性能上不去,作为 MQ 也是这样,尤其是 RocketMQ,要满足一个高并发的消息中间件,一定要进行优化。 所以 RocketMQ 使用的是 MMAP。
RocketMQ 一个映射文件大概是,commitlog 文件默认大小为 1G。
这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了。
(1)MMAP 文件对应
图示 图示 图示(2)RocketMQ 源码中的 MMAP 运用
RocketMQ 源码中,使用 MappedFile 这个类进行 MMAP 的映射。
源码图示 源码图示RocketMQ 存储整体设计总结
1.消息生产与消息消费相互分离
Producer 端发送消息最终写入的是 CommitLog(消息存储的日志数据文件),Consumer 端先从 ConsumeQueue(消息逻辑队列)读取持久化消息的 起始物理位置偏移量 offset、大小 size 和消息Tag的HashCode值,随后再从CommitLog中进行读取待拉取消费消息的真正实体内容部分;
2.RocketMQ 的 CommitLog 文件采用混合型存储
所有的Topic下的消息队列共用同一个CommitLog的日志数据文件,并通过建立类似索引文件—ConsumeQueue 的方式来区分不同Topic下面的不同MessageQueue的消息,同时为消费消息起到一定的缓冲作用(异步服务线生成了 ConsumeQueue 队列的信息后,Consumer端才能进行消费)。这样,只要消息写入并刷盘至CommitLog文件后,消息就不会丢失,即使 ConsumeQueue中的数据丢失,也可以通过CommitLog来恢复。
3.RocketMQ 每次读写文件的时候真的是完全顺序读写吗?
发送消息时,生产者端的消息确实是顺序写入 CommitLog;订阅消息时,消费者端也是顺序读取 ConsumeQueue,然而根据其中的起始物理位置偏移 量 offset 读取消息真实内容却是随机读取 CommitLog。 所以在 RocketMQ 集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的,至于RocketMQ怎么优化的,源码解读部分讲解。
我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!
上一篇:RocketMQ基础篇(下)