2020-07-20

2020-07-20  本文已影响0人  fly林十一

<meta name="source" content="lake">

《RocketMQ实战与原理解析》读书笔记和总结

基本介绍

核心部分

NameServer集群、Broker集群、生产者、消费者

负责管理所有的Broker消息

让生产者和消费者鬼知道集群里有哪些Broker,然后与之通信

实现数据多副本存储和高可用,使用主从架构

向MQ发送消息

从MQ获取消息

其他

用于区分不同类型的消息

Topic可以根据需求设置一个或多个Message Queue,能支持增加并行处理的机器来提高处理速度。

工作原理

先启动NameServer,再启动Broker。这时消息队列已经可以提供服务了,想发送消息队列就使用producter,想接收消息队列就使用consumer。

消费者

一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;(被动接收并自动处理)

另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。(主动拉取处理)

DefaultMQPushConsumer

RocketMQ支持两种消息模式:Clustering和Broadcasting。

在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。

在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

长轮询方式

DefaultMQPushConsuer的源码中有很多PullRequest语句。

Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。

Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。

“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。

“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。

“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer

DefaultMQPushConsumer的流量控制

PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。

DefaultMQPullConsumer

一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。

从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。

拉取消息的请求发出后,会返回:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态,需要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息。实际情况中可以把while(true)放到外层,达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset,所以PullConsumer有更多的自主性和灵活性。

Consumer的启动、关闭流程

Consumer分为Push和Pull两种方式,对于PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存,要在程序的异常处理部分增加把Offset写入磁盘方面的处理,记准了每个Message Queue的Offset,才能保证消息消费的准确性。

DefaultMQPushConsumer的退出,要调用shutdown()函数,以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。

RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。

生产者

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、延迟发送、自定义发送规则、发送事务消息等。

发送消息的步骤

  1. 设置Producer的GroupName。
  2. 设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
  3. 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。
  4. 设置NameServer地址。
  5. 组装消息并发送。

发送延迟消息

RocketMQ支持发送延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。

延迟消息的使用方法是在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如setDelayTimeLevel(3)表示延迟10s。

自定义发送规则

可以通过把MessageQueueSelector的对象作为参数,根据传入的Object参数,或者根据Message消息内容确定把消息发往那个Message Queue,返回被选中的Message Queue。

存储队列位置信息

Offset

Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。

DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer. setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。

自定义日志输出

Log是监控系统状态,排查问题的重要手段,RocketMQ的默认Log存储位置是:${user.home}/Logs/rocketmqLogs, Log配置文件的设置可以通过JVM启动参数、环境变量、代码中的设置语句这三种方式来配置。

NameServer

集群状态的存储结构

image.png image.png

状态维护逻辑

其他角色会主动向NameServer上报状态,所以NameServer的主要逻辑在DefaultRequestProcessor类中,根据上报消息里的请求码做相应的处理,更新存储的对应信息。此外,连接断开的事件也会触发状态更新,具体逻辑在org.apache.rocketmq.namesrv.routeinfo的BrokerHousekeepingService类中。

为何不用ZooKeeper

ZooKeeper是Apache的一个开源软件,为分布式应用程序提供协调服务。那为什么RocketMQ要自己造轮子,开发集群的管理程序呢?

答案是ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举,用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。中间件对稳定性要求很高,RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本。

底层通信机制

待完善

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的

上一篇下一篇

猜你喜欢

热点阅读