JavaJava 杂谈程序员

RabbitMQ实战(三)-高级特性

2019-07-01  本文已影响22人  JavaEdge

0 相关源码

1 你将学到

2 保证消息的百分百投递成功

2.1 Producer 的可靠性投递

2.1.1 要求

在实际生产中,很难保障前三点的完全可靠,比如在极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的消息补偿机制。

2.1.2 解决方案

2.1.2.1 方案一:消息信息落库,对消息状态进行打标(常见方案)

将消息持久化到DB并设置状态值,收到Consumer的应答就改变当前记录的状态.
再轮询重新发送没接收到应答的消息,注意这里要设置重试次数.

方案流程图 image
方案实现流程

比如我下单成功
step1 - 对订单数据入BIZ DB订单库,并对因此生成的业务消息入MSG DB消息库

此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想着采用分布式事务,但在大厂实践中,基本都是采用补偿机制!

这里一定要保证step1 中消息都存储成功了,没有出现任何异常情况,然后生产端再进行消息发送。如果失败了就进行快速失败机制

对业务数据和消息入库完毕就进入
setp2 - 发送消息到 MQ 服务上,如果一切正常无误消费者监听到该消息,进入

step3 - 生产端有一个Confirm Listener,异步监听Broker回送的响应,从而判断消息是否投递成功

step6 - 把抓取出来的消息进行重新投递(Retry Send),也就是从第二步开始继续往下走

step7 - 当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。

思考:该方案在高并发的场景下是否合适

对于第一种方案,我们需要做两次数据库的持久化操作,在高并发场景下显然数据库存在着性能瓶颈.

其实在我们的核心链路中只需要对业务进行入库就可以了,消息就没必要先入库了,我们可以做消息的延迟投递,做二次确认,回调检查。下面然我们看方案二

2.1.2.2 消息延迟投递,两次确认,回调检查(大规模海量数据方案)

大厂经典实现方案

当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务.

主要就是为了减少DB操作

方案流程图 image
方案实现流程
设计目的

少做一次DB的存储,在高并发场景下,最关心的不是消息百分百投递成功,而是一定要保证性能,保证能抗得住这么大的并发量。所以能节省数据库的操作就尽量节省,异步地进行补偿.

其实在主流程里面是没有Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其他的步骤都是一个补偿机制。

小结

这两种方案都是可行的,需要根据实际业务来进行选择,方案二也是互联网大厂更为经典和主流的解决方案.但是若对性能要求不是那么高,方案一要更简单.

3 幂等性

3.1 什么是幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的

比如数据库的乐观锁,在执行更新操作前,先去数据库查询version,然后执行更新语句,以version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机制来保障幂等性.

3.2 Con - 幂等性

3.2.1 什么是Con - 幂等性

在业务高峰期最容易产生消息重复消费问题,当Con消费完消息时,在给Pro返回ack时由于网络中断,导致Pro未收到确认信息,该条消息就会重新发送并被Con消费,但实际上该消费者已成功消费了该条消息,这就造成了重复消费.

而Con - 幂等性,即消息不会被多次消费,即使我们收到了很多一样的消息.

3.2.2 主流幂等性实现方案

3.2.2.1 唯一ID+指纹码

核心:利用数据库主键去重
小结

首先我们需要根据消息生成一个全局唯一ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。

将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了

3.2.2.2 利用Redis原子性

这里我们使用Redis实现幂等,还需要考虑如下问题

这里只提用Redis的原子性去解决MQ幂等性重复消费的问题

MQ的幂等性问题 根本在于的是生产端未正常接收ACK,可能是网络抖动、网络中断导致

可能的方案

Con在消费开始时将 ID放入到Redis的BitMap中,Pro每次生产数据时,从Redis的BitMap对应位置若不能取出ID,则生产消息发送,否则不进行消息发送。

但是有人可能会说,万一Con,ProRedis命令执行失败了怎么办,虽然又出现重复消费又出现Redis非正常执行命令的可能性极低,但是万一呢?

OK,我们可以在Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。

4 Confirm机制

4.1 什么是Confirm机制

4.2 Confirm机制流程图

image

Pro发送消息到Broker,Broker接收到消息后,产生回送响应
Pro中有一个Confirm Listener异步监听响应应答

4.2 实现Confirm机制

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener
    监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理

接下来就让我们根据原理进行实操吧!

5 Return机制

5.1 什么是Return机制

5.2 Return机制示意图

image

5.3 实现Return机制

在基础的API中的一个关键的配置项:Mandatory

6 Con - 自定义监听

自定义Con实现只需要继承 DefaultConsumer 类,重写 handleDelivery 方法即可!

6.1 代码实现

7 Con - 限流

7.1 什么是Con - 限流

7.1.1 消息过载场景

因此,我们需要限流

7.1.2 Con - 限流机制

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于Con或者channel设置Qos的值) 未被确认前,不消费新的消息

不能设置自动签收功能(autoAck = false)
如果消息未被确认,就不会到达Con,目的就是给Pro减压

限流设置API

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);

prefetchSize和global这两项,RabbitMQ没有实现,暂且不研究
prefetchCount在 autoAck=false 的情况下生效,即在自动应答的情况下该值无效

手工ACK
void basicAck(Integer deliveryTag,boolean multiple)
调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于我们是一次处理一条消息,所以设置为false

7.2 实现Con - 限流

8 Con - ACK & 重回队列机制

8.1 ACK & NACK

当我们设置autoACK=false 时,就可以使用手工ACK方式了,其实手工方式包括了手工ACK与NACK

当我们手工 ACK 时,会发送给Broker一个应答,代表消息处理成功,Broker就可回送响应给Pro.
NACK 则表示消息处理失败,如果设置了重回队列,Broker端就会将没有成功处理的消息重新发送.

使用方式

API:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
API:void basicAck(long deliveryTag, boolean multiple)

8.2 重回队列

8.3 实现机制

9 TTL机制

9.1 什么是TTL

9.2 管控台演示

10 死信队列机制

10.1 什么是死信队列

DLX - 死信队列(dead-letter-exchange)
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange中,这个Exchange就是DLX.

10.2 死信队列的产生场景

10.3 死信的处理过程

10.4 死信队列的配置

arguments.put(" x-dead-letter-exchange","dlx.exchange");

这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

10.5 实操演示

11 总结

本文专注RabbitMQ高级特性的学习

首先介绍了大厂在实际使用中是如何保障消息投递成功和幂等性的,以及对RabbitMQ的确认消息、返回消息、ACK与重回队列、消息的限流,以及对超时时间、死信队列的使用

最后,感谢您的阅读!

参考

RabbitMQ 100% 投递成功方案详解
RabbitMQ 从入门到精通(二)
RabbitMQ 幂等性概念及业界主流解决方案
RabbitMQ从入门到精通(三)

上一篇下一篇

猜你喜欢

热点阅读