RabbitMQ死信队列

2024-01-22  本文已影响0人  h2coder

死信

Reject和Nack是什么?

// RabbitMQ是否需要重新发送给别的消费者
channel.basicReject(envelope.getDeliveryTag(), false);

死信交换机

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

使用死信交换机,接收死信

// 死信交换机
@Bean
public DirectExchange dlExchange() {
    return new DirectExchange("dl.direct");
}

// 死信队列
@Bean
public Queue dlQueue() {
    return new Queue("dl.queue");
}

// 绑定死信交换机和队列
@Bean
public Binding dlBinding() {
    return BindingBuilder
    // 队列
    .bind(dlQueue())
    // 交换机
    .to(dlExchange())
    // 配置routingKey
    .with("dl");
}
@Bean
public Queue simpleQueue() {
    return QueueBuilder.durable("simple.queue").
        deadLetterExchange("dl.direct").
        deadLetterRoutingKey("dl").build();
}
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
        log.debug("--消费者接收到simple.queue的消息:【" + msg + "】");
        // 模拟异常
        try {
            int i =1/0;
        } catch (Exception e) {
           channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
        }
        log.debug("消息处理完成!");
    }
}

TTL(Time To Live 生存时间值)

@Configuration
public class TTLConfig {
    // 声明死信交换机 dl.direct
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct", true, false);
    }
    
    // 声明存储死信的队列 dl.queue
    @Bean
    public Queue ttlQueue(){
    // return new Queue("ttl.queue", true);
      return   QueueBuilder.durable("ttl.queue")
              .deadLetterExchange("dl.ttl.exchange")
              .deadLetterRoutingKey("dl")
              .ttl(5000)
              .build();
    }
    
    // 将死信队列与死信交换机绑定
    @Bean
    public Binding ttlBinding(){
        return BindingBuilder
        // 队列
        .bind(ttlQueue())
        // 交换机
        .to(ttlExchange())
        // routingKey
        .with("ttl");
    }
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.exchange"),
    key = "dl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
@Test
public void testTTLMessageQueue()  {
    try {
        String routingKey = "ttl";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend("ttl.direct", routingKey, message);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

总结

上一篇 下一篇

猜你喜欢

热点阅读