消息队列之RabbitMQ-消息确认机制

2020-07-16  本文已影响0人  冰河winner

RabbitMQ的消息确认有两种:

1、投递确认

RabbitMQ对生产者发送消息的确认有两种实现方式:

1.1 confirm

生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障。

1.png

一旦 channel 处于 confirm 模式,broker 和 client 都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。delivery-tag 域的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

confirm在代码实现上分为两步:

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
      
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "item.update";
        String errRoutingKey = "error.update";

        //指定消息的投递模式:confirm 确认模式
        channel.confirmSelect();

        //发送
        for (int i = 0; i < 3 ; i++) {
            String msg = "this is return——listening msg ";
            //@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除
            if (i == 0) {
                channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
            } else {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
            System.out.println("Send message : " + msg);
        }

        //添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功的回调函数
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
            }
            /**
             * 返回失败的回调函数
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.printf("defeat ack");
            }
        });

        //添加一个 return 监听
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("return body: " + new String(body));
            }
        });

    }
}

以上采用的异步 confirm 模式,除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,但是现实场景中很少使用。

而后面的Return Listener 用于处理些不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,如果需要监听这种不可达的消息,就要使用 Return Listener。

在基础API中有一个关键的配置项Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息。

1.2 事务

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.transation.config.MqConfig;
 
public class TxSend {
 
    private static final String QUEUE_NAME = "test_queue_tx";
 
    public static void send() throws Exception {
        Connection connection = MqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx!";
        try {
            // 用户将当前channel设置成transaction
            channel.txSelect();
 
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
 
            // 异常,使回滚
            int x=1/0;
 
            System.out.println("【发送端】消息:"+msg);
            //提交事务
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("发生异常,回滚");
        }
        channel.close();
        connection.close();
    }
}

如果RabbitMQ返回ack失败,生产端也无法确认消息是否真的发送成功,也会造成数据丢失。最好的办法是使用RabbitMQ的事务机制,但是RabbitMQ的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。

为了达到消息的可靠投递,还可以借助外部工具实现,比如redis:

再比如利用队列与定时任务:

不通过设置redis超时时间触发超时事件进行重发,而是取出消息放入一个ackFailList中,然后开启定时任务,扫描ackFailList,重发失败的msg。

如果把List保存在内存中,不具备持久化的功能,并不安全,可以考虑保存到数据库中,防止消息丢失。

2、消费确认

为了保证消息能可靠到达消费端,RabbitMQ也提供了消费端的消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

import com.rabbitmq.client.*;
import java.io.IOException;

public class AckAndNackConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代码绑定,在管理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if ((Integer) properties.getHeaders().get("num") == 0) {
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //6. 设置 Channel 消费者绑定队列,设为非自动ack
        channel.basicConsume(queueName, false, consumer);

    }
}
上一篇下一篇

猜你喜欢

热点阅读