程序园

快速掌握RabbitMQ(三)——消息确认、持久化、优先级的C#

2019-06-15  本文已影响8人  编程小世界

1 消息确认

在一些场合,如转账、付费时每一条消息都必须保证成功的被处理。AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。消息确认可以分为两种:一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于告诉消费者消息已成功被消费者接收。

看我主页简介免费C++学习资源,视频教程、职业规划、面试详解、学习路线、开发工具

每晚8点直播讲解C++编程技术。非常感谢大家的关注

下边分别介绍生产者端和消费者端的消息确认方法。准备条件:使用Web管理工具添加exchange、queue并绑定,bindingKey为“mykey”,如下所示:

1 生产者端消息确认(tx机制和Confirm模式)

生产者端的消息确认:当生产者将消息发送给Broker,Broker接收到消息给生产者发送确认回执。生产者端的消息确认有两种方式:tx机制和Confirm模式。

1.tx机制

tx机制可以叫做事务机制,RabbitMQ中有三个与tx机制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用于将当前channel设置成transaction模式, channel.txCommit()提交事务, channel.txRollback() 回滚事务。使用tx机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器了,如果txCommit提交成功了,则说明消息成功被broker接收了;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。看一个tx机制的简单实现:

varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};//创建连接connectionusing(varconnection = factory.CreateConnection())            {//创建通道channelusing(varchannel = connection.CreateModel())                {                    Console.WriteLine("生产者准备就绪....");stringmessage ="";//发送消息//在控制台输入消息,按enter键发送消息while(!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))                    {                        message = Console.ReadLine();varbody = Encoding.UTF8.GetBytes(message);try{//开启事务机制channel.TxSelect();//发送消息channel.BasicPublish(exchange:"myexchange",                                                routingKey:"mykey",                                                basicProperties:null,                                                body: body);//事务提交channel.TxCommit();                            Console.WriteLine($"【{message}】发送到Broke成功!");                        }catch(Exception)                        {                            Console.WriteLine($"【{message}】发送到Broker失败!");                            channel.TxRollback();                        }                                          }                }            }            Console.ReadKey();        }

程序运行结果如下:

2 Confirm模式

C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示开启Confirm模式; channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用类型,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常。看一个Confirm模式的简单实现:

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};//创建连接connectionusing(varconnection = factory.CreateConnection())            {//创建通道channelusing(varchannel = connection.CreateModel())                {                    Console.WriteLine("生产者准备就绪....");stringmessage ="";//在控制台输入消息,按enter键发送消息while(!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))                    {                        message = Console.ReadLine();varbody = Encoding.UTF8.GetBytes(message);//开启Confirm模式channel.ConfirmSelect();//发送消息channel.BasicPublish(exchange:"myexchange",                                            routingKey:"mykey",                                            basicProperties:null,                                            body: body);//WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功,如果返回false表示发送失败,会自动重新发送if(channel.WaitForConfirms())                        {                            Console.WriteLine($"【{message}】发送到Broke成功!");                        }                    }                }            }            Console.ReadKey();        }

程序运行结果:

2 消费者端消息确认(自动确认和显示确认)

从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。

1 自动确认

自动确认:当RabbbitMQ将消息发送给消费者后, 消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执 。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可,我们前边的例子都是使用的自动确认,这里不再详细演示,如下:

channel.BasicConsume(queue:"myqueue",autoAck: true,consumer: consumer);

注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。

2 显示确认

我们知道自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数 autoAck设置为false, 然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法 来确认和拒绝消息。看一个栗子:

生产者代码如下:

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};//创建连接connectionusing(varconnection = factory.CreateConnection())            {//创建通道channelusing(varchannel = connection.CreateModel())                {                    Console.WriteLine("生产者准备就绪....");stringmessage ="";//发送消息//在控制台输入消息,按enter键发送消息while(!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))                    {                        message = Console.ReadLine();varbody = Encoding.UTF8.GetBytes(message);//基本发布channel.BasicPublish(exchange:"myexchange",                                            routingKey:"mykey",                                            basicProperties:null,                                            body: body);                        Console.WriteLine($"消息【{message}】已发送到队列");                    }                }            }            Console.ReadKey();        }

消费者代码如下:

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};using(varconnection = factory.CreateConnection())            {using(varchannel = connection.CreateModel())                {//定义消费者                                      varconsumer =newEventingBasicConsumer(channel);                    consumer.Received += (model, ea) =>                    {stringmessage = Encoding.UTF8.GetString(ea.Body);                        Console.WriteLine($"接受到消息【{message}】");//以news开头表示是新闻类型,处理完成后确认消息if(message.StartsWith("news"))                        {//这里处理消息balabalaConsole.WriteLine($"【{message}】是新闻消息,处理消息并确认");                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple:false);                        }//不以news开头表示不是新闻类型,不进行处理,把消息退回到queue中else{                            Console.WriteLine($"【{message}】不是新闻类型,拒绝处理");                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue:false);                        }                    };                    Console.WriteLine("消费者准备就绪....");//第五步:处理消息channel.BasicConsume(queue:"myqueue",                                          autoAck:false,                                          consumer: consumer);                    Console.ReadKey();                }            }        }

介绍一下代码中标红的两个方法:  channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false );  方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false ) ; 方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。

运行这两个应用程序,通过生产者发送两条消息,效果如下:

一些意外的情况:使用显式确认时,如果消费者处理完消息不发送确认回执,那么消息不会被删除,消息的状态一直是Unacked,这条消息也不会再发送给其他消费者。如果一个消费者在处理消息时尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存时,消息会发送给其他消费者。

2 消息持久化/优先级

1 消息持久化(Persistent)

在前边已经介绍了exchange和queue的持久化,把exchange和queue的durable属性设置为true,重启rabbitmq服务时( 重启命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也会恢复。我们需要注意的是:如果queue设置durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法很简单:将exchange和queue都设置durable=true,然后在消息发布的时候设置persistent=true即可。看一个栗子:

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};//创建连接connectionusing(varconnection = factory.CreateConnection())            {//创建通道channelusing(varchannel = connection.CreateModel())                {                    Console.WriteLine("生产者准备就绪....");stringmessage ="";//在控制台输入消息,按enter键发送消息while(!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))                    {                        message = Console.ReadLine();varbody = Encoding.UTF8.GetBytes(message);//设置消息持久化varprops = channel.CreateBasicProperties();                        props.Persistent =true;                        channel.BasicPublish(exchange:"myexchange",                                            routingKey:"mykey",                                            basicProperties: props,                                            body: body);//WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功,如果返回false表示发送失败,会自动重新发送Console.WriteLine($"【{message}】发送到Broke成功!");                    }                }            }            Console.ReadKey();        }

声明exchange和queue时设置durable=true,然后执行上边的代码,传入一条消息。重启rabbitmq后,exchange,queue和消息都会恢复。我们也可以在web管理界面设置消息持久化,如下:

2 消息优先级(Priority)

我们知道queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可。为了演示方便,约定所有vip客户的信息都以vip开头,看一下代码实现:

生产者代码 :

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};//创建连接connectionusing(varconnection = factory.CreateConnection())            {//创建通道channelusing(varchannel = connection.CreateModel())                {//声明交换机exchangchannel.ExchangeDeclare(exchange:"myexchange",                                            type: ExchangeType.Direct,                                            durable:true,                                            autoDelete:false,                                            arguments:null);//声明队列queuechannel.QueueDeclare(queue:"myqueue",                                      durable:true,                                      exclusive:false,                                      autoDelete:false,                                      arguments:newDictionary() {//队列优先级最高为10,不加x-max-priority的话,计算发布时设置了消息的优先级也不会生效{"x-max-priority",10}                                      });//绑定exchange和queuechannel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey");                    Console.WriteLine("生产者准备就绪....");//一些待发送的消息string[] msgs = {"vip1","hello2","world3","common4","vip5"};//设置消息优先级varprops = channel.CreateBasicProperties();foreach(stringmsginmsgs)                    {//vip开头的消息,优先级设置为9if(msg.StartsWith("vip"))                        {                            props.Priority =9;                            channel.BasicPublish(exchange:"myexchange",                                                routingKey:"mykey",                                                basicProperties: props,                                                body: Encoding.UTF8.GetBytes(msg));                        }//其他消息的优先级为1else{                            props.Priority =1;                            channel.BasicPublish(exchange:"myexchange",                                                routingKey:"mykey",                                                basicProperties: props,                                                body: Encoding.UTF8.GetBytes(msg));                        }                                          }                }            }            Console.ReadKey();        }

消费者 ,不需要对消费者做额外的配置,代码如下:

staticvoidMain(string[] args){varfactory =newConnectionFactory()            {//rabbitmq-server所在设备ip,这里就是本机HostName ="127.0.0.1",                UserName ="wyy",//用户名Password ="123321"//密码};using(varconnection = factory.CreateConnection())            {using(varchannel = connection.CreateModel())                {#regionEventingBasicConsumer//定义消费者                                      varconsumer =newEventingBasicConsumer(channel);                    consumer.Received += (model, ea) =>                    {                        Console.WriteLine(Encoding.UTF8.GetString(ea.Body));                    };                    Console.WriteLine("消费者准备就绪....");//处理消息channel.BasicConsume(queue:"myqueue",                                          autoAck:true,                                          consumer: consumer);                    Console.ReadKey();#endregion}            }        }

运行程序,结果如下,我们看到vip开头的消息被率先处理了,证明优先级是生效的

3 小结

本节简单介绍了Rabbitmq中的消息确认,消息持久化,消息优先级的实现方式,这几个功能在开发中会经常用到,RabbitMQ还有一些其他有用的功能,如Lazy queue模式,dead letter处理,queue的消息条数、字节数限制等,这里没有具体演示,有兴趣的园友可以自己研究一下。

看我主页简介免费C++学习资源,视频教程、职业规划、面试详解、学习路线、开发工具

每晚8点直播讲解C++编程技术。非常感谢大家的关注

上一篇下一篇

猜你喜欢

热点阅读