RabbitMQ消息队列
消息队列提供消息的接收,消息的路由,和消息的发送功能。
它可以为大型系统解耦,提供异步处理能力和工作队列的能力
特性
围绕着消息队列的功能,rabbit mq提供以下特性
-
可靠性或高性能
用户可以选择是否持久化消息和队列来提高可靠性
投递确认特性保证消息被正确的消费
发布者证实机制保证消息发布的安全性
通过镜像提供高可用,避免消息队列单点问题
-
灵活的路由
Rabbit MQ提供非常多样的路由选择,可以满足应用的大部分场景
-
集群,高可用
通过多个MQ服务器联合工作,并提供镜像功能保证集群的高可用和高性能
协议
Rabbit MQ使用AMQP协议来作为消息队列的协议
AMQP
简介
高级消息队列协议,是一种网络协议
它包括发布者,消费者和消息代理
mq-amqp-simple发布者向消息队列投递消息
消息的某些属性会被消息代理利用,作为路由的特性,而其他属性会被直接透传给消费者
由于网络是不可靠的,消费者有可能处理失败,AMQP提供一个消息确认的机制,只有消息被确认了,才会从消息队列删除。消息确认可以是自动或者由消费者发送回执确认
某些情况下,消息有可能无法被成功的路由,消息或许返回给发布者并被丢弃,或者如果消息代理执行了延期操作,消息会被放入一个死信队列中
消息路由规则
- Direct exchange(直连交换机) : (Empty string) and amq.direct
用于单播
发布者的消息携带路由键R,会被发送到所有绑定了路由键R的消费队列
可以实现相同路由键R的消费者的负载均衡
- Fanout exchange(扇型交换机) : amq.fanout
用于广播
- Topic exchange(主题交换机) : amq.topic
用于多播,通过路由键和队列来多播
- Headers exchange(头交换机) : amq.match (and amq.headers in RabbitMQ)
它是直连交换的另一种模式
不同之处是直连交换只通过路由键来路由
而头交换则通过更多自定义的头属性,来路由消息
队列
- Name:名字
- Durability:消息代理重启后,交换机是否还存在,但消息体不会被持久化
- Exclusive:只能被一个连接使用,连接关闭后,队列自动删除
- Auto-delete:当最后一个消费者退订后,队列被删除
队列需要被声明(declare)后才能被使用
队列可以被多次声明,但如果声明的参数不一致,会返回错误
消费者
消费有两种模式:
- push API,消息订阅,被动消费
- pull API,根据需要,主动消费
拒绝消息
当消费者消费某条消息失败后,可以发送拒绝消息给消息代理
消息代理或许把消息销毁或者再次投入消息队列
预取消息
多个消费者共享一个队列的案例中,可以配置在收到一个消息确认后,每个消费者可以接收多少条消息
它可以提高系统的吞吐量
消息
- content type:内容类型
- Content encoding:内容编码
- Routing key:路由键
- Delivery mode:是否持久化
- Message priority:消息优先级
- Message publishing timestamp:消息发布时间错
- Expiration period:消息有效期
- Publisher application id:发布应用的ID
连接
通过TCP SSL来保护连接
通道
每个线程都开一个连接是不高效的
可以为整个应用开一个channel
各个线程通过共享这个channel的连接,来提升连接的利用率
脑裂问题
当部署了两个节点的rabbitmq的时候,如果这两个节点的网络不通或者不稳定的时候,两个节点会自动选举自己为master,并且各自工作。
等网络重新变好时,两个节点不会重新成为集群,并造成消息消费异常,需要手工解决
自动解决的方法:
配置 cluster_partition_handling为以下的一种模式
-
ignore
当出现脑裂时,需要人工恢复
-
pause_minority
启动奇数个mq实例,如果部分网络不可用时,少数派(小于半数的节点数)的实例会自动关闭,多数派会继续正常工作。并可以自动恢复集群
-
pause_if_all_down
语法为{pause_if_all_down, [nodes], ignore|autoheal}
当某个节点检测其他所有节点都不可用时,自动关闭自身。并通过ignore或者autoheal来恢复集群
这适用于偶数个节点的情况
例如两个机房分别部署了两个mq实例,但机房不通时,各机房的mq依然可以继续工作,适用于4个mq实例的情况
-
autoheal:当出现脑裂的时候,Rabbitmq会选取一个获胜的分区作为master。获胜的评判规则是:连接较多的一个、顺序优先
丢数据问题
- rabbitmq收到消息,暂存在内存中,还没消费,就挂了,造成消息丢失
- 消费者消费了消息,但还没来得及处理,就挂了,rabbitmq以为消费者已经处理,但实际没有被处理,造成消息丢失
解决方法
-
发布者
开启事务,如果消息没有被mq接收,则回滚,并重新发送消息(同步,性能低)
适用confirm机制(异步),确定消息是否正确收到,mq可以异步通知,但会造成消息乱序的问题
-
对于MQ
-
消息持久化
exchange持久化
queue持久化
message持久化
-
开启集群镜像
HA模式,可以选择同步给所有节点,同步最多N个节点和之同步给符合条件的节点
缺点是吞吐量会下降,当可用性会增大
-
消息补偿机制
可以通过数据库来做备用存储,来保证消息的正确性和可用性
-
-
消费者
ACK确认机制