概念--Confirm确认消息
2023-03-20 本文已影响0人
爱吃豆包
理解Confirm消息确认机制:
消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,则会给我生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,这种方式也是消息的可靠性投递的核心保障!
image.pngproducer 生产者,MQ Broker就是RabbitMQ消息中心
生产者发送一条消息,到 Broker 里面,然后会有一个 Broker confirm 到生产者,生产者会有一个 Config Listener 监听!这个整个过程是异步的,也就是我发送了消息就可以不用管了,只需要监听就好了!
如何实现Confirm 确认消息?
第一步:在 channel 上开启确认模式:channel.confirmSelect();
第二部:在 cahnnel 上添加监听:addConfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理!
什么情况下会 ack呢?
比如 磁盘满了,rabbitMQ出现了异常,key的容量达到上限了
如果我 Ack(确认) 和 Nack(没有确认) 都没有接受到消息呢?
需要用到 可靠性投递!
这个情况比如出现了,网络闪断,导致我没有收到,那么就需要用定时任务,去抓取状态,然后进行消息重发!(前提是基于 可靠性投递)
生产者
package com.example.rabbitmqapi.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*
* 是在生产者处理
*
* 消息的确认,是指生产者投递消息后,如果Broket(RabbitMQ消息中心)收到消息,
* 则会给我生产者一个应答。
* 生产者进行接收应答,用来确定这条消息是否正常的发送到Broket,
* 这种方式也是消息的可靠性投递的核心保障!
*
* @author weiximei on 2019-04-01
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// IP地址或者域名
connectionFactory.setHost("192.168.1.118");
// 端口号默认是 5672
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("weiximei");
connectionFactory.setPassword("weiximei");
// 2.获取Connection
Connection connection = connectionFactory.newConnection();
// 3. 创建 channel,也就是连接信道
Channel channel = connection.createChannel();
/**
* 4.指定我们的消息确认模式:消息的确认模式
*/
channel.confirmSelect();
// 5. 声明
// 交换机名称
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm_save";
String msg = "Hello RabbitMQ,Save confirm";
// 6.发送消息
/**
* channel.basicPublish 方法有个其他的重载方法
* 其中有个参数
*
* mandatory的作用:
* 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
* 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
* 出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
* 否则就将消息return给发送者;
*
*/
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 7.添加一个确认监听
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,
* 此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,
* 如果为false的话表示单条确认。
*/
/**
*
* @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
* @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
* 如果为false的话表示单条确认。
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
/**
*
* @param deliveryTag 消息的唯一标签,用来确认消息,标记消息
* @param multiple 如果true表示批量执行了deliveryTag这个值以前的所有消息,
* 如果为false的话表示单条确认。
* @throws IOException
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println(" ------ no ack! ------ 没有被ack确认,标识: " + deliveryTag);
}
});
}
}
package com.example.rabbitmqapi.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* confirm 是在生产端
*
* @author weiximei on 2019-04-01
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// IP地址或者域名
connectionFactory.setHost("192.168.1.118");
// 端口号默认是 5672
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("weiximei");
connectionFactory.setPassword("weiximei");
// 2.获取Connection
Connection connection = connectionFactory.newConnection();
// 3. 创建 channel,也就是连接信道
Channel channel = connection.createChannel();
/**
* 4.指定我们的消息确认模式:消息的确认模式
*/
channel.confirmSelect();
// 5. 声明
// 交换机名称
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm_save";
String queueName = "test_confirm_queue";
// 6.声明 exchange
// 交换机名称,交换机类型,是否持久化
channel.exchangeDeclare(exchangeName, "topic",true);
// 7. 声明 队列
/**
* declare 表示是否持久化消息类型
* exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
* 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
* 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
* autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
* arguments 其他的一些参数
*/
channel.queueDeclare(queueName, true, false, false,null);
// 8.绑定队列 (也就是把队列绑定到某交换机)
channel.queueBind(queueName, exchangeName, routingKey);
// 9.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 创建消费者
// 消费的队列,是否自动ack,消费者是谁
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("接收到的消息:" + msg);
}
}
}