07-RabbitMQ持久化
2021-09-24 本文已影响0人
紫荆秋雪_文
一、概念
手动应答解决的是任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非预先设定不这样做。确保消息不会丢失需要做两件事:“队列和消息标记为持久化”。
二、队列持久化
1、队列持久化代码设置
/**
* 创建一个队里
*
* 队列名称
* 队列是否持久化
* 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
* 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2、重启生成者
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
- 队列修改参数后,需要删除掉已经存在的同名的旧队列,否则报上述错误
3、生成消息
/**
* 消息生产者
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 创建一个Channel
Channel channel = ConnectionUtils.getConnection().createChannel();
/**
* 创建一个队里
*
* 队列名称
* 队列是否持久化 true:持久化
* 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
* 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
//准备消息
String msg = scanner.next();
/**
* 发布消息
*
* 发送到那个交换机
* 路由key
* 其他参数
* 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("发送消息完成:" + msg);
}
}
}
- 生成消息
11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55
- 队列 image.png
-
现在消息队列中的消息
image.png - 重启RabbitMQ image.png
- 队列依然在 image.png
- 消息队列中消息 image.png
小结:队列持久化成功,但是要想队列中消息也存在需要消息也要持久化
三、消息持久化
1、消息生产者代码设置
/**
* 发布消息
*
* 发送到那个交换机
* 路由key
* 其他参数
* 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
2、测试流程与上面队列持久化一样 image.png
3、重启RabbitMQ服务 image.png
image.png三、不公平分发
RabbitMQ默认的分发消息使用的是轮询分发,但是在有些场景下是不妥的,向上面的测试代码中在正常情况下发送的消息11、22、33、44、55、66,Consumer1和Consumer2会平分消费上面的消息,但是由于Consumer2中处理速度低,这就造成Consumer1处理完消息空闲等待,而Consumer2还没有去不执行完消息;这就浪费了Consumer1的处理能。所以需要“不公平分发”,让“能者多劳”。
1、在消费端代码设置
int prefetchCount = 1;
channel.basicQos(prefetchCount);
2、Producer
/**
* 消息生产者
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 创建一个Channel
Channel channel = ConnectionUtils.getConnection().createChannel();
/**
* 创建一个队里
*
* 队列名称
* 队列是否持久化 true:持久化
* 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
* 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
//准备消息
String msg = scanner.next();
/**
* 发布消息
*
* 发送到那个交换机
* 路由key
* 其他参数
* 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("发送消息完成:" + msg);
}
}
}
3、Consumer1
/**
* 接收消息
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 创建一个Channel
Channel channel = ConnectionUtils.getConnection().createChannel();
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println("C1等待接收消息。。。。。");
// 接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(message.getBody());
System.out.println(msg);
// 单个应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 取消消息回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息消费被中断。。。");
};
/**
* 消费消息
*
* 消息队列
* 消费成功之后是否要自动应答
* 消费成功/失败回调
*/
//标记消息手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
4、Consumer2
/**
* 接收消息
*/
public class Consumer2 {
// 队列名称
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
// 创建一个Channel
Channel channel = ConnectionUtils.getConnection().createChannel();
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println("C2等待接收消息。。。。。");
// 接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(message.getBody());
System.out.println(msg);
// 单个应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 取消消息回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息消费被中断。。。");
};
/**
* 消费消息
*
* 消息队列
* 消费成功之后是否要自动应答
* 消费成功/失败回调
*/
//标记消息手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
5、消息生成
11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55
66
发送消息完成:66
6、Consumer1消费情况
22
7、Consumer2消费情况
11
33
44
55
66