死信队列 DLX,延时队列实现

2023-03-22  本文已影响0人  爱吃豆包

死信队列:(英文名)DLX, Dead-Letter-Exchange

利用DLX, 当消息在一个队列中变成死信(dead message) 之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX

什么是死信队列?

就是你的消息发布后,没有消费者去消费!就变成了死信了!在任何MQ产品中都有这个情况!

在RabbitMQ中,变成死信队列后,它会把这个消息重新publish到另一个Exchange中,这个Exchange就是DLX

消息变成死信有以下几种情况:

1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)

2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信

3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)

步骤:

1.DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性(这个Exchange只需要正常的去定义就好, 和平常没却别)

2.当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

3.可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能!

死信队列的设置:

首先需要设置死信队列的Exchange和Queue,然后进行绑定:

Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey:#

然后正常的声明交换机、队列、绑定,只不过我们需要在队列加上一个参数:
argument.put("x-dead-letter-exchange", “dlx.exchange”)
这个参数表示,当消息正常的路由到队列的时候,但是没有一个消费者去消费的话,就会被重新路由到 dlx.exchange 交换机上
这样消息在过期,requeue、队列在达到最大长度时,消息就可以直接路由到死信队列!

延时队列实现

可以通过死信队列来实现!给消息设置 TTL 有效期时间,比如设置 10 秒,那么在 10 秒后,这个消息成为了死信,然后会转发到另外一个 交换机上进行消息处理!

生产者

package com.example.rabbitmqapi.dlx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 DLX
 *
 *      消息变成死信有以下几种情况:
 *         1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)
 *         2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信)
 *         3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)
 *
 * 消费者
 *
 *      消费端 ACK 和 重回队列
 *
 * @author weiximei
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();


        // 4. 声明Exchange

        String exchangeName = "test_dlx_exchange";
        String exchangeType = "topic";
        String routingKey = "dlx.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);

        // 5. 声明消息队列
        String queueName = "test_dlx_queue";

        /**
         * 声明死信队里的属性,出现死信后,会重新发送到这个 dlx.exchange 上
         */
        Map<String,Object> agruments = new HashMap<>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");

        // 需要把 agruments 属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);

        // 6. 绑定队列和Exchange
        channel.queueBind(queueName, exchangeName, routingKey);


        /**
         * 声明死信队列
         *
         *      接下来,就可以更具接收到的死信消息,做相应的处理, 也就做一个消息的接收,处理消息(消费者)
         *      在这里只是声明了,并没对消息做处理
         *
         */
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#"); // # 表示所有队里都匹配接收


        // 7. 设置消费者为自定义的消费者, 手工签收ACK,则必须 autoAck=false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }

}

 

消费者

 
package com.example.rabbitmqapi.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 DLX
 *
 *
 * 生产者
 *
 *      在消费端的 ACK 和 重回队列
 *
 * @author weiximei
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. 创建ConnectionFactory, 并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.saye";

        // 发送消息
        String msg = "自定义消费者, 消息发送 : Hello, DLX ";
        for (int i = 0; i < 5; i++) {

            // 携带额外的参数
            Map<String,Object> headerMap = new HashMap<>();
            headerMap.put("num", i);

            // 添加额外的属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 是否持久化 1 不持久化,2 持久化
                    .contentEncoding("UTF-8")  // 设置字符集
                    .headers(headerMap).build();

            /**
             * mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
             */
            // exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
            // routingKet 表示key键,发送到哪一个队列
            // mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
            // props 表示消息的其他属性 (BasicProperties)
            // body 表示消息内容
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }

        // 关闭连接
        channel.close();
        connection.close();

    }

}


自定义消费者

package com.example.rabbitmqapi.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * 自定义消费者
 *
 * @author weiximei
 */
public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    // 第一参数:consumerTag 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。
    //      因此 envelope.deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
    // 第二个参数: envelope 表示消息类型信息
    // 第三个参数:properties 表示消息路由头的其他属性等
    // 第三个参数: body 消息内容
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("-------------自定义消费者------------");
        System.out.println("consumerTag : " + consumerTag);
        System.out.println("envelope : " + envelope);
        System.out.println("properties : " + properties);
        System.out.println("body : " + new String(body));

        // 获取消息的额外参数
        Integer num = (Integer) properties.getHeaders().get("num");
        // 比如我让这个的额外参数等于 0 的时候,就进行重回队列
        if (num == 0) {
            // deliveryTag 参数是消息唯一标识
            // multiple 是否是批量的,一般为false
            // requeue 是否重回队列, true 是,false 不是
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {

            /**
             * 手工签收, 第二个参数表示是否批量签收
             */
            // envelope.getDeliveryTag() 是消息的唯一标识
            channel.basicAck(envelope.getDeliveryTag(), false);
        }

    }
}
 

上一篇下一篇

猜你喜欢

热点阅读