RabbitMQ之消息确认与Return机制
上一篇:https://www.jianshu.com/p/44b2885a5253
在上一篇中,笔者介绍了怎么让RabbitMQ如何保证数据不丢失, 但除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达RabbitMQ服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达RabbitMQ服务器的。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ 针对这个问题,提供了两种解决方式:
-
通过事务机制实现
-
通过发送方确认机制实现
一、事务机制
RabbitMQ 客户端中与事务机制相关的方法有3个: channel.txSelect,channel.txCommit,channel.txRollback
。channel.txSelect 用于开启事务, channel.txCommit 用于提交事务; channel.txRollback 用于回滚事务。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。
public static void main(String[] args) throws IOException {
//...
//开启事务
channel.txSelect();
try{
channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
//...
}
输出结果:发送成功
流程图
根据上图可以看出开启事务机制与未开启事务机制多了四个步骤:
- 1、客户端发送 Tx.Select ,将信道置为事务模式。
- 2、 Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
- 3、在发送完消息之后,客户端发送 Tx.Commit 提交事务。
- 4、 Broker回复 Tx.Commit.Ok ,确认事务提交。
下面来看一下事务回滚,上代码
//开启事务
channel.txSelect();
try{
channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
int i = 1/0;
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
输出结果:发送失败,进行日志记录
image.png
在事务提交之前捕获到异常,之后显式地提交事务回滚。
如果要发送多条消息,则将 channel.basicPublish,channel.txCommit 等方法包裹进循环内即可。
//开启事务
channel.txSelect();
for (int a = 0; a < 10; a++) {
try{
channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
int i = 1/0;
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会降低RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢? 下面就来介绍RabbitMQ提供另外一种方式:发送方确认机制。
发送方确认机制
注:发送方确认机制是确认生产者是否成功发送消息到交换机
一、原理
生产者通过调用channel.confirmSelect 方法将channel设置成confirm模式,一旦channel进入confirm模式,所有在该channel上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中的deliver-tag包含了确认消息的序号,此外broker也可以设置basic.ack的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack消息。
在channel被设置成 confirm模式之后,所有被发送的后续消息都将被 ack或者被nack一次。,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 没有对消息被 confirm 的快慢做任何保证。
原生api方式
1、普通confirm模式
每发送一条消息后就调用 channe.waitForConfirms方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。和事务机制一样。
//将信道置为 publisher confirm 模式
channel.confirmSelect();
channel.basicPublish(exchange,"key-2",null,"发送路由key为 = key-1 的消息".getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
如果发送多条消息,只需要将 channel.basicPublish、channel.waitForConfirms方法包裹在循环里面即可。
//将信道置为 publisher confirm 模式
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
channel.basicPublish("exchange-1","key-3",null,"发送路由key为 = key-1 的消息".getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
}
2、批量confirm
每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回(也是同步的,只是一次发送多条信息,然后统一确定)。
//将信道置为 publisher confirm 模式
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
channel.basicPublish("exchange-1", "key-1", null, "发送路由key为 = key-1 的消息".getBytes());
}
//批量确认信息,发送的消息中,如果有一条是失败的,则所有消息发送都会失败
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
3、异步confirm
异步 confirm 方法的编程实现最为复杂。在客户端 Channel 接口中提供的
addConfirmListener方法可以添加 ConfirmListener这个回调接口,这个
ConfirmListener 接口包含两个方法: handleAck、handleNack,分别用来处理RabbitMQ 回传的 Basic.Ack、Basic.Nack 。在这两个方法中都包含有两个参数 deliveryTag(标记消息的唯一有序序号)
、multiple(是否批量confirm true代表是)
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange-1", "key-1", null, "发送路由key为 = key-1 的消息".getBytes());
}
channel.addConfirmListener(new ConfirmListener() {
//参数一:deliveryTag: 消息的编号
//参数二:multiple:是否批量confirm true 是
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
//注意这里需要添加处理消息重发的场景
}
});
System.out.println("其他逻辑");
//异步就不需要关闭连接了
SpringBoot(2.2.4.RELEASE)
在yml中配置是否需要消息确认
publisher-confirm-type: CORRELATED
publisher-confirm-type有三个选项:
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
编码
实现ConfirmCallback
@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {
Logger logger = LoggerFactory.getLogger(InfoConfirm.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 此方法用于监听消息是否发送到交换机
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
logger.info("消息成功发送到交换机");
logger.info("id = {} ",correlationData.getId());
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}else {
logger.info("消息发送到交换机失败");
logger.info("cause = {}",cause);
logger.info("id = {} ",correlationData.getId());
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}
}
}
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
发送方法
@GetMapping
public void send(){
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
Message message = new Message("returnedMessage:哈哈哈".getBytes(),null);
correlation.setReturnedMessage(message);
rabbitTemplate.convertAndSend("exchange-1","key-1","发送消息",correlation);
}
Return机制
上面已经讲了发送方确认机制,我们已经知道发送方确认机制是确认生产者是否成功发送消息到交换机
。交换机是否发送到具体的队列那我们就不知道了。如果想知道交换机是否将消息发送到队列,就需要用到return机制:监控交换机是否将消息发送到队列
。
在客户端 Channel 接口中提供的addReturnListener方法,可以添加 ReturnListener这个回调接口,这个ReturnListener接口包含一个方法:handleReturn,用来处理交换机发送消息到队列失败,则执行此方法。
原生api
channel.confirmSelect();
/**
* mandatory:当mandatory 参数设为 true 时,
* 交换器无法根据自身的类型和路由键找到一个符合条件的队列时,
* 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。
* 当 mandatory参数设置为 false 时,出现上述情形,则消息直接被丢弃。
*/
channel.basicPublish("exchange-1", "key-4",true, null, "发送路由key为 = key-1 的消息".getBytes());
//消息确认机制
channel.addConfirmListener(new ConfirmListener() {
//参数一:deliveryTag: 消息的编号
//参数二:multiple:是否批量confirm true 是
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
//注意这里需要添加处理消息重发的场景
}
});
//Return机制
channel.addReturnListener(new ReturnListener() {
/*
* 参数1:响应code
* 参数2:响应文本
* 参数3:交换机名称
* 参数4:路由key
* 参数5:消息的基本属性集
* 参数6:消息内容
*/
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机发送消息到队列失败,则执行此方法
System.out.println("replyCode =" + replyCode);
System.out.println("replyText =" + replyText);
System.out.println("exchange =" + exchange);
System.out.println("routingKey =" + routingKey);
System.out.println("properties =" + properties);
System.out.println("body =" + new String(body));
}
});
System.out.println("其他逻辑");
image.png
注:调用channel.basicPublish时,需要将mandatory参数设置为true
SpringBoot
在yml中配置开启Return机制
publisher-returns: true
编码
实现 ReturnCallback 接口
@Component
public class Return implements RabbitTemplate.ReturnCallback {
Logger logger = LoggerFactory.getLogger(InfoConfirm.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
*/
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
}
//处理交换机发送消息到队列失败,则执行此方法。
@Override
public void returnedMessage(Message message,
int replyCode, String replyText,
String exchange, String routingKey) {
logger.info("交换机到队列失败=====》");
logger.info("message = {}",new String(message.getBody()));
logger.info("replyCode = {}",replyCode);
logger.info("replyText = {}",replyText);
logger.info("exchange = {}",exchange);
logger.info("routingKey = {}",routingKey);
}
}
发送消息
@GetMapping
public void send(){
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
Message message = new Message("returnedMessage:哈哈哈".getBytes(),null);
correlation.setReturnedMessage(message);
rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
}
草图一张
https://blog.csdn.net/u013256816/article/details/55515234