RabbitMQ死信队列DLX应用

2020-09-28  本文已影响0人  砒霜拌辣椒

进入死信队列的场景:

  1. 消息被拒绝(basic.reject / basic.nack)并且 requeue = false
  2. 消息TTL过期(在RabbitMQ3.5.8版本之前,实现消息的延迟发送就是依靠消息过期进入死信队列然后进行消费来完成的);
  3. 队列达到最大长度;

1、使用原生API实现死信队列DLX的应用

1.1、生产者

@Slf4j
public class ProducerDLX {
    public static final String HOST = "148.70.153.63";
    public static final String USER_NAME = "libai";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(System.getProperty("password"));
        connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

        // 创建连接和通道
        @Cleanup Connection connection = connectionFactory.newConnection();
        @Cleanup Channel channel = connection.createChannel();

        // 创建死信队列DLX
        String dlxExchangeName = "DLXExchange", dlxQueueName = "DLXQueue", dlxRoutingKey = "DLX";
        channel.exchangeDeclare(dlxExchangeName, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

        // 创建消息会自动过期的队列,并和指定的死信交换机绑定
        String exchangeName = "amq.direct", queueName = "TestDLXQueue", routingKey = "DLX";
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-message-ttl", 30 * 1000); // 设置队列里消息的ttl的时间30s
        argMap.put("x-dead-letter-exchange", dlxExchangeName); // 给队列设置死信交换机
        channel.queueDeclare(queueName, true, false, false, argMap);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 发送消息
        String msg = "测试死信队列";
        // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
        log.info("now:[{}],发送消息:[{}]", DateUtil.now(), msg);
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    }
}
  1. 创建死信队列DLXQueue并和指定交换机DLXExchange进行绑定(其实也是普通的队列、普通的交换机)。
  2. 创建另外一个正常的消息队列TestDLXQueue,设置队列的TTL过期时间,同时通过x-dead-letter-exchange属性指定死信交换机DLXExchange

1.2、测试消息过期进入死信队列

运行main函数,推送消息给TestDLXQueue队列。可以先看到消息先在TestDLXQueue队列中。

TestDLXQueue

等到30秒后没有被消费,则会把消息推送到DLXQueue死信队列中。

DLXQueue

1.3、死信队列的消费者

@Slf4j
public class ConsumerDLX {
    public static final String HOST = "148.70.153.63";
    public static final String USER_NAME = "libai";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(System.getProperty("password"));
        connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

        // 创建连接和通道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("now:[{}],消费消息:[{}]", DateUtil.now(), new String(body));
            }
        };
        channel.basicConsume("DLXQueue", true, consumer);
    }
}

1.4、运行测试

now:[2020-09-28 20:00:10],发送消息:[测试死信队列]
now:[2020-09-28 20:00:40],消费消息:[测试死信队列]

主要过程:
生产者 —> 原交换机amq.direct —> 原队列TestDLXQueue(超过 TTL 之后) —> 死信交换机DLXExchange —> 死信队列DLXQueue —> 最终消费者。

2、Springboot整合RabbitMQ实现死信队列DLX的应用

2.1、配置死信队列

@Bean
public Queue DLXQueue() {
    return new Queue("DLX_QUEUE", true, false, false);
}

@Bean
public DirectExchange DLXExchange() {
    return new DirectExchange("DLX_EXCHANGE", true, false);
}

@Bean
public Binding bindingDLX() {
    return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX");
}

创建死信队列DLX_QUEUE并和指定交换机DLX_EXCHANGE进行绑定(其实也是普通的队列、普通的交换机)。

2.2、配置消息队列

@Bean
public Queue testDLXQueue() {
    Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期
    map.put("x-dead-letter-exchange", "DLX_EXCHANGE"); // 给队列设置死信交换机
    return new Queue("TEST_DLX_QUEUE", true, false, false, map);
}

@Bean
public DirectExchange testDLXExchange() {
    return new DirectExchange("TEST_DLX_EXCHANGE", true, false);
}

@Bean
public Binding bindingTestDLX() {
    return BindingBuilder.bind(testDLXQueue()).to(testDLXExchange()).with("DLX");
}

创建另外一个正常的消息队列TEST_DLX_QUEUE,设置队列的TTL过期时间,同时通过x-dead-letter-exchange属性指定死信队列对应的交换机。

2.3、生产者

@RestController
public class DLXController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/testDLX")
    public String testDLX() {
        rabbitTemplate.convertAndSend("TEST_DLX_EXCHANGE", "DLX", "测试死信队列");
        return "ok";
    }
}

等到30秒后没有被消费,则会把消息推送到DLX_QUEUE死信队列中。

3、死信队列实现消息延迟发送的缺点

  1. 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟......需要创建很多队列来路由消息。
  2. 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞,即前一条消息没有出队(没有被消费),后面的消息无法投递。比如第一条消息过期TTL是30min,第二条消息TTL是10min。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递。
  3. 可能存在一定时间误差。

所以在RabbitMQ3.5.8版本之后,可以利用官方的rabbitmq-delayed-message-exchange插件来实现消息的延迟发送,可以避免上面所说的问题。
RabbitMQ实现消息延迟推送

参考链接

代码地址

上一篇 下一篇

猜你喜欢

热点阅读