saas

RabbitMQ 消息可靠性投递 消息幂等性 有序性

2019-11-13  本文已影响0人  風暴之灵

消息可靠性投递

可靠性投递需满足四个条件:

1.保障消息的成功发出

2.保障MQ节点的成功接收

3.发送端收到MQ节点(Broker)确认应答

4.完善的消息进行补偿机制

临界值情况:

情况一:生产端投递消息失败,消费端未收到消息。

情况二:生产端投递消息了,消费端也收到了,但在返回应答过程中,突然网络中断了导致发送端未收到确认应答。

可靠性投递解决方案

第一种方案:消息落库,对消息状态进行打标。

在发送消息时,要将消息持久化到数据库中,然后消息设置一个状态,如发送出去,状态为0(发送中),消息到达broker端,broker端返回响应后,将消息状态变更为1(消息已成功收到)。

但是,针对没有收到响应的消息,该怎么做?做一个轮询操作,抓取一些没有OK的消息,设置最大尝试次数,进行重新发送。

第二种方案:消息的延迟投递,做二次确认,回调检查。

消息落库方案

1.消息落库。分为两个小步骤,一个是订单业务数据落库,一个是订单message落库。因此,第一件事情把订单业务数据入库(创建订单实体),第二件事情要生产一条消息,将message封装好,然后把这条消息也要入库,设置消息初始状态0,表示消息发送中。这种方式缺点在于要对数据库数据持久化两次。这里必须保证这两步持久化操作成功,否则立即返回快速失败。

2.发送消息给broker

3.假设broker收到消息,broker将应答给生产端。

4.producer端会有个Confirm Listener异步的监听broker端返回的响应。如果broker端返回true,监听器监听到true后,会将这条指定的消息状态更新为1,表示这条消息已经百分之百的投递成功了。

5.分布式定时任务,处理极端情况。假设broker端在回送响应时,网络突然间闪断,这是MSG DB中这条消息永远是初始状态0。这个时候需要设定一个规则,设置一个临界值timeout,比如5分钟,如果消息5分钟之内还是初始状态0,就需要将这条消息抽取出来,执行

6.重新发送。或者定时任务设置一个定时间隔,如每隔5分钟抓取一次status为0的订单,这种可能会出现一个小问题,消息刚发出去,定时任务就开始跑了,有一些消息其实是能ACK成功的,但是分布式定时任务又重发了一遍。所以建议是设置一个消息超时的最大时间限制,如5分钟之内这条线消息还没有收到broker端的响应,status还是0,这个时候,再去抽取出来,再执行第六步重新发送。第六步:重新发送

7.设置最大重试次数,超过最大重试次数,将status更新为2表示投递失败。

消息的延迟投递, 做二次确认, 回调检查

1. 第一次消息发送消息A必须业务数据落库之后才能进行消息发送

2.第二次消息延迟发送, 设定延迟一段时间发送第二次验证消息B

3.消费端监听Broker, 进行消息消费,消费消息A

4.消费成功之后, 发送确认消息C到确认消息队列

5. Callback Service监听step4中的确认消息队列, 接收确认消息C,存储到数据库

6.Callback Service监听step2发送的Delay Check的消息队列, 收到延时消息B之后,检测数据库是否存储

7.如果数据库没有存储,重新走该流程

消息幂等性

消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。

实现消息幂等性

消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)

利用Redis的分布式锁:给消息分配一个全局ID,只要消费过该消息,将 < id,message>写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。

消息有序性

RabbitMQ在2.7.0及之后版本支持【同发送信道、同exchange、同queue、同消费信道的消息顺序与发送顺序相同】

一句话总结:单一生产者、单一队列、单一消费进程是可以保障有序的,其他不保证

上一篇下一篇

猜你喜欢

热点阅读