rabbitMQ可靠性投递
可靠性投递
关于消息的可靠性投递,从以下几个方面来着手:
- 保障消息成功发出;
- 保障MQ成功接收;
- 发送方接收到MQ响应;
- 完善的消息补偿机制(如重试机制);
本文关于消息投递的两种方式:消息落库打标
和二次消息机制
。
消息落库打标
1)发送方将消息存到RDB中(如MySQL);该步骤可以通过事务来确保 业务数据 和 消息数据 落库成功;
2)发送方发送消息给MQ broker;
3)MQ broker返回给发送方确认ACK(rabbitmq自带的confirm机制);
4)修改消息数据库;
5)分布式定时任务会检查消息数据库,取出其中的异常数据(如超时状态为修改),进入重试阶段;
6)重试阶段,针对于每个任务都有一个业务数据来关联重试次数。超过次数,报告失败;
其中,在第1)步,如果持久化失败,会快速失败,便于进行下一次重试。
在第3)步,如果出现网络抖动或者异常,即MQ broker返回给发送方失败。会导致该消息数据长时间在数据库中没有被更新,即状态还是没有被MQ broker确认的状态。所以需要有另外的定时任务去检查消息入库时长,超过一定时间会踢出任务,进入步骤5)。
缺点:因为在第一步需要对数据库进行2次持久化操作,还需要额外的事务来保证。所以对于并发高的环境,其实还是需要深思熟虑的。
二次消息机制
其中涉及服务:上游发送方,MQ broker,下游接收方,回调方。

1)业务数据落库,落库完毕再发送消息,此处发送2条消息;
2)MQ broker的消息到下游服务;
3)confirm消息:下游服务消费后,重新生成一条消息,投递给MQ broker;
4)此时callback service监听下游服务生成的消息,对收到的消息进行持久化,保存到DB中;
5)callback service监听发送方的第二条消息,并去数据库中进行查询,有即处理好了,没有即处理异常;
6)第5)步异常后,进入该步骤。带着相关业务信息通知发送方重新发送该消息。
在第1)步中,第一条消息是发送给下游业务方的,为立即发送;第二条消息延迟5分钟后进行发送,是发送给callback service来作为检查条件,查看下游服务是否处理完毕该消息。
相比于第一种方案,在主业务流程中减少了DB操作,将对消息的保存移动到了callback service中。
幂等性
幂等性,就是一条数据无论经过多少次该方法的操作,对于我们拿到的结果都是一样的。
常见的幂等性解决方案:
-
唯一ID+指纹码
,利用数据主键来去重;
其中的指纹码,比如时间戳,业务规则,或者相关业务字段等。
先查询,后插入。如果根据唯一ID+指纹码
查询出数量为0,而在插入时提示已经插入而失败了,此时放弃该操作,说明有别的服务器已经替我们做了这些事情。
好处:实现简单。
坏处:高并发DB写入瓶颈。
解决方案:分库分表。 - 利用redis原子性去重。
分布式锁和exists操作。
return消息机制
在某些情况下,我们发送的消息,当前的exchange或routing不存在或者不可达,这个时候就需要return listener。
通过设置mandatory
属性来完成。该属性为true,当消息不可达时,会触发return机制;为false时,该消息删除,不报告错误。
//第三个参数就是mandatory:true开启,false关闭
channel.basicPublish(exchangeName, routingKey, true,null, msg.getBytes());
消息限流机制
在消费者里面代码如下设置:
//prefetchSize 消息大小限制,一般设置为0,表示不限制
//prefetchCount 一次最多处理多少条消息,一般设置为1。在有这么多消息返回ack后,该consumer阻塞
//global true在channel上限制,false在consumer上限制。一般设置为false
channel.basicQos(0,1,false);
//autoAck是否自动签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
在自定义的类MyConsumer中,添加如下配置:
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//如果上面的basicQos中第二个数值设置为1,则此处第二个值设置为false,大于1则设置为true。表示是否多个ack返回。
channel.basicAck(envelope.getDeliveryTag(),false);
}
}