RabbitMQ(十)消费者应答和发送者确认
文档:
介绍:
使用像RabbitMQ这样的消息代理的系统是分布式的,所以消息是否能到达对端或是被成功处理是无法保证的。
所以,无论生产者还是消费者,都需要一种消息传递和处理的确认机制。
消费者传递应答
当RabbitMQ发送一个消息给消费者,他需要知道消息是什么时候被成功投递了。
1)投递的标识:Delivery Tags
当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel.
delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。
2)应答模式
根据所使用的确认模式,RabbitMQ可以考虑在发送(写入TCP套接字)之后立即成功传送消息,或者接收到显式(“手动”)客户机确认时成功传送。 手动发送的确认可以是肯定的或否定的,并且使用以下协议方法之一:
- basic.ack is used for positive acknowledgements
- basic.nack is used for negative acknowledgements
- basic.reject is used for negative acknowledgements but has one limitations compared to basic.nack
积极的应答告诉RabbitMQ记录一个消息被投递了,像 basic.reject 这样的消极应答有着同样的作用。
积极应答假定消息被成功处理,消极应答表示投递没被处理,还是要被删掉。
3)一次应答多个投递
为了减少网络流量,手动应答可以被批处理。
ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。
4)通道预取设置(QoS)
因为消息发送到客户端是异步的,在任何给定时刻在信道上通常存在多个消息竞争。此外,来自客户端的手动应答本质上也是异步的。所以总是存在一个消息未确认的滑动窗口。开发人员通常希望限制此窗口的大小,以避免消费者端的无界缓冲区问题。
可以通过使用 basicQos 这个方法来设置预取的个数。这个数值定义了一个通道最多有多少个未确认的消息。
值得重申的是,投递流程和手动客户端确认是完全异步的。 因此,如果在投递中已经有消息的情况下改变预取值,则会出现自然竞争条件,并且在信道上可能暂时存在多于预取未确认消息数量。
5)客户端错误:双重应答和未知 tag
如果客户端对同一个 delivery tag 应答超过一次,rabbitMQ会返回一个通道错误:
PRECONDITION_FAILED - unknown delivery tag 100
如果一个未知的 delivery tag 被使用的话,会返回同样的错误。
发送者确认
使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务 - 使通道事务,发布消息,提交。 在这种情况下,事务不必要地重量级,并且吞吐量减少了250倍。为了补救这一点,引入了确认机制。 它模仿协议中已经存在的消费者确认机制。
为了启用确认机制,客户端发送 confirm.select 方法。 根据是否设置了 no-wait,代理可以用confirm.select-ok进行响应。 一旦在一个通道上使用 confirm.select 方法,它就被认为处于确认模式。 事务通道不能进入确认模式,一旦通道处于确认模式,它不能进行事务。
一旦通道处于确认模式,代理和客户端计数消息(计数从第一个确认选择的1开始计数)。 然后,代理程序在通过在同一个通道上发送basic.ack来处理消息时确认消息。 delivery-tag字段包含确认消息的序列号。 代理还可以在basic.ack中设置多个字段,以指示已经处理了直到并且包括具有序列号的消息的所有消息。
1)消极确认
在特殊情况下,代理无法成功处理消息,代理将发送basic.nack而不是basic.ack。 在这个上下文中,basic.nack的字段与basic.ack中的相应字段具有相同的含义,并且应忽略requeue字段。 通过nack一个或多个消息,代理指示它不能处理消息并拒绝对它们负责; 在这一点上,客户端可以选择重新发布消息。
在将通道置于确认模式后,所有后续发布的消息将被确认或nack一次。 不能保证消息被多久确认一次。 没有消息将被确认和nack。
只有在负责队列的Erlang进程中发生内部错误时,才会传递basic.nack。
2)消息多久被确认
对于一个不可路由的消息,一旦交换器证实消息不可能路由到任何队列,代理会发布一个确认。
如果消息发布设置为mandatory,basic.return在basic.ack之前发送到客户端。 对于否定确认(basic.nack)也是如此。
对于可路由消息,当所有队列接受消息时,发送basic.ack。 对于路由到持久队列的持久消息,这意味着持久存储到磁盘。 对于镜像队列,这意味着所有镜像都接受了消息。
3)持久化消息的ACK延迟
路由到持久队列的持久消息的basic.ack是在将消息保存到磁盘后才发送的。 RabbitMQ消息存储器在间隔(几百毫秒)后将消息批量保存到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。 这意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。 为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息,并等待未完成的确认。 具体的API在不同的客户端库之间有所不同。
4)确认和保证投递
如果代理在消息写入磁盘之前崩溃,将丢失持久消息。 在某些条件下,这导致代理行为诡异。
例如,考虑这种情况:
- 客户端向持久队列发布持久消息
- 客户端消费了队列中的消息(注意消息是持久的,队列持久),但是还没有确认
- 代理挂了并重启
- 客户端重连并开始消费
在这一点上,客户端可以合理地假设消息将被再次传送。
在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。
限制
1)最大 Delivery Tag
传递标记是一个64位长的值,因此其最大值为9223372036854775807。由于Delivery Tag唯一标识每个通道的每次投递,所以,发送者或客户端在实践中不太可能超过此值。
代码示例
package com.xc.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Created by xc.
*/
public class PublisherConfirms {
private static final String QUEUE_NAME = "publisher-confirms";
private static final int MSG_COUNT = 10;
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbit");
factory.setPassword("carrot");
}
public static void main(String[] args) throws Exception {
// Publish MSG_COUNT messages and wait for confirms.
(new Thread(new Consumer())).start();
// Consume MSG_COUNT messages.
(new Thread(new Publisher())).start();
}
static class Publisher implements Runnable {
volatile SortedSet<Long> ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
public void run() {
try {
long startTime = System.currentTimeMillis();
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
for (long i = ackSet.first(); i <= deliveryTag; ++i) {
System.out.println("handle ack multiple, tag : " + deliveryTag);
ackSet.remove(i);
}
} else {
System.out.println("handle ack, tag : " + deliveryTag);
ackSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("handle nack, tag : " + deliveryTag);
}
});
// Publish
for (long i = 0; i < MSG_COUNT; ++i) {
ackSet.add(i);
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"nop".getBytes());
System.out.println("send msg : " + "nop");
}
// Wait
while (ackSet.size() > 0)
Thread.sleep(10);
// Cleanup
channel.close();
connection.close();
long endTime = System.currentTimeMillis();
System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);
} catch (Throwable e) {
System.out.println("foobar :(");
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
public void run() {
try {
// Setup
Connection conn = factory.newConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME, true, false, false, null);
// Consume
QueueingConsumer qc = new QueueingConsumer(ch);
ch.basicConsume(QUEUE_NAME, true, qc);
for (int i = 0; i < MSG_COUNT; ++i) {
QueueingConsumer.Delivery delivery = qc.nextDelivery();
System.out.println("got msg : " + new String(delivery.getBody()));
}
// Consume
ch.close();
conn.close();
} catch (Throwable e) {
System.out.println("Whoosh!");
e.printStackTrace();
}
}
}
}