MQ

07-RabbitMQ持久化

2021-09-24  本文已影响0人  紫荆秋雪_文

一、概念

手动应答解决的是任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非预先设定不这样做。确保消息不会丢失需要做两件事:“队列和消息标记为持久化”。

二、队列持久化

1、队列持久化代码设置

        /**
         * 创建一个队里
         *
         * 队列名称
         * 队列是否持久化
         * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
         * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2、重启生成者

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

3、生成消息

/**
 * 消息生产者
 */
public class Producer {

    // 队列名称
    private static final String QUEUE_NAME = "ack_queue";


    public static void main(String[] args) throws Exception {

        // 创建一个Channel
        Channel channel = ConnectionUtils.getConnection().createChannel();

        /**
         * 创建一个队里
         *
         * 队列名称
         * 队列是否持久化  true:持久化
         * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
         * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);


        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            //准备消息
            String msg = scanner.next();

            /**
             * 发布消息
             *
             * 发送到那个交换机
             * 路由key
             * 其他参数
             * 发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("发送消息完成:" + msg);
        }
    }

}
11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55

小结:队列持久化成功,但是要想队列中消息也存在需要消息也要持久化

三、消息持久化

1、消息生产者代码设置

 /**
   * 发布消息
   *
   * 发送到那个交换机
   * 路由key
   * 其他参数
   * 发送消息的消息体
   */
  channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

2、测试流程与上面队列持久化一样 image.png

3、重启RabbitMQ服务 image.png

image.png

三、不公平分发

RabbitMQ默认的分发消息使用的是轮询分发,但是在有些场景下是不妥的,向上面的测试代码中在正常情况下发送的消息11、22、33、44、55、66,Consumer1和Consumer2会平分消费上面的消息,但是由于Consumer2中处理速度低,这就造成Consumer1处理完消息空闲等待,而Consumer2还没有去不执行完消息;这就浪费了Consumer1的处理能。所以需要“不公平分发”,让“能者多劳”。

1、在消费端代码设置

int prefetchCount = 1;
channel.basicQos(prefetchCount);

2、Producer

/**
 * 消息生产者
 */
public class Producer {

    // 队列名称
    private static final String QUEUE_NAME = "ack_queue";


    public static void main(String[] args) throws Exception {

        // 创建一个Channel
        Channel channel = ConnectionUtils.getConnection().createChannel();

        /**
         * 创建一个队里
         *
         * 队列名称
         * 队列是否持久化  true:持久化
         * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
         * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);


        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            //准备消息
            String msg = scanner.next();

            /**
             * 发布消息
             *
             * 发送到那个交换机
             * 路由key
             * 其他参数
             * 发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
            System.out.println("发送消息完成:" + msg);
        }
    }

}

3、Consumer1

/**
 * 接收消息
 */
public class Consumer1 {

    // 队列名称
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        // 创建一个Channel
        Channel channel = ConnectionUtils.getConnection().createChannel();

        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        System.out.println("C1等待接收消息。。。。。");

        // 接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String msg = new String(message.getBody());
            System.out.println(msg);
            // 单个应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        // 取消消息回调
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("消息消费被中断。。。");
        };

        /**
         * 消费消息
         *
         * 消息队列
         * 消费成功之后是否要自动应答
         * 消费成功/失败回调
         */
        //标记消息手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

4、Consumer2

/**
 * 接收消息
 */
public class Consumer2 {

    // 队列名称
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        // 创建一个Channel
        Channel channel = ConnectionUtils.getConnection().createChannel();
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        System.out.println("C2等待接收消息。。。。。");

        // 接收消息回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String msg = new String(message.getBody());
            System.out.println(msg);
            // 单个应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        // 取消消息回调
        CancelCallback cancelCallback = (String consumerTag) -> {
            System.out.println("消息消费被中断。。。");
        };

        /**
         * 消费消息
         *
         * 消息队列
         * 消费成功之后是否要自动应答
         * 消费成功/失败回调
         */
        //标记消息手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

5、消息生成

11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55
66
发送消息完成:66

6、Consumer1消费情况

22

7、Consumer2消费情况

11
33
44
55
66
上一篇下一篇

猜你喜欢

热点阅读