rabbitMQ-死信

2020-10-12  本文已影响0人  这是一个假的程序员

1.什么是死信(Dead Letter)?

官方文档:https://www.rabbitmq.com/dlx.html

一个消息变成死信的条件有:

1.消费被拒绝(basic.reject 或者 basic.nack),并且参数 requeue = false 时

2.消息TTL(存活时间)过期

3.队列达到最大长度

rabbitMQ对于死信消息的处理是:如果配置了死信队列,成为死信的消息会被丢进死信队列,如果没有则被丢弃。

2.配置死信队列

死信队列不是特殊的队列,它只是绑定在了死信交换机上而已,并且死信交换机也不是特殊的交换机,它也只是用来接收死信消息的

一般分为以下步骤:

1.配置业务队列,绑定到业务交换机上

2.为业务队列配置死信交换机和路由

3.配置死信队列,绑定到死信交换机上

模拟其中一种条件来进行实战!

配置类:

@Configuration
public class RabbitMQConfig {
    /**
     * 业务交换机
     */
    public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
    /**
     * 业务队列
     */
    public static final String BUSINESS_QUEUE_NAME = "business.queue";
    /**
     * 死信交换机
     */
    public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
    /**
     * 死信队列
     */
    public static final String DEAD_QUEUE_NAME = "dead.queue";

    public static final String DEAD_ROUTING_KEY = "dead.routing.key";

    /**
     * 声明业务交换机
    **/
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    /**
     * 声明业务队列
     *
    **/
    @Bean("businessQueue")
    public Queue businessQueue(){
        Map<String, Object> args = new HashMap<>(2);
        //声明绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        //声明死信路由key
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
    }

    /**
     * 业务队列绑定到业务交换机
     *
    **/
    @Bean
    public Binding businessBind(@Qualifier("businessQueue")Queue businessQueue,
                                @Qualifier("businessExchange")FanoutExchange businessExchange){
        return BindingBuilder.bind(businessQueue).to(businessExchange);
    }

    /**
     * 声明死信交换机
     **/
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE_NAME);
    }

    /**
     * 声明死信队列
     *
     **/
    @Bean("deadQueue")
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE_NAME);
    }

    /**
     * 死信队列绑定到死信交换机
     *
     **/
    @Bean
    public Binding deadBind(@Qualifier("deadQueue")Queue deadQueue,
                                @Qualifier("deadExchange")DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
    }
}
exchange
businessExchange
deadExchange

业务队列监听

@Component
public class BusinessListener {
    Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_NAME)
    public void receive(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        logger.info("收到消息:" + msg);
        if (msg.contains("dead")){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}

死信队列监听

@Component
public class DeadListener {
    Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_NAME)
    public void receive(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        logger.info("收到消息:" + msg);
    }
}

写一个controller来生产消息,方便测试

@RestController
@RequestMapping("my")
public class MyController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("send")
    public void send(@RequestParam("msg")String msg){
        rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,"", msg);
    }
}

发送一个消息


发送消息“msg”

结果


结果
发送一个死信消息
发送消息“deadmsg”

结果


结果
上一篇下一篇

猜你喜欢

热点阅读