概念--Confirm确认消息

2023-03-20  本文已影响0人  爱吃豆包

理解Confirm消息确认机制:

消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,则会给我生产者一个应答。

生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,这种方式也是消息的可靠性投递的核心保障!

image.png

producer 生产者,MQ Broker就是RabbitMQ消息中心

生产者发送一条消息,到 Broker 里面,然后会有一个 Broker confirm 到生产者,生产者会有一个 Config Listener 监听!这个整个过程是异步的,也就是我发送了消息就可以不用管了,只需要监听就好了!

如何实现Confirm 确认消息?

第一步:在 channel 上开启确认模式:channel.confirmSelect();

第二部:在 cahnnel 上添加监听:addConfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理!

什么情况下会 ack呢?

比如 磁盘满了,rabbitMQ出现了异常,key的容量达到上限了

如果我 Ack(确认) 和 Nack(没有确认) 都没有接受到消息呢?

需要用到 可靠性投递!

这个情况比如出现了,网络闪断,导致我没有收到,那么就需要用定时任务,去抓取状态,然后进行消息重发!(前提是基于 可靠性投递)

生产者

 package com.example.rabbitmqapi.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 *
 *      是在生产者处理
 *
 *      消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,
 *      则会给我生产者一个应答。
 *      生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,
 *      这种方式也是消息的可靠性投递的核心保障!
 *
 * @author weiximei on 2019-04-01
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // IP地址或者域名
        connectionFactory.setHost("192.168.1.118");
        // 端口号默认是 5672
        connectionFactory.setPort(5672);
        // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("weiximei");
        connectionFactory.setPassword("weiximei");

        // 2.获取Connection
        Connection connection = connectionFactory.newConnection();

        // 3. 创建 channel,也就是连接信道
        Channel channel = connection.createChannel();

        /**
         * 4.指定我们的消息确认模式:消息的确认模式
         */
        channel.confirmSelect();

        // 5. 声明
        // 交换机名称
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm_save";

        String msg = "Hello RabbitMQ,Save confirm";

        // 6.发送消息

        /**
         * channel.basicPublish 方法有个其他的重载方法
         * 其中有个参数
         *
         * mandatory的作用:
         *         当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
         *         那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
         *         出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
         *         否则就将消息return给发送者;
         *
         */

        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());


        // 7.添加一个确认监听
        //异步监听确认和未确认的消息
        channel.addConfirmListener(new ConfirmListener() {

            /**
             * 异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,
             * 此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,
             * 如果为false的话表示单条确认。
             */

            /**
             *
             * @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
             * @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
             *                  如果为false的话表示单条确认。
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
            }

            /**
             *
             * @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
             * @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
             *                 如果为false的话表示单条确认。
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(" ------  no ack! ------  没有被ack确认,标识: " + deliveryTag);
            }
        });





    }





}


package com.example.rabbitmqapi.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 *
 *      confirm 是在生产端
 *
 * @author weiximei on 2019-04-01
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // IP地址或者域名
        connectionFactory.setHost("192.168.1.118");
        // 端口号默认是 5672
        connectionFactory.setPort(5672);
        // 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("weiximei");
        connectionFactory.setPassword("weiximei");

        // 2.获取Connection
        Connection connection = connectionFactory.newConnection();

        // 3. 创建 channel,也就是连接信道
        Channel channel = connection.createChannel();

        /**
         * 4.指定我们的消息确认模式:消息的确认模式
         */
        channel.confirmSelect();

        // 5. 声明
        // 交换机名称
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm_save";
        String queueName = "test_confirm_queue";

        // 6.声明 exchange
        // 交换机名称,交换机类型,是否持久化
        channel.exchangeDeclare(exchangeName, "topic",true);

        // 7. 声明 队列
        /**
         * declare 表示是否持久化消息类型
         * exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
         *           表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
         *           场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
         * autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
         * arguments 其他的一些参数
         */
        channel.queueDeclare(queueName, true, false, false,null);


        // 8.绑定队列 (也就是把队列绑定到某交换机)
        channel.queueBind(queueName, exchangeName, routingKey);

        // 9.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // 创建消费者
        // 消费的队列,是否自动ack,消费者是谁
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("接收到的消息:" + msg);
        }

    }

}

上一篇下一篇

猜你喜欢

热点阅读