RabbitMQ(十)消费者应答和发送者确认

2016-11-06  本文已影响9632人  薛晨

文档:

https://www.rabbitmq.com/confirms.html

介绍:
使用像RabbitMQ这样的消息代理的系统是分布式的,所以消息是否能到达对端或是被成功处理是无法保证的。

所以,无论生产者还是消费者,都需要一种消息传递和处理的确认机制。

消费者传递应答

当RabbitMQ发送一个消息给消费者,他需要知道消息是什么时候被成功投递了。

1)投递的标识:Delivery Tags

当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel.

delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。

2)应答模式

根据所使用的确认模式,RabbitMQ可以考虑在发送(写入TCP套接字)之后立即成功传送消息,或者接收到显式(“手动”)客户机确认时成功传送。 手动发送的确认可以是肯定的或否定的,并且使用以下协议方法之一:

积极的应答告诉RabbitMQ记录一个消息被投递了,像 basic.reject 这样的消极应答有着同样的作用。

积极应答假定消息被成功处理,消极应答表示投递没被处理,还是要被删掉。

3)一次应答多个投递

为了减少网络流量,手动应答可以被批处理。

ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。

4)通道预取设置(QoS)

因为消息发送到客户端是异步的,在任何给定时刻在信道上通常存在多个消息竞争。此外,来自客户端的手动应答本质上也是异步的。所以总是存在一个消息未确认的滑动窗口。开发人员通常希望限制此窗口的大小,以避免消费者端的无界缓冲区问题。

可以通过使用 basicQos 这个方法来设置预取的个数。这个数值定义了一个通道最多有多少个未确认的消息。

值得重申的是,投递流程和手动客户端确认是完全异步的。 因此,如果在投递中已经有消息的情况下改变预取值,则会出现自然竞争条件,并且在信道上可能暂时存在多于预取未确认消息数量。

5)客户端错误:双重应答和未知 tag

如果客户端对同一个 delivery tag 应答超过一次,rabbitMQ会返回一个通道错误:

PRECONDITION_FAILED - unknown delivery tag 100

如果一个未知的 delivery tag 被使用的话,会返回同样的错误。

发送者确认

使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务 - 使通道事务,发布消息,提交。 在这种情况下,事务不必要地重量级,并且吞吐量减少了250倍。为了补救这一点,引入了确认机制。 它模仿协议中已经存在的消费者确认机制。

为了启用确认机制,客户端发送 confirm.select 方法。 根据是否设置了 no-wait,代理可以用confirm.select-ok进行响应。 一旦在一个通道上使用 confirm.select 方法,它就被认为处于确认模式。 事务通道不能进入确认模式,一旦通道处于确认模式,它不能进行事务。

一旦通道处于确认模式,代理和客户端计数消息(计数从第一个确认选择的1开始计数)。 然后,代理程序在通过在同一个通道上发送basic.ack来处理消息时确认消息。 delivery-tag字段包含确认消息的序列号。 代理还可以在basic.ack中设置多个字段,以指示已经处理了直到并且包括具有序列号的消息的所有消息。

1)消极确认

在特殊情况下,代理无法成功处理消息,代理将发送basic.nack而不是basic.ack。 在这个上下文中,basic.nack的字段与basic.ack中的相应字段具有相同的含义,并且应忽略requeue字段。 通过nack一个或多个消息,代理指示它不能处理消息并拒绝对它们负责; 在这一点上,客户端可以选择重新发布消息。

在将通道置于确认模式后,所有后续发布的消息将被确认或nack一次。 不能保证消息被多久确认一次。 没有消息将被确认和nack。

只有在负责队列的Erlang进程中发生内部错误时,才会传递basic.nack。

2)消息多久被确认

对于一个不可路由的消息,一旦交换器证实消息不可能路由到任何队列,代理会发布一个确认。

如果消息发布设置为mandatory,basic.return在basic.ack之前发送到客户端。 对于否定确认(basic.nack)也是如此。

对于可路由消息,当所有队列接受消息时,发送basic.ack。 对于路由到持久队列的持久消息,这意味着持久存储到磁盘。 对于镜像队列,这意味着所有镜像都接受了消息。

3)持久化消息的ACK延迟

路由到持久队列的持久消息的basic.ack是在将消息保存到磁盘后才发送的。 RabbitMQ消息存储器在间隔(几百毫秒)后将消息批量保存到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。 这意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。 为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息,并等待未完成的确认。 具体的API在不同的客户端库之间有所不同。

4)确认和保证投递

如果代理在消息写入磁盘之前崩溃,将丢失持久消息。 在某些条件下,这导致代理行为诡异。

例如,考虑这种情况:

在这一点上,客户端可以合理地假设消息将被再次传送。

在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。

限制

1)最大 Delivery Tag

传递标记是一个64位长的值,因此其最大值为9223372036854775807。由于Delivery Tag唯一标识每个通道的每次投递,所以,发送者或客户端在实践中不太可能超过此值。

代码示例

package com.xc.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Created by xc.
 */
public class PublisherConfirms {

    private static final String QUEUE_NAME = "publisher-confirms";

    private static final int MSG_COUNT = 10;

    private static ConnectionFactory factory;

    static {
        factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
    }


    public static void main(String[] args) throws Exception {
        // Publish MSG_COUNT messages and wait for confirms.
        (new Thread(new Consumer())).start();
        // Consume MSG_COUNT messages.
        (new Thread(new Publisher())).start();

    }

    static class Publisher implements Runnable {

        volatile SortedSet<Long> ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

         public void run() {
             try {
                 long startTime = System.currentTimeMillis();

                 // 创建一个新的连接
                 Connection connection = factory.newConnection();
                 // 创建一个频道
                 Channel channel = connection.createChannel();

                 channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                 channel.confirmSelect();

                 channel.addConfirmListener(new ConfirmListener() {
                     public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                         if (multiple) {
                             for (long i = ackSet.first(); i <= deliveryTag; ++i) {
                                 System.out.println("handle ack multiple, tag : " + deliveryTag);
                                 ackSet.remove(i);
                             }
                         } else {
                             System.out.println("handle ack, tag : " + deliveryTag);
                             ackSet.remove(deliveryTag);
                         }
                     }

                     public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                         System.out.println("handle nack, tag : " + deliveryTag);
                     }
                 });

                 // Publish
                 for (long i = 0; i < MSG_COUNT; ++i) {
                     ackSet.add(i);
                     channel.basicPublish("", QUEUE_NAME,
                             MessageProperties.PERSISTENT_TEXT_PLAIN,
                             "nop".getBytes());
                     System.out.println("send msg : " + "nop");
                 }

                 // Wait
                 while (ackSet.size() > 0)
                     Thread.sleep(10);

                 // Cleanup
                 channel.close();
                 connection.close();

                 long endTime = System.currentTimeMillis();
                 System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);

             } catch (Throwable e) {
                 System.out.println("foobar :(");
                 e.printStackTrace();
             }
         }

    }

    static class Consumer implements Runnable {

        public void run() {
            try {
                // Setup
                Connection conn = factory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);

                // Consume
                QueueingConsumer qc = new QueueingConsumer(ch);
                ch.basicConsume(QUEUE_NAME, true, qc);
                for (int i = 0; i < MSG_COUNT; ++i) {
                    QueueingConsumer.Delivery delivery = qc.nextDelivery();
                    System.out.println("got msg : " + new String(delivery.getBody()));
                }

                // Consume
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("Whoosh!");
                e.printStackTrace();
            }
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读