RabbitMQ丢消息的解决方案

2024-01-22  本文已影响0人  h2coder

3种丢消息的场景

发送消息到交换机或队列时,丢消息

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
@Slf4j
@Configuration
// 实现ApplicationContextAware接口,可以从已有的spring上下文取得已实例化的bean
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate实例
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        
        // 设置confirm callback,投递消息到交换机成功或失败,都会回调此方法
        // 注:如果投递成功,方法的ack参数为true,失败则为false
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息投递完成,ack = {}, cause = {}", ack, cause);
            }
        });
        
        // 设置return callback,从交换机投递到队列失败时,才会回调该方法
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // 记录日志
                log.info("消息投递失败,replyCode = {},replyText = {},exchange = {},routingKey = {},message = {}", replyCode, replyText, exchange, routingKey, message.toString());
            }
        });
    }
}

MQ软件重启,丢消息

消费者没有正常消费消息,丢消息

默认消费者收到消息后,MQ就会将消息从队列中删除,也就是阅后即焚,我们需要设置MQ的确认模式,一般我们可以设置为auto自动或manual手动,以下以手动为例

手动模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 确认模式有3种,manual、auto、none
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        // 模拟异常
        System.out.println(1 / 0);
        
        // 业务执行正常,才回复ack
        // 参数一:deliveryTag,也就是消息的标识,从msg中获取
        // 参数二:multiple,如果MQ是集群,true则是需要通知集群中的所有MQ
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
        
        log.debug("消息处理完成!");
    }
}

自动模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 确定模式:auto,为自动ack

重试次数

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒 2  4  8  16  32
          multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败策略

// 错误消息交换机
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}

// 错误消息队列
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}

// 绑定错误交换机和错误消息队列
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder
    // 队列
    .bind(errorQueue)
    // 交换机
    .to(errorMessageExchange)
    // 设置routingKey
    .with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
上一篇下一篇

猜你喜欢

热点阅读