RabbitMQ 如何保证消息的可靠性
队列持久化
// 队列消息持久化
boolean durable = true;
channel.queuDeclare = (ACK_QUEUE_NAME,durable,flase,false,null);
上面的代码就是进行消息持久话,当然还有其他写法,例如:
@Bean
public Queue directProductQueue(){
return QueueBuilder.durable(队列名);
其他写法不一一赘述。
如果队列A之前没有持久化,重启RabbitMQ后,队列会消息,并且,在代码里将队列A改为了持久化,需要先将原来的队列删除掉,否则会报错。
持久化后,在控制台中会显示"D",这样的话,即使重启RabbitMQ,队列A也会照样存在。
消息持久化
队列持久化并不能让消息持久化,如果RabbitMQ宕机,重启后,持久化后的队列还会存在,因为消息默认保存在内存中,所以消息会丢失,如果想让消息不丢失,或者丢失的少,最好将消息进行持久化,需要在生产段进行配置
Message message1 = MessageBuilder.withBody(msgBody.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding("UTF-8")
.setCorrelationId(msgId).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(exchange, routingKey, message1,correlationData);
以上代码中,setDeliveryMode(MessageDeliveryMode.PERSISTENT) 就是将消息进行了持久化。即使RabbitMQ宕机,消息也不会全部丢失,为什么不能保证全部不丢失呢?因为在一种极端情况下,例如RabbitMQ在将消息写入磁盘的过程中,RabbitMQ宕机,此时,还未写入磁盘的部分消息就会丢失。
当然有很多方法可以保证消息尽可能不丢失,例如生产者发送消息后立马将消息写入数据库,即使RabbitMQ让部分消息丢失,我们也可以通过数据库里的消息进行补偿,例如重发消息,但是发消息时同时写库,对性能会有一些影响。
发布确认
什么是发布确认,发布确认就是生产者发布的消息被投递到指定队列后,broker会通过回调函数告诉生产者消息投递成功了,要注意,这只是消息投递成功了,而不是消费成功。
发布确认是否开启需要自己手动设置,比如可以在application.yml中设置如下:
rabbitmq:
addresses: xx.xx.xx.x
port: 5672
username: xxx
password: xxxxxxxx
publisher-confirms: true #是否开启回调
-
单个确认
单个确认发布属于同步确认,发一条消息确认一次,缺点是发布消息比较慢,这种方式最多提供每秒不超过数百条的发布消息吞吐量。 -
批量确认
相比于单个确认,批量确认极大的提高了吞吐量,但是当发生故障时,不能确定哪条消息出了问题,同时,批量确认也是同步的。 -
异步确认
异步确认不会同步等待broker的确认信息,异步响应broker的确认信息。
先贴一下代码:
@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
// 设置生产者消息确认
rabbitTemplate.setConfirmCallback(this);
}
/**
* 消息发送到 Broker 后触发回调
*
* @param correlationData bean
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 如果消息被确认了,走一套逻辑
} else {
//如果消息没有被确认,是否补偿?持久化到数据库还是定期处理? correlationData.getId()
}
}
对于消息被确认还是没有被确认的具体处理逻辑需要自己去写,你可以在发消息前将消息先存入redis或者MySQL,为每一条消息设置一个唯一的id(可以用UUID、雪花算法等等),就是correlationData.getId(),当消息没有被确认,可以拿着这个唯一的id将完整的消息取出来,做消息补偿还是只是记录错误日志自己定夺。
手动ack
消息到达队列后,准备被消费者消费,消息被成功消费后,即业务处理完成后,进行手动ack,RabbitMQ默认是自动ack的,就是只要开始消费,就会被自动ack,自动ack后,队列中对应的这条消息就没了,生产中最好用手动ack。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果在消费过程中出现了问题,可以将消息reject,reject后可以选择消息重新入队或者消息直接被丢弃,下面代码中的 false 表示不重新入队,如果重新入队,可能会带来一个问题,就是如果这条消息永远会在被消费的过程中产生错误,那么这条消息就会不断地被重新入队,会造成死循环。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
死信队列
消息在被消费的过程中发生错误怎么办呢,重新入队有风险,可以将消息发到死信队列进行处理,不影响原队列。
先说一下什么是死信,就是由于某些原因导致队列中的某些消息无法被消费,这些消息如果没有后期的处理,就会变成死信,用来处理死信的队列就是死信队列,当然死信队列还可以当作延迟队列用。
设置死信队列的方法可以参考下方代码:
@Configuration
public class RabbitConfig {
// 交换机
public static final String EXCHANGE_TEST= "exchangeTest";
// 路由键
public static final String ROUTING_KEY_TEST = "routingKeyTest";
// 队列
public static final String DIRECT_QUEUE_TEST = "direct.queuetest";
/**
* 交换机
**/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_TEST);
}
/**
* 队列
**/
@Bean
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE_TEST)
//死信交换机声明
.withArgument("x-dead-letter-exchange", DeadMQConfig.DIRECT_DEAD_EXCHANGE_NAME)
//死信消息的路由key
.withArgument("x-dead-letter-routing-key", DeadMQConfig.DIRECT_DEAD_ROUTING_KEY_NAME)
.build();
}
/**
* Binding,将该routing key的消息通过交换机转发到该队列
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY_TEST );
}
}
参考:
[1] 尚硅谷-《消息中间件RabbitMQ》
[2] https://blog.csdn.net/qq_32662795/article/details/88742397
[3] https://www.cnblogs.com/he-erduo/p/13558308.html