spingboot收藏

Java进阶-RocketMQ-基础

2022-02-16  本文已影响0人  GIT提交不上

一、架构设计

官方文档-技术架构
RocketMQ源码解析(一)-架构原理

技术架构.jpeg 部署架构.jpeg

  集群工作流程:

二、消息发送

  Producer发送消息支持3种方式,同步、异步和Oneway。

2.1 Name 服务器的均等性

  在 Broker 启动的时候,其会将自己在本地存储的话题配置文件 (默认位于 $HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步:

image.png

2.2 寻找路由信息

  当客户端发送消息的时候,其首先会尝试寻找话题路由信息。即这条消息应该被发送到哪个地方去。客户端在内存中维护了一份和话题相关的路由信息表 topicPublishInfoTable,当发送消息的时候,会首先尝试从此表中获取信息。如果此表不存在这条话题的话,那么便会从 Name 服务器获取路由消息。

image.png

  服务器返回的话题路由信息包括以下内容:

image.png

  每个 Broker 上面可以绑定多个可写消息队列和多个可读消息队列,客户端根据返回的所有 Broker 地址列表和每个 Broker 的可写消息队列列表会在内存中构建一份所有的消息队列列表。之后客户端每次发送消息,都会在消息队列列表上轮循选择队列 (我们假设返回了两个 Broker,每个 Broker 均有 4 个可写消息队列):

image.png

2.3 话题检查

image.png

2.4 整体流程

image.png

2.5 源码解析

Message msg = new Message( "Test", "Hello World".getBytes() );
DefaultMQProducer producer = new DefaultMQProducer();
producer.start();
--------
producer.send(msg);
RocketMQ-Producer.png

RocketMQ 消息发送流程
RocketMQ源码解析(三)-Producer

三、消息存储

  用来存储消息的文件被称之为 MappedFile。文件默认创建的大小为 1GB,即 1024 * 1024 * 1024 = 1073741824 字节,每个文件的命名是按照总的字节偏移量来命名的

image.png

3.1 文件创建

  当 Broker 启动的时候,其会将位于存储目录下的所有消息文件加载到一个列表中,当有新的消息到来的时候,其会默认选择列表中的最后一个文件来进行消息的保存。

  RocketMQ 提供了一个专门用来实例化 MappedFile 文件的服务类 AllocateMappedFileService。在内存中,也同时维护了一张请求表 requestTable 和一个优先级请求队列 requestQueue 。当需要创建文件的时候,Broker 会创建一个 AllocateRequest 对象,其包含了文件的路径、大小等信息。然后先将其放入 requestTable 表中,再将其放入优先级请求队列 requestQueue 中。

  服务类会一直等待优先级队列是否有新的请求到来,如果有,便会从队列中取出请求,然后创建对应的 MappedFile,并将请求表 requestTable 中 AllocateRequest 对象的字段 mappedFile 设置上值。最后将 AllocateRequest 对象上的 CountDownLatch 的计数器减 1 ,以标明此分配申请的 MappedFile 已经创建完毕了。

image.png

  等待 MappedFile 创建完毕之后,其便会从请求表 requestTable 中取出并删除表中记录,然后再将其放到MappedFile列表中去。

image.png

3.2 文件初始化

  在 MappedFile 的构造函数中,其使用了 FileChannel 类提供的 map 函数来将磁盘上的这个文件映射到进程地址空间中。然后当通过 MappedByteBuffer 来读入或者写入文件的时候,磁盘上也会有相应的改动。

3.3 消息写入

  每条消息的存储是按照一个 4 字节的长度来做界限的,这个长度本身就是整个消息体的长度,当读完这整条消息体的长度之后,下一次再取出来的一个 4 字节的数字,便又是下一条消息的长度。

image.png image.png

  前两位是 4 字节的长度和 4 字节的 MAGICCODE,MAGICCODE 的可选值有:

  当这个文件有能力容纳这条消息体的情况下,其便会存储 MESSAGE_MAGIC_CODE 值;当这个文件没有能力容纳这条消息体的情况下,其便会存储 BLANK_MAGIC_CODE 值。所以这个 MAGICCODE 是用来界定这是空消息还是一条正常的消息。

3.4 消息刷盘

四、消息接受

  消费者客户端与 Broker 服务器进行沟通的整体流程如下:

image.png

  RocketMQ 管理消息的单位不是话题,而是队列。
  客户端有两种消费模式,一种是广播模式,另外一种是集群模式。

  集群模式:

4.1 Broker消费队列文件

  消息往 Broker 存储就是在向 CommitLog 消息文件中写入数据的一个过程。在 Broker 启动过程中,其会启动一个叫做 ReputMessageService 的服务,这个服务每隔 1 秒会检查一下这个 CommitLog 是否有新的数据写入。ReputMessageService 自身维护了一个偏移量 reputFromOffset,用以对比和 CommitLog 文件中的消息总偏移量的差距。当这两个偏移量不同的时候,就代表有新的消息到来了。

  commitLog 文件夹下面存放的是完整的消息,来一条消息,向文件中追加一条消息。同时,根据这一条消息属于 TopicTest 话题下的哪一个队列,又会往相应的 consumequeue 文件下的相应消费队列文件中追加消息的偏移量、消息大小和标签码。

image.png image.png

4.2 消息队列偏移量

  针对同一话题,在集群模式下,由于每个客户端所消费的消息队列不同,所以每个消息队列已经消费到哪里的消费偏移量是记录在 Broker 服务器端的。而在广播模式下,由于每个客户端分配消费这个话题的所有消息队列,所以每个消息队列已经消费到哪里的消费偏移量是记录在客户端本地的。

  集群模式:

image.png

  广播模式:

image.png

4.3 拉取消息

  客户端和 Broker 服务器端完整拉取消息的流程图如下:

image.png

4.3 消费消息

  并发消费:

image.png

  有序消费:

  RocketMQ 的有序消费主要依靠两把锁,一把是维护在 Broker 端,一把维护在消费者客户端。在有序消费的时候,Broker 需要确保任何一个队列在任何时候都只有一个客户端在消费它,都在被一个客户端所锁定。

image.png

  RocketMQ 的消息树是用 TreeMap 实现的,其内部基于消息偏移量维护了消息的有序性。每次消费请求都会从消息树中拿取偏移量最小的几条消息 (默认为 1 条)给用户,以此来达到有序消费的目的。

image.png

五、消息过滤

image.png image.png image.png

六、消息索引

  ID (偏移量) 查询:

image.png

  消息队列偏移量查询:

image.png

  消息索引服务:

  每当一条消息发送过来之后,其会封装为一个 DispatchRequest 来下发给各个转发服务,而 CommitLogDispatcherBuildIndex 构建索引服务便是其中之一。

image.png
上一篇下一篇

猜你喜欢

热点阅读