rabbitmq -- 消息的可靠性

2020-11-28  本文已影响0人  爱码士吴小佳

rabbitmq作为我们系统之间沟通的桥梁,消息的可靠性就显得格外重要。
假如rabbitmq crash掉之后重新启动,原本的交换机、队列、消息都会消失,如果我们队列中存在一些很重要的消息的时候,我们并不愿意这样的事情发生,这就需要借助rabbitmq的持久化机制。

rabbitmq的持久化

交换机、队列设置可以为持久化,消息的投递模式也可以选择持久化(三个持久化)。
持久化的消息在进入持久化队列之后会写入到rabbitmq的持久性日志文件中,消息被消费掉后,会把持久性日志文件中该消息标记为等待垃圾收集
在broker重启之后,持久化的交换机、队列会重新初始化,持久化的消息会投递到原先的队列上

怎么做到消息持久化

三个持久化:
交换机持久化、队列持久化、消息持久化

代码示例:

<?php

try{
    $config = [
        "host" => "127.0.0.1",
        "port" => "5672",
        "login" => "wuj13",
        "password" => "Aaa294515505",
        "vhost" => "test_vhost"
    ];
    $conn = new AMQPConnection($config);
    //连接rabbitmq
    if(!$conn->connect()){
       throw new Exception("连接失败");
    }
    //创建信道
   $channel = new AMQPChannel($conn);
    //创建交换机
    $exchange = new AMQPExchange($channel);
    $exchangeName = "exchange_one";
    //设置交换机名称
    $exchange->setName($exchangeName);
    //设置交换机类型
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    //设置为持久化交换机
    $exchange->setFlags(AMQP_DURABLE);
    //生成交换机
    $exchange->declareExchange();


    //创建队列
    $queueName ="queue_test";
    $queue = new AMQPQueue($channel);
    //设置队列名称
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    //生成队列
    $queue->declareQueue();
    //队列绑定交换机
    $queue->bind($exchangeName,$queueName);


    $message = "hello world";
    //向交换机发布消息
    $re = $exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2]);
    if($re){
        echo "成功发送消息".$message.PHP_EOL;
    }
    else{
        throw new Exception("发送消息失败");
    }

}
catch(Exception $exception){
    echo "操作异常 ".$exception->getMessage();
}
finally{
    $channel->close();
    $conn->disconnect();
}



其中
$exchange->setFlags(AMQP_DURABLE)设置交换机为durable代表持久类型
$queue->setFlags(AMQP_DURABLE)设置队列为持久类型
$exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2])向交换机发送消息时设置消息的delivery_mode为2代表消息是持久类型的

代码运行后,我们到管理界面去看看,发现有一条持久化的消息


image.png

我们把rabbitmq服务给重启一下,交换机、队列、消息,全部都还在,代表我们的消息持久化已经基本做好了,大家可以自己去试一试。
但我们不妨想一想,如果我们的broker在把消息写入日志文件的时候crash掉了,那我们的消息不就丢了吗?对的,其实这是有很小的概率会出现这样的情况,这个时候可以借助mirror queue来解决这个问题,这个我们后面会专门讲这个东西,不然篇幅就太长了。

那我们先认为很大程度上已经解决了rabbitmq这一端因为宕机而导致消息丢失的问题,但是我们还面临着两个问题:
1 我们的程序怎么保证我们的消息能够成功到达队列
2 怎么保证我们的消息消费成功的
我们一个个来解决!

我们先看一幅图


image.png

消息要到达队列中并处以稳定状态(已持久化),要经过三个流程
1 消息到达交换机
2 交换机把消息路由到队列中
3 消息持久化写入日志
第一个流程可能由于网络的波动、延迟等原因,消息没有准确达到交换机
第二个流程可能routing key错误等原因,消息没有准确到达队列
第三个流程可能在消息写入日志的过程中,broker crash掉了,重新后消息丢失。
也就是说,在没有其他措施的情况下,客户端发送消息这个操作,消息如果想要在队列中处于稳定地状态,要经过三个独立的环节,并不是一个原子操作,三个环节中的任意一个环节出错,都有可能导致消息丢失,而我们的生产者对此一无所知,会影响我们业务的正常进行。

看到原子化,我们首先想到用事务来解决这个问题,但是在rabiitmq中,事务会很大幅度地降低性能,我们来看一个对比数据:
在同样开启持久化的情况下,没有开启事务,写入10万条数据仅需11秒

image.png 微信截图_20201128222508.png

而在同样的条件下,我们模拟10万次事务的独立开启提交,写入10万条数据需要208秒!

image.png 微信截图_20201128222108.png

这相差了接近20倍的性能,所以一般情况下我们不会选择事务来解决消息发送可能失败的问题,我们选择"发送者确认模式"来解决这个问题。

发送者确认模式

我们可以通过$channel->confirmSelect() 来开启信道的发送者确认模式,开启之后,信道会为每一条消息分配一个唯一的消息id,即delivery_tag,从0开始,依次是1,2,3这样,每个信道之间的delivery_tag是相互独立的。
我们先来了解两个东西:ack 、nack
ack :
在发送者确认模式中,有两种情况服务端会给客户端发送ack确认信号
1 消息到达交换机,交换机发现该消息可以路由到队列,便将消息路由到队列,并向客户端发送一个ack信号(注意:消息如果是持久化的,需要等持久化完成后,才会向客户端发送ack信号。如果是开启了镜像队列,需要所有镜像中的队列收到消息后,才会发送ack信号),即消息成功入列且稳定后会向客户端发送ack信号
2 消息到达交换机,交换机发现根据消息的routing key,无法将该消息路由到任何队列,就会向客户端发送一个ack信号。如果发送消息时,我们指定了mandatory参数为true,指定该消息为强制到达,那么在向客户端发送ack信号之前,还有将该消息返回给客户端,顺序是 "消息" ->ack

nack :
消息到达交换机,交换机路由消息的过程中,由于系统内部错误而无法继续下去的时候,就会向客户端发送一个nack信号,告诉客户端,这个消息处理不了。

经过以上的阐述,我们要怎么知道这条消息到底有没有成功到达队列并处于稳定呢?
我们发现
ack无法作为消息是否成功入列的判断标准!!!
ack无法作为消息是否成功入列的判断标准!!!
ack无法作为消息是否成功入列的判断标准!!!
重要的事情说三遍
消息成功入列会给客户端发ack信号,消息无法路由到队列也会给客户端发ack信号,但是这两者是有区别的。成功入列的话是单纯发送"ack",而如果消息无法路由到队列则是发送 "消息" + "ack",且"消息"比ack信号先到

那我们的程序中拿什么来作为消息是否成功入列的判断标准呢?相信各位看客心中已经有了答案。
没错,如果返回了"消息",那么自然是失败的。如果没有返回消息体,单纯返回ack ,那就是成功的。废话不多说,上代码

<?php

try{
    $config = [
        "host" => "127.0.0.1",
        "port" => "5672",
        "login" => "wuj13",
        "password" => "Aaa294515505",
        "vhost" => "test_vhost"
    ];
    $conn = new AMQPConnection($config);
    //连接rabbitmq
    if(!$conn->connect()){
       throw new Exception("连接失败");
    }
    //创建信道
    $channel = new AMQPChannel($conn);

    //开启消息确认模式
    $channel->confirmSelect();
    $isSuccess = true;
    //设置回调方法
        $channel->setConfirmCallback(function($delivery_tag, $multiple) use ($channel,$conn,&$isSuccess){
        echo "收到ack回执:\n";
        echo "delivery_tag: $delivery_tag \n";
        echo "multiple:$multiple\n";
        if($isSuccess){
            echo "消息成功入列\n";
            /**
             * TODO
             * 1 数据库事务提交
             * 2 业务处理完成,返回成功
             */
        }
        else{
            echo "消息入列失败\n";
            /**
             * TODO
             * 1 数据库事务回滚
             * 2 失败消息记录日志
             * 3 返回失败
             */
        }
        $channel->close();
        $conn->disconnect();
        exit;

    },function($delivery_tag, $multiple,$requeue){
        echo "收到nack回执:delivery_tag: ".$delivery_tag. " multiple: $multiple"." requeue:".$requeue.PHP_EOL;
        /**
         * TODO
         * 重新发送消息
         */

    });

    //设置消息返还的回调
    $channel->setReturnCallback(function($reply_code, $reply_text, $exchange, $routing_key, $properties, $body) use(&$isSuccess){
        $isSuccess = false;
        //进行日志记录
        echo "收到return消息\n";
        echo "body: $body \n";
        echo "replay_code: $reply_code \n";
        echo "replay_text: $reply_text \n";
    });

    //创建交换机
    $exchange = new AMQPExchange($channel);
    $exchangeName = "exchange_one";
    //设置交换机名称
    $exchange->setName($exchangeName);
    //设置交换机类型
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    //设置为持久化交换机
    $exchange->setFlags(AMQP_DURABLE);
    //生成交换机
    $exchange->declareExchange();


//    //创建队列
    $queueName ="queue_test";
    $queue = new AMQPQueue($channel);
    //设置队列名称
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    //生成队列
    $queue->declareQueue();
    //队列绑定交换机
    $queue->bind($exchangeName,$queueName);

    $message = "hello world";
    for($i = 1; $i<=1;$i++){
       $exchange->publish("message".$i,"123",AMQP_MANDATORY,["delivery_mode" => 2]);
    }
    $channel->waitForConfirm();

}
catch(Exception $exception){
    echo "操作异常 ".$exception->getMessage();
    $channel->close();
    $conn->disconnect();
}

代码解析:
$channel->confirmSelect() 我们使用confirmSelect来开启信道的发送者确认模式(注意,信道一旦开启发送者确认模式,就无法使用事务,这两者不兼容)

$channel->setConfirmCallback() 是设置回调,参数一是ack回调,服务端返回ack信号时触发。参数二是nack回调,服务端返回nack信号时触发

$channel->setReturnCallback() 是设置消息返还的回调,服务端返回"消息"时触发

$channel->waitForConfirm() 是进入阻塞模式等待服务端返回通知

至于在各个回调中应该怎么去处理业务,请注意看我代码中的注释!

先说说nack回调的处理
nack回调是属于rabbitmq系统内部错误时返回的信号,这种情况属于很少见,这个时候我们应该把消息进行重新发送

而ack回调跟return回调我们要结合起来,用来判断消息有没有成功入列
如果触发了return回调,代表消息必然路由失败,这时我们就把$isSuccess变量设置为false,便于后续到达的ack信号触发ack回调的时候进行成功与否的判断
ack回调中,如果发现$isSuccess为false,证明在ack信号达到前,已经触发过return回调了,证明消息是入列失败的,否则证明单纯只有ack信号达到,则是成功入列的。
如果消息入列失败,我们需要回滚我们的数据库事务,并把失败的消息记录日志,给前端返回错误。此时不会重新发送该消息,因为即使我们重新发送消息,也是无法路由的。
如果入列成功,那就提交数据库事务,然后给前端正常返回业务数据即可。
然后就是断开信道、rabbitmq的连接,中止阻塞进程。

上一篇下一篇

猜你喜欢

热点阅读