2020-07-20
<meta name="source" content="lake">
《RocketMQ实战与原理解析》读书笔记和总结
基本介绍
核心部分
NameServer集群、Broker集群、生产者、消费者
- NameServer
负责管理所有的Broker消息
让生产者和消费者鬼知道集群里有哪些Broker,然后与之通信
- Broker
实现数据多副本存储和高可用,使用主从架构
- 生产者
向MQ发送消息
- 消费者
从MQ获取消息
其他
- topic
用于区分不同类型的消息
- message queue
Topic可以根据需求设置一个或多个Message Queue,能支持增加并行处理的机器来提高处理速度。
工作原理
先启动NameServer,再启动Broker。这时消息队列已经可以提供服务了,想发送消息队列就使用producter,想接收消息队列就使用consumer。
消费者
一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;(被动接收并自动处理)
另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。(主动拉取处理)
DefaultMQPushConsumer
-
Consumer的GroupName
-
Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。
-
NameServer的地址和端口号
-
可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port; ip2:port; ip3:port”。
-
Topic名称
-
Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest", "tag1 ||tag2 || tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。
RocketMQ支持两种消息模式:Clustering和Broadcasting。
在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。
在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
长轮询方式
DefaultMQPushConsuer的源码中有很多PullRequest语句。
Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。
“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。
“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。
“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。
DefaultMQPushConsumer的流量控制
PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。
DefaultMQPullConsumer
- 获取Message Queue并遍历
一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。
- 维护Offsetstore
从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。
- 根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态,需要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息。实际情况中可以把while(true)放到外层,达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset,所以PullConsumer有更多的自主性和灵活性。
Consumer的启动、关闭流程
Consumer分为Push和Pull两种方式,对于PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存,要在程序的异常处理部分增加把Offset写入磁盘方面的处理,记准了每个Message Queue的Offset,才能保证消息消费的准确性。
DefaultMQPushConsumer的退出,要调用shutdown()函数,以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。
RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。
生产者
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、延迟发送、自定义发送规则、发送事务消息等。
发送消息的步骤
- 设置Producer的GroupName。
- 设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
- 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。
- 设置NameServer地址。
- 组装消息并发送。
发送延迟消息
RocketMQ支持发送延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。
延迟消息的使用方法是在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如setDelayTimeLevel(3)表示延迟10s。
自定义发送规则
可以通过把MessageQueueSelector的对象作为参数,根据传入的Object参数,或者根据Message消息内容确定把消息发往那个Message Queue,返回被选中的Message Queue。
存储队列位置信息
Offset
Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。
DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer. setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。
自定义日志输出
Log是监控系统状态,排查问题的重要手段,RocketMQ的默认Log存储位置是:${user.home}/Logs/rocketmqLogs, Log配置文件的设置可以通过JVM启动参数、环境变量、代码中的设置语句这三种方式来配置。
NameServer
- 整个消息队列的状态服务器,集群的各个组件通过它来了解全局的信息。
- 各个角色的机器都要定期向NameServer上报自己的状态,超时不上报的话,NameServer会认为某个机器出故障不可用了。
- NameServer可以部署多个,相互之间独立。
集群状态的存储结构
image.png image.png状态维护逻辑
其他角色会主动向NameServer上报状态,所以NameServer的主要逻辑在DefaultRequestProcessor类中,根据上报消息里的请求码做相应的处理,更新存储的对应信息。此外,连接断开的事件也会触发状态更新,具体逻辑在org.apache.rocketmq.namesrv.routeinfo的BrokerHousekeepingService类中。
为何不用ZooKeeper
ZooKeeper是Apache的一个开源软件,为分布式应用程序提供协调服务。那为什么RocketMQ要自己造轮子,开发集群的管理程序呢?
答案是ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举,用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。中间件对稳定性要求很高,RocketMQ的NameServer只有很少的代码,容易维护,所以不需要再依赖另一个中间件,从而减少整体维护成本。
底层通信机制
待完善
RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的