.Net Core&RabbitMQ基本使用

2022-09-01  本文已影响0人  INSO8

队列模式

https://www.rabbitmq.com/getstarted.html

a091c5bdb621dcfce225e4b109abcdba.png c18cb1252755cc9bc1ba398640a1df47.png

对以上几种模式进行简要分类,可以分成如下三类(RPC暂不考虑)

简单队列模式

队列作为中间件,在生产者和消费者中间承担消息传递的通道

7ce5643d4108dfcc32219f445ffd2f2d.png

创建两个控制台项目RabbitMQDemo.Basic.Producer和RabbitMQDemo.Basic.Consumer并安装Nuget包以支持对RabbitMQ操作

install-package rabbitmq.client

生产者代码

var connFactory = new ConnectionFactory { HostName = "xxx.xxx.xxx.xxx", Port = 5672, UserName = "rabbitmqdemo", Password = "rabbitmqdemo@test", VirtualHost = "rabbitmqdemo" };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var queueName = "helloworld"; 
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    while (true)
                    {
                        Console.WriteLine("消息内容(exit退出):"); var message = Console.ReadLine();
                        if (message.Trim().ToLower() == "exit")
                        {
                            break;
                        }
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                        Console.WriteLine("消息内容发送完毕:" + message);
                    }
                }
            }

消费者代码

 var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var queueName = "helloworld";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var message = ea.Body;
                        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
                        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }

执行过程

运行代码,可以在管理页面中看到队列声明好了

ed399242827706f79cf3bf584fb8db49.png

生产者发送消息,经RabbitMQ,消费者端接收消息

cc673d1c49ad4f6c3ca5d3d95c83f6ea.png

Worker模式

简单队列模式下,当多开消费者时,便演化到了Worker模式,这种情况下不再考虑基础的怎么用,而是要如何协调多个消费者的工作。

5f41e9f5c8eb54684312392d1d3625cf.png

与简单队列模式类似再建立三个控制台项目RabbitMQDemo.Worker.Producer和RabbitMQDemo.Worker.ConsumerA和RabbitMQDemo.Worker.ConsumerB并安装Nuget,抄袭第一部分代码,更改个队列名字,然后直接跑起来,其实是一样的消费模式。

基本使用

当发送多条消息,两个消费者都能够展示消息,并且,其中的消息总是只会被一个消费者所拥有,默认分配方式是轮询。

1fd0b2097b953dff0bba2446a44744e8.png

消费能力

现在,思考下如何能够各消费者的消费能力,来消费消息,这更侧重于消费端了。

将ConsumerA、B在消费时各增加Sleep 1秒和10秒,以区分消费能力的不同。当再次发送消息时,因消费端出现着处理消息耗时的不同,展示数据的时间也不同,但是消息的分配却没有变化。

cbb1ce4956bf9a0683fb6fed2b53c3f8.png

需要进一步均衡的分配任务,按照消费能力高的分配多,消费能力低的分配少。

8e8bb76461c4a432595b02fa504b6dd0.png

当消费能力不同时,可以将消费的任务均衡分配,这样来使得整体消费端的能力充分发挥。

负载能力

RabbitMQ提供了一个属性可以设置各消费端的能力,以此可以根据能力分配消息。

在消费端代码中更改下Qos(quality-of-service),即消费端最多接收未被ack的消息个数,举个例子:

ConsumerA: channel.BasicQos(0, prefetchCount:10, false);
ConsumerB: channel.BasicQos(0, prefetchCount:1, false);

当再次发送消息时,会因为因为A、B两端的消费能力不同而出现消息聚集侧重于一端
生产者发送一堆消息,两个消费者自身的消费能力不同,设置的能够消费的容量不同,这样分配得到的消息数量也不同。

be47fe15593291239d4e84b05ac4b28b.png

生产者代码

此处并未做任何大的改变,只是将队列名更改为当前模式的名字以示区分。

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var queueName = "worker";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

                    while (true)
                    {
                        Console.WriteLine("消息内容(exit退出):");
                        var message = Console.ReadLine();
                        if (message.Trim().ToLower() == "exit")
                        {
                            break;
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                        Console.WriteLine("消息内容发送完毕:" + message);
                    }
                }
            }

消费者代码

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var queueName = "worker";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(10000);
                        var message = ea.Body;
                        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
                        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }

Exchange模式

发布订阅,路由和通配符这三种可以算为一种模式,区别仅仅是交互机类型不同。

91294ab33e9c9e65619ce59cd126379b.png

生产者将消息及RoutingKey发送到指定交换机,消费者创建各自的消息队列并绑定到交换机,交换机根据路由规则匹配生产者发送的RoutingKey转发消息到相应队列中,其本身不存储消息。

Exchange类型

简要介绍这几种交换机类型,本身只是对路由规则的匹配方式不同。

注意:BindingKey为交换机和队列绑定时指定的RoutingKey,发送消息时也会给定一个RoutingKey,两者会按照交换机类型的不同而匹配

发布订阅模式(fanout)

fanout模式下会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中。

92cc84886f66830d61699a39ab197938.png

当生产者发送消息到指定交换机,该交换机会将消息路由到绑定的Queue1和Queue2,两个队列分别转发给其下绑定的消费者,从单个队列视角来看,便是Worker模式了。

生产者代码

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "publishsubscribe_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    while (true)
                    {
                        Console.WriteLine("消息内容(exit退出):");
                        var message = Console.ReadLine();
                        if (message.Trim().ToLower() == "exit")
                        {
                            break;
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
                        Console.WriteLine("消息内容发送完毕:" + message);
                    }
                }
            }

如上生产者端在worker模式的基础上,改动了几处

消费者代码

此处设置两个queue,分别为publishsubscribe_exchange_worker_1和publishsubscribe_exchange_worker_2

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "publishsubscribe_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    var queueName = exchangeName + "_worker_1";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");

                    channel.BasicQos(0, 10, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000);
                        var message = ea.Body;
                        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
                        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }

与Worker的消费者端相比,代码上也做了些调整,其余是保持一致的。

执行过程

当启动生产者端和消费者端时,交换机和两个队列都声明完毕

2072c3227a823cd3c188d80b4d4ddaf5.png

同时,点入交换机中,可以看到该交换机下绑定了两个队列

323dddf9f9e0c0525df826b160bea5ea.png

这样一来,当有消息到达交换机,交换机可以根据消息名来路由到相应的队列。因此处设置的routekey是空的,两个队列绑定时用的routekey也是空的,因此两个队列都符合路由规则,则消息会同时存在于两个队列中。

5a7ce1071c5cffab95f7a483cab3f2a4.png

路由模式(direct)

direct模式下会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

be99696c61af4a1395499542d1842742.png

当生产者发送了一个消息且发送的RoutingKey为Warning时,交换机会根据该RoutingKey匹配并转发消息到Queue1和Queue2,两个队列都满足了路由规则,当RoutingKey为Info是,仅Queue2满足,则将消息转发给Queue2。

生产者代码

生产者端在worker模式的基础上,只需改动几处

 var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "routing_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    while (true)
                    {
                        Console.WriteLine("消息RoutingKey(warning or info):");
                        var routingKey = Console.ReadLine();

                        Console.WriteLine("消息内容(exit退出):");
                        var message = Console.ReadLine();
                        if (message.Trim().ToLower() == "exit")
                        {
                            break;
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
                        Console.WriteLine("消息内容发送完毕:" + message);
                    }
                }
            }

消费者代码

接收者端在发布订阅模式的基础上增加了交换机和队列时绑定的key,用于交换机路由规则时选择队列。

dba94405bcb401e11100754605d09ea2.png

如下为Queue2下的消费者,为Queue2设置了info和warning两个RoutingKey用于交换机和队列绑定。

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "routing_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    var queueName = exchangeName + "_worker_1";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

                    var routingKey1 = "warning";
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
                    var routingKey2 = "info";
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);

                    channel.BasicQos(0, 10, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000);
                        var message = ea.Body;
                        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
                        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }

执行过程

运行代码,交换机、队列及两者的绑定先完成,可以在管理页面中看到声明的信息。

f6a4ad89fe8f2b70a2cda7c7e52d7e47.png

当生产者发送消息且RoutingKey为warning,两个队列都满足路由条件接收到消息,两个消费者都展示了消息。

当发送消息且RoutingKey为info,queue2队列满足路由条件接收了消息,一个消费者展示了消息。

e2323339318908e083d18b00b8146759.png

通配符模式(topic)

topic模式会把消息路由到那些BindingKey和RoutingKey相匹配的队列中。

db004d1d1459794f8a8c57701d8cd74b.png

topic类型与direct类型相似,但匹配规则上有所不同,direct需要完全匹配,topic可以设置通配符以达到局部匹配即可满足。

和direct不同的是,topic设定的RoutingKey(不论是BindingKey还是RoutingKey)都需要为带"."的字符串。比如a.b、c.d.e、fff.gggg.hhhh等,最多为 255 个字节。

在交换机和队列绑定时,给定的RoutingKey可以依照如下来设置。

生产者代码

在路由模式的基础上更改交换机类型为topic

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "topics_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
                    while (true)
                    {
                        Console.WriteLine("消息RoutingKey:");
                        var routingKey = Console.ReadLine();

                        Console.WriteLine("消息内容(exit退出):");
                        var message = Console.ReadLine();
                        if (message.Trim().ToLower() == "exit")
                        {
                            break;
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
                        Console.WriteLine("消息内容发送完毕:" + message);
                    }
                }
            }

消费者代码

接收者端在路由模式的基础上更改了交换机和队列绑定的key,可以方便满足多种情况下的需要。

9d0bddeb71ebf439df84a5de84cb1822.png

如下为Queue2下的消费者,为Queue2设置了index.*和#.created.#两个RoutingKey用于交换机和队列绑定。

var connFactory = new ConnectionFactory
            {
                HostName = "xxx.xxx.xxx.xxx",
                Port = 5672,
                UserName = "rabbitmqdemo",
                Password = "rabbitmqdemo@test",
                VirtualHost = "rabbitmqdemo"
            };
            using (var conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    var exchangeName = "topics_exchange";
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
                    var queueName = exchangeName + "_worker_2";
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

                    var routingKey1 = "index.*";
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
                    var routingKey2 = "#.created.#";
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);

                    channel.BasicQos(0, 1, false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(10000);
                        var message = ea.Body;
                        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
                        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }

执行过程

运行代码,交换机、队列及绑定关系,相应RoutingKey都在管理页面中展示

76cb58e987f01c4c50006443321f172a.png

当生产者发送消息且RoutingKey为#.created,两个队列都满足路由条件接收到消息,两个消费者都展示了消息。

当生产者发送消息且RoutingKey为#.created.#,queue2队列满足了路由条件接收了消息,一个消费者展示了消息。

53cdfdbd115cf213c9cf9ca3813bd01b.png

总结

对于在生产者和消费者间解耦,完成异步协作,消息队列可太方便了,暂不深入考虑三者间如何可靠传输,仅看消息队列提供的多种交换机模式,很大程度上满足实际使用中需要用到的很多功能。

https://www.cnblogs.com/shenghuotaiai/p/16170319.html

上一篇下一篇

猜你喜欢

热点阅读