RabbitMQ消息确认(二)——消费者接收消息手动ACK
消息接收的确认机制主要有三种模式:
-
自动确认
AcknowledgeMode.NONE
RabbitMQ
成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch
捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。 -
根据情况确认
AcknowledgeMode.AUTO
这也是SpringBoot
集成RabbitMQ
默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。 -
手动确认
AcknowledgeMode.MANUAL
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basicAck/basicNack/basicReject
后,RabbitMQ
收到这些消息后,才认为本次投递成功。
1、创建手动确认消息的队列
@Configuration
public class DirectRabbitConfig {
//Direct交换机 起名:directExchange
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange", true, false);
}
//需要手动确认消息的队列
@Bean
public Queue manualAckQueue() {
return new Queue("manualAckQueue", true, false, false);
}
//手动确认消息的队列和直连交换机绑定
@Bean
public Binding bindingDirectForManualAck() {
return BindingBuilder.bind(manualAckQueue()).to(directExchange()).with("manualAck");
}
}
2、手动确认消息的监听实现
2.1、通过配置实现
spring:
rabbitmq:
host: 148.70.153.63
port: 5672
username: libai
password: password
listener:
simple:
# 手动确认
acknowledge-mode: manual
# 拒绝消息是否重回队列
default-requeue-rejected: true
2.2、配置类实现(更加灵活)
@Configuration
@Slf4j
public class MessageManualAckListenerConfig {
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置需要手动确认消息的队列,可以同时设置多个,前提是队列需要提前创建好
container.setQueueNames("manualAckQueue");
// 设置监听消息的方法,匿名内部类方式
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
// 开始消费消息
log.info("body:\n{}", JSONUtil.toJsonPrettyStr(new String(message.getBody())));
log.info("prop:\n{}", JSONUtil.toJsonPrettyStr(message.getMessageProperties()));
// 手动确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 肯定确认
});
return container;
}
}
body:接收的消息内容。
messageProperties:消息的相关属性。
3、发送消息测试手动确认
3.1、调用接口
@RestController
public class MessageManualAckController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/manualAck")
public String manualAck() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("messageData", "manualAck");
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("directExchange", "manualAck", JSONUtil.toJsonStr(map));
return "ok";
}
}
3.2、查看控制台打印输出。
2020-09-17 22:35:28,635 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:32] [] body:
{
"createTime": "2020-09-17 22:35:28",
"messageId": "25398d1a-474e-48cd-a3df-460b780e9d97",
"messageData": "manualAck"
}
2020-09-17 22:35:28,636 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:33] [] prop:
{
"headers": {
"spring_listener_return_correlation": "a6622c5e-22f0-4f39-bea5-4360ef8de66b"
},
"finalRetryForMessageWithNoId": false,
"contentLengthSet": false,
"deliveryTag": 3,
"receivedExchange": "directExchange",
"priority": 0,
"receivedRoutingKey": "manualAck",
"redelivered": false,
"consumerTag": "amq.ctag-cqCpyMhe9Ak2vuv_RifFlQ",
"receivedDeliveryMode": "PERSISTENT",
"publishSequenceNumber": 0,
"contentEncoding": "UTF-8",
"contentLength": 0,
"contentType": "text/plain",
"consumerQueue": "manualAckQueue",
"deliveryTagSet": true
}
3.3、消费消息时的状态变化
通过打断点方式查看当消息未被确认时在RabbitMQ server
中的状态。
4、确认/拒绝消息
4.1、basicAck
确认消息。
第2个参数如果设为true
,则表示批量确认当前通道中所有deliveryTag
小于当前消息的所有消息。
4.2、basicNack
拒绝消息。
第2个参数如果设为true
,则表示批量拒绝当前通道中所有deliveryTag
小于当前消息的所有消息。
第3个参数如果设为true
,则表示当前消息再次回到队列中等待被再次消费。
4.3、basicReject
拒绝消息。与basicNack
作用类似,只不过一次只能拒绝单条消息。
对于拒绝消息并且重回队列使用时需要谨慎,避免使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。