RabbitMQ

2020-09-13  本文已影响0人  蜀山_竹君子

MQ简介

什么是MQ?

消息总线(Message Queue),一组跨进程、异步的通信机制,用于上下游消息传递。由消息系统来确保消息的可靠性。
MQ作用
应用解耦、异步通信、流量削峰、数据分发、错峰流控、日志收集等。
MQ衡量标准
服务性能、数据存储、集群架构

主流竞品分析

当前市场MQ产品很多,比如RocketMQ、ActiveMQ、Kafka、RabbitMQ、ZeroMQ,甚至redis也支持MQ功能。

ActiveMQ

apache出品的开源消息总线,完全支持JMS规范的消息中间件,提供丰富API,在中小企业应用广泛。
性能
ActiveMQ在性能方便相对于其他MQ产品,性能较差,在高并发场景会出现阻塞、延迟、堆积。
存储
Active默认基于内存的KahaDB存储,如果要保证消息的可靠性,可以采用关系数据库存储。
集群
ActiveMQ支持master-salave模式、和NETWORK模式。其集群高可用性借助Zookeeper实现。

master-salave模式
master-salave:通过Zookeeper进行集群管理,由Master节点对外提供服务,Master节点出现故障,由Zookeeper会高效的下掉Master节点,由Salave节点提供服务。
NETWORK模式:两套master-salave,由网络通联,组成分布式集群。
NETWORK模式

Kafka

LinkedIn开源的发布-订阅系统,目前是归属于apache顶级项目。Kafka基于Pull模式来处理消息消费,追求高吞吐量,设计目的就是用于日志收集和处理。Kafka不支持事务,对消息的重复、丢失、错误没有严格要求。适用于大量数据的互联网服务的数据收集。Kafk基于操作系统的底层Page Cache实现高效的读写,仅使用内存管理,不存在内存与磁盘之前的IO操作。Kafka借助Zookeeper实现高可用性。


kafka集群

RokectMQ

阿里开源消息中间件,孵化为apache顶级项目。Rocket使用纯java开发,具有高吞吐、高可靠性、适合大型分布式系统应用特点。RoekectMQ 2.X集群基于Zookeeper管理,3.X集群基于NameServer管理。
RokectMQ能够保证消息的顺序消费,提供了丰富的消息拉取等处理模式,开发者可以高效的进行水平扩展,能够承载上亿数据量级。
RockerMQ集群支持Master-Salave、双Master-Salave、多主多从模式。
存储技术支持同步双写、异步复制,使用零拷贝技术。
RockerMQ缺点是上下游交互核心不开源,社区提供的JMS其性能、兼容性、可靠性无法保证。


Rocket

RabbitMQ简介

RabbitMQ是开源的消息代理和队列服务器,用于通过普通协议在不同应用间共享数据。RabbitMQ使用Erlang语言,基于AMQP协议实现。

AMQP

AMQP(Advanced Messae Queueing Protocol)高级消息队列协议: 面向消息中间件的开放式标准应用层协议,定义了以下特性:

AMQP要求消息的提供者和客户端接受者的行为实现对不同供应商可以使用相同的方式进行互相操作。AMQP增减了Exchange和Bindings角色。生产者(Priducer)把消息发布到Exchange上,消息最终达到队列并被消费者接受,而Bindings决定Exchange消息应该达到那个队列。


JMS

JMS(Java Message Service): java消息服务应用程序接口,是java平台中关于面向消息中间件(MOM)的API,JMS定义了一个API以及一组消息收发必须实现的行为。

RabbitMQ优势

基本概念

Exchange类型

Exchange分发消息时,各种类型不同分发策略也有所不同。目前主要有三种类型:

RabbitMQ持久化

RabbitMQ支持消息的持久化,消息持久化包括三部分:

如果Exchange和Queue都是持久化的,那么Binding也是持久化的,如果Exchange和Queue之间有一个是持久化,一个是非持久化那么不允许绑定。

TTL

TTL(Time To Live):生存时间,RabbitMQ支持消息的过期时间,一共两种:

死信队列DLX

死信队列(DLX Dead-Letter-Exchange):利用DLX,当消息在一个队列变成死信(Dead-Message)之后,它能重新pulish到另一个Exchange,这个Exchange就是DLX。DLX也是正常Exchange,和一般Exchang没有区别,能在任何队列被指定,实际就是设定某个队列属性。
消息变成死信的场景

死信队列设置
需要设置死信队列的Exchange和Queue,然后通过Routing Key进行绑定。只是在队列加上一个参数即可。

       Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(3);
       arguments.put("x-message-ttl", dlx-ttl);
       arguments.put("x-dead-letter-exchange", "exchange-name");
       arguments.put("x-dead-letter-routing-key", "routing-key");
       Queue ret = QueueBuilder.burable("queue-name".withArguments(arguments).build();

只需要监听该死信队列即可处理死信消息。还可以通过死信队列实现延迟消息。

消费端ACK和NACK

消费端进行消费时,如果由于业务异常可以进行日志的记录,然后进行补偿。由于服务器宕机严重,我们需要手动进行ACK保障消费端消费成功。
消费端重回队列是为了对没有处理成功消息,把消息重新返回Broker。一般,在实际应用中都会关闭重回队列,也就是设置false。
生产者确认机制(confirm)

  1. 开Channel开启确认模式,Channel.confirmSelect();
  2. 在Channe上开启监听,addConfrimListener,监听成功和失败的处理结果,对消息进行重发或记录日志等待进一步处理。

Return消息机制
Return Listener用于处理一些不可路由消息。
在MQ正常情况下,我们生产者将消息通过指定Exchange和Routing,把消息送达到某一个队列,然后消费者监听这个队列进行消息的处理。
但是在某种情况,我们发送消息时,当前Exchange不存在或指定的路由Key路由不到,这时我们监听不可到达消息就需要Return Listener。
在MQ基础API中有个关键配置:Mandatory,如果设置true,那么监听器会收到不可到达消息,然后处理,如果设置false,那么Broker默认会自动删除不可到达消息。
消费端自定义监听(推模式和拉模式pull/push)

如果是高并发场景,要实现高吞吐量,消费者应该使用basic.Consume方法,直接订阅队列,将信道设置为接收模式,直到取消队列的订阅。在订阅期间,MQ会不间断推送消息到消费者。推送的消息受到Basic.Qos限制。
如果只想从队列获取单条消息,那么应该使用Basic.Get,但是不能讲Basic.Get放到死循环中,这样会严重影响MQ性能。

消息幂等性保证

消费者实现了消息的幂等性,可以防止消息被多次消费。

  1. 创建消息去重库,把全局唯一ID+指纹码作为唯一约束,如果插入成功则表示没有消费这条消息,插入失败则消息已消费。
  2. 优点:实现简单
  3. 缺点:高并发场景存在数据库写入瓶颈
  4. 解决方案:根据ID进行分库分表进行算法路由
update t_payment set orderId = #{orderId} , money=#{money},  payStatus=#{payStatus}  version=#{ version } +1 
where id=#{id} and version=#{version}
update table set status = 2... where status = 1
if(!update)
进行业务处理
else
重复处理

消息可靠性保证

解决方案

  1. 消息落库,对消息状态进行变更



    缺点是对数据库有多次操作,不适合高并发场景。

  2. 消息的延迟投递,做二次确认,回调检查


拆分出一个回调服务,将落库、状态检查等操作安排到回调服务中。
1:消息发送者发送信息到MQ,消费者为下游业务方。
1.1 成功后,作为发送方发送消息到MQ,消费者为回调服务
1.1.1 回调服务接受数据后,落库。
2.2 失败,等待发送者的延时投递信息
2:发送者发送延时投递信息到MQ,消费者为回调服务
2.1 查库,确认下游消费已成功
2.2 确认下游消费已失败,通过RPC接口调用发送者的接口重新发送
回调模式减少了数据库操作,但是不能保证消息的百分之百可靠。

MQ限流

当海量消息瞬时推送过来,消费者无法同时处理那么多数据,严重甚至导致宕机,这时需要流量削峰。
RabbitMQ提供了一直Qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或channel的qos参数设置)未被确认前,不进行消费新的消息。

RabbitMQ集群

RabbitMQ最大的优势就是内建集群,其设计目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及添加更多节点来线性扩展消息通信的吞吐量。
RabbitMQ会始终记录四种内部元数据:

上一篇下一篇

猜你喜欢

热点阅读