rabbitmq -- 消息的可靠性
rabbitmq作为我们系统之间沟通的桥梁,消息的可靠性就显得格外重要。
假如rabbitmq crash掉之后重新启动,原本的交换机、队列、消息都会消失,如果我们队列中存在一些很重要的消息的时候,我们并不愿意这样的事情发生,这就需要借助rabbitmq的持久化机制。
rabbitmq的持久化
交换机、队列设置可以为持久化,消息的投递模式也可以选择持久化(三个持久化)。
持久化的消息在进入持久化队列之后会写入到rabbitmq的持久性日志文件中,消息被消费掉后,会把持久性日志文件中该消息标记为等待垃圾收集
在broker重启之后,持久化的交换机、队列会重新初始化,持久化的消息会投递到原先的队列上
怎么做到消息持久化
三个持久化:
交换机持久化、队列持久化、消息持久化
代码示例:
<?php
try{
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"login" => "wuj13",
"password" => "Aaa294515505",
"vhost" => "test_vhost"
];
$conn = new AMQPConnection($config);
//连接rabbitmq
if(!$conn->connect()){
throw new Exception("连接失败");
}
//创建信道
$channel = new AMQPChannel($conn);
//创建交换机
$exchange = new AMQPExchange($channel);
$exchangeName = "exchange_one";
//设置交换机名称
$exchange->setName($exchangeName);
//设置交换机类型
$exchange->setType(AMQP_EX_TYPE_DIRECT);
//设置为持久化交换机
$exchange->setFlags(AMQP_DURABLE);
//生成交换机
$exchange->declareExchange();
//创建队列
$queueName ="queue_test";
$queue = new AMQPQueue($channel);
//设置队列名称
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
//生成队列
$queue->declareQueue();
//队列绑定交换机
$queue->bind($exchangeName,$queueName);
$message = "hello world";
//向交换机发布消息
$re = $exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2]);
if($re){
echo "成功发送消息".$message.PHP_EOL;
}
else{
throw new Exception("发送消息失败");
}
}
catch(Exception $exception){
echo "操作异常 ".$exception->getMessage();
}
finally{
$channel->close();
$conn->disconnect();
}
其中
$exchange->setFlags(AMQP_DURABLE)
设置交换机为durable代表持久类型
$queue->setFlags(AMQP_DURABLE)
设置队列为持久类型
$exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2])
向交换机发送消息时设置消息的delivery_mode为2代表消息是持久类型的
代码运行后,我们到管理界面去看看,发现有一条持久化的消息
image.png
我们把rabbitmq服务给重启一下,交换机、队列、消息,全部都还在,代表我们的消息持久化已经基本做好了,大家可以自己去试一试。
但我们不妨想一想,如果我们的broker在把消息写入日志文件的时候crash掉了,那我们的消息不就丢了吗?对的,其实这是有很小的概率会出现这样的情况,这个时候可以借助mirror queue来解决这个问题,这个我们后面会专门讲这个东西,不然篇幅就太长了。
那我们先认为很大程度上已经解决了rabbitmq这一端因为宕机而导致消息丢失的问题,但是我们还面临着两个问题:
1 我们的程序怎么保证我们的消息能够成功到达队列
2 怎么保证我们的消息消费成功的
我们一个个来解决!
我们先看一幅图
image.png
消息要到达队列中并处以稳定状态(已持久化),要经过三个流程
1 消息到达交换机
2 交换机把消息路由到队列中
3 消息持久化写入日志
第一个流程可能由于网络的波动、延迟等原因,消息没有准确达到交换机
第二个流程可能routing key错误等原因,消息没有准确到达队列
第三个流程可能在消息写入日志的过程中,broker crash掉了,重新后消息丢失。
也就是说,在没有其他措施的情况下,客户端发送消息这个操作,消息如果想要在队列中处于稳定地状态,要经过三个独立的环节,并不是一个原子操作,三个环节中的任意一个环节出错,都有可能导致消息丢失,而我们的生产者对此一无所知,会影响我们业务的正常进行。
看到原子化,我们首先想到用事务来解决这个问题,但是在rabiitmq中,事务会很大幅度地降低性能,我们来看一个对比数据:
在同样开启持久化的情况下,没有开启事务,写入10万条数据仅需11秒
而在同样的条件下,我们模拟10万次事务的独立开启提交,写入10万条数据需要208秒!
image.png 微信截图_20201128222108.png这相差了接近20倍的性能,所以一般情况下我们不会选择事务来解决消息发送可能失败的问题,我们选择"发送者确认模式"来解决这个问题。
发送者确认模式
我们可以通过$channel->confirmSelect() 来开启信道的发送者确认模式,开启之后,信道会为每一条消息分配一个唯一的消息id,即delivery_tag,从0开始,依次是1,2,3这样,每个信道之间的delivery_tag是相互独立的。
我们先来了解两个东西:ack 、nack
ack :
在发送者确认模式中,有两种情况服务端会给客户端发送ack确认信号
1 消息到达交换机,交换机发现该消息可以路由到队列,便将消息路由到队列,并向客户端发送一个ack信号(注意:消息如果是持久化的,需要等持久化完成后,才会向客户端发送ack信号。如果是开启了镜像队列,需要所有镜像中的队列收到消息后,才会发送ack信号),即消息成功入列且稳定后会向客户端发送ack信号
2 消息到达交换机,交换机发现根据消息的routing key,无法将该消息路由到任何队列,就会向客户端发送一个ack信号。如果发送消息时,我们指定了mandatory参数为true,指定该消息为强制到达,那么在向客户端发送ack信号之前,还有将该消息返回给客户端,顺序是 "消息" ->ack
nack :
消息到达交换机,交换机路由消息的过程中,由于系统内部错误而无法继续下去的时候,就会向客户端发送一个nack信号,告诉客户端,这个消息处理不了。
经过以上的阐述,我们要怎么知道这条消息到底有没有成功到达队列并处于稳定呢?
我们发现
ack无法作为消息是否成功入列的判断标准!!!
ack无法作为消息是否成功入列的判断标准!!!
ack无法作为消息是否成功入列的判断标准!!!
重要的事情说三遍
消息成功入列会给客户端发ack信号,消息无法路由到队列也会给客户端发ack信号,但是这两者是有区别的。成功入列的话是单纯发送"ack",而如果消息无法路由到队列则是发送 "消息" + "ack",且"消息"比ack信号先到
那我们的程序中拿什么来作为消息是否成功入列的判断标准呢?相信各位看客心中已经有了答案。
没错,如果返回了"消息",那么自然是失败的。如果没有返回消息体,单纯返回ack ,那就是成功的。废话不多说,上代码
<?php
try{
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"login" => "wuj13",
"password" => "Aaa294515505",
"vhost" => "test_vhost"
];
$conn = new AMQPConnection($config);
//连接rabbitmq
if(!$conn->connect()){
throw new Exception("连接失败");
}
//创建信道
$channel = new AMQPChannel($conn);
//开启消息确认模式
$channel->confirmSelect();
$isSuccess = true;
//设置回调方法
$channel->setConfirmCallback(function($delivery_tag, $multiple) use ($channel,$conn,&$isSuccess){
echo "收到ack回执:\n";
echo "delivery_tag: $delivery_tag \n";
echo "multiple:$multiple\n";
if($isSuccess){
echo "消息成功入列\n";
/**
* TODO
* 1 数据库事务提交
* 2 业务处理完成,返回成功
*/
}
else{
echo "消息入列失败\n";
/**
* TODO
* 1 数据库事务回滚
* 2 失败消息记录日志
* 3 返回失败
*/
}
$channel->close();
$conn->disconnect();
exit;
},function($delivery_tag, $multiple,$requeue){
echo "收到nack回执:delivery_tag: ".$delivery_tag. " multiple: $multiple"." requeue:".$requeue.PHP_EOL;
/**
* TODO
* 重新发送消息
*/
});
//设置消息返还的回调
$channel->setReturnCallback(function($reply_code, $reply_text, $exchange, $routing_key, $properties, $body) use(&$isSuccess){
$isSuccess = false;
//进行日志记录
echo "收到return消息\n";
echo "body: $body \n";
echo "replay_code: $reply_code \n";
echo "replay_text: $reply_text \n";
});
//创建交换机
$exchange = new AMQPExchange($channel);
$exchangeName = "exchange_one";
//设置交换机名称
$exchange->setName($exchangeName);
//设置交换机类型
$exchange->setType(AMQP_EX_TYPE_DIRECT);
//设置为持久化交换机
$exchange->setFlags(AMQP_DURABLE);
//生成交换机
$exchange->declareExchange();
// //创建队列
$queueName ="queue_test";
$queue = new AMQPQueue($channel);
//设置队列名称
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
//生成队列
$queue->declareQueue();
//队列绑定交换机
$queue->bind($exchangeName,$queueName);
$message = "hello world";
for($i = 1; $i<=1;$i++){
$exchange->publish("message".$i,"123",AMQP_MANDATORY,["delivery_mode" => 2]);
}
$channel->waitForConfirm();
}
catch(Exception $exception){
echo "操作异常 ".$exception->getMessage();
$channel->close();
$conn->disconnect();
}
代码解析:
$channel->confirmSelect()
我们使用confirmSelect来开启信道的发送者确认模式(注意,信道一旦开启发送者确认模式,就无法使用事务,这两者不兼容)
$channel->setConfirmCallback()
是设置回调,参数一是ack回调,服务端返回ack信号时触发。参数二是nack回调,服务端返回nack信号时触发
$channel->setReturnCallback()
是设置消息返还的回调,服务端返回"消息"时触发
$channel->waitForConfirm()
是进入阻塞模式等待服务端返回通知
至于在各个回调中应该怎么去处理业务,请注意看我代码中的注释!
先说说nack回调的处理
nack回调是属于rabbitmq系统内部错误时返回的信号,这种情况属于很少见,这个时候我们应该把消息进行重新发送
而ack回调跟return回调我们要结合起来,用来判断消息有没有成功入列
如果触发了return回调,代表消息必然路由失败,这时我们就把$isSuccess
变量设置为false,便于后续到达的ack信号触发ack回调的时候进行成功与否的判断
ack回调中,如果发现$isSuccess
为false,证明在ack信号达到前,已经触发过return回调了,证明消息是入列失败的,否则证明单纯只有ack信号达到,则是成功入列的。
如果消息入列失败,我们需要回滚我们的数据库事务,并把失败的消息记录日志,给前端返回错误。此时不会重新发送该消息,因为即使我们重新发送消息,也是无法路由的。
如果入列成功,那就提交数据库事务,然后给前端正常返回业务数据即可。
然后就是断开信道、rabbitmq的连接,中止阻塞进程。