消息队列--RocketMQ学习笔记
2018-06-19 本文已影响83人
LY丶Smile
序言
本文是RocketMQ学习及使用过程中整理的笔记,主要是个人觉得很关键或者是需要加深理解部分知识的纪录,主要涉及RocketMQ基本概念、部署结构、消息相关知识等,可以做入门资料阅读
消息队列应用场景
- 异步处理
- 将不是必须的业务逻辑进行异步处理
- 应用解耦
- 应用之间的交互采用消息队列可以很好的减少彼此的依赖,一个应用崩溃不会影响整个服务
- 流量削峰
- 比较常见于秒杀和团购
- 直接丢弃或者延后处理
- 消息通讯
- 应用很单纯,就是用作消息通讯,也是上边所有的场景的基础
基本概念
- Topic
- 主题,一级消息类型,可以配合Tag使用做细致区分,不同类型的消息设置不同Topic
- 建议:一个应用尽可能使用一个Topic,功能以Tag区分
- Tag
- 消息标签,二级消息类型,用于进一步区分某个Topic下的消息分类
- Queue
- 消息队列
- 一个Topic下可以有多个queue
- Producer
- 生产者,发送消息
- Consumer
- 消费者
- 一个消息可以对应多个消费者
- Group
- Consumer Group:消费者分组,为了实现集群消费,不同Consumer Group之间消费进度彼此不受影响,一个Consumer Group下包含多个Consumer实例
- Producer Group:生产者分组,标识发送同一类消息的Producer,通常发送逻辑一致,一个Producer Group可以发送多个Topic消息
- Broker
- 服务器
物理部署结构
图片来自网络- NameServer几乎无状态的节点,节点之间无任何信息同步
- Broker为Master/Slave模式,一个Master可以对应多个Slave,一个Slave只能对应一个Master,对应关系:BrokerName相同,BrokerId不同,0表示Master,非0表示Slave
- Producer与NameServer中的随机一个节点建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Master建立长连接
- Consumer跟提供Topic服务的Master、Slave建立长连接。Consumer可以从Master订阅也可以从Slave订阅,取决于Broker的配置
- Producer与Consumer都会定时发送心跳
逻辑部署结构
图片来自网络消费类型
-
集群消费:
- 一个Group里的Consumer平均消费Topic下的Queue
- Consumer Group里的Consumer数目最好和Topic的queue数目一致或者成倍数关系
-
广播消费:
- 忽略Consumer Group,消费者只要订阅了Topic,那么就会收到该Topic下的所有queue
- 可以在实例化Consumer的时候指定消费类型
offset
纪录消费位置
顺序消息
-
相关概念
- 顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。包括顺序发布和顺序消费
-
注意事项
- 顺序消息暂不支持广播模式
事务消息
-
相关概念
- 事务消息:通过事务消息能达到分布式事务的最终一致
- 半消息:暂时还不能投递的消息,发送方成功将消息发送到了RocketMQ的服务器端,但是服务端没有收到生产者对该消息的二次确认,此时消息被标记为“暂不能投递”状态
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
-
事务消息图解
图片来自网络1). 可以分为三个阶段:第一阶段发送Prepare消息并且能拿到消息的地址,第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息并修改消息的状态
2). 事务消息主要在于消息投递给消费者之前,消息投递给消费者之后,如果消费者消费失败,则不断重试,如果最终还是失败,则只能人工介入。
3). 回查是在二次确认未到达MQ Server的情况下启用
-
注意事项
- 事务消息的 Producer ID 不能与其他类型消息的 Producer ID 共用。
- 通过 ONSFactory.createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。
- 事务消息发送完成本地事务后,可在 execute 方法中返回如下三种状态:
- TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
- TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.Unknow 暂时无法判断状态,期待固定时间以后 MQ Server 向发送方进行消息回查。
消息查询手段
-
按Message Id查询
- Message Id消息唯一标识,系统自动生成
-
按Message Key查询
- 用户指定,最好是能确保其唯一性
消息过滤
-
broker端消息过滤
- 减少无用消息传输,减少了网络开销
- 增加了broker的负担,实现起来相对复杂
-
Consumer端消息过滤
- 过滤逻辑完全自定义实现
- 缺点是有无用消息传输,增大了网络开销
开发--零散知识点
引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0-incubating</version>
</dependency>
用法
- producer.setRetryTimesWhenSendFailed(5); //消息发送失败重试次数
-
consumer.setConsumeFromWhere(ConsumeFromWhere.x)
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_LAST_OFFSET:第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,