.Net Core&RabbitMQ基本使用
队列模式
https://www.rabbitmq.com/getstarted.html
a091c5bdb621dcfce225e4b109abcdba.png c18cb1252755cc9bc1ba398640a1df47.png对以上几种模式进行简要分类,可以分成如下三类(RPC暂不考虑)
-
简单队列模式,单发单收,一对一模式
-
Worker模式,单发多收(一个消息一个接收者,多个消息多个接收者),一对多模式
-
发布订阅模式,包括发布订阅、路由、通配符模式,这三种只是交换机类型不同
简单队列模式
队列作为中间件,在生产者和消费者中间承担消息传递的通道
7ce5643d4108dfcc32219f445ffd2f2d.png创建两个控制台项目RabbitMQDemo.Basic.Producer和RabbitMQDemo.Basic.Consumer并安装Nuget包以支持对RabbitMQ操作
install-package rabbitmq.client
生产者代码
-
通过IConnectionFactory,IConnection和IModel来创建连接和信道。IConnection实例对象负责与RabbitMQ 服务端的连接,信道是在这连接基础上创建虚拟连接,通过复用来减少性能开销且便于管理。
-
通过QueueDeclare方法创建消息队列,设置消息队列本身的一些属性信息。
-
发送消息时调用BasicPublish来发送消息,通过exchange和routingkey参数满足大部分匹配消息队列的场景。
var connFactory = new ConnectionFactory { HostName = "xxx.xxx.xxx.xxx", Port = 5672, UserName = "rabbitmqdemo", Password = "rabbitmqdemo@test", VirtualHost = "rabbitmqdemo" };
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var queueName = "helloworld";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
while (true)
{
Console.WriteLine("消息内容(exit退出):"); var message = Console.ReadLine();
if (message.Trim().ToLower() == "exit")
{
break;
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
Console.WriteLine("消息内容发送完毕:" + message);
}
}
}
消费者代码
-
消费端同样先创建连接和信道
-
同样在消费端也会进行队列声明,以确保当生产者并未创建或是手动没有创建情况下不会出现队列不存在的异常。
-
定义一个EventingBasicConsumer对象的消费者,然后定义接收事件,输出从消息队列中接收的数据,
-
最后调用BasicConsume方法来启动消费者监听
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var queueName = "helloworld";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
执行过程
运行代码,可以在管理页面中看到队列声明好了
ed399242827706f79cf3bf584fb8db49.png生产者发送消息,经RabbitMQ,消费者端接收消息
cc673d1c49ad4f6c3ca5d3d95c83f6ea.pngWorker模式
简单队列模式下,当多开消费者时,便演化到了Worker模式,这种情况下不再考虑基础的怎么用,而是要如何协调多个消费者的工作。
5f41e9f5c8eb54684312392d1d3625cf.png与简单队列模式类似再建立三个控制台项目RabbitMQDemo.Worker.Producer和RabbitMQDemo.Worker.ConsumerA和RabbitMQDemo.Worker.ConsumerB并安装Nuget,抄袭第一部分代码,更改个队列名字,然后直接跑起来,其实是一样的消费模式。
基本使用
当发送多条消息,两个消费者都能够展示消息,并且,其中的消息总是只会被一个消费者所拥有,默认分配方式是轮询。
1fd0b2097b953dff0bba2446a44744e8.png消费能力
现在,思考下如何能够各消费者的消费能力,来消费消息,这更侧重于消费端了。
将ConsumerA、B在消费时各增加Sleep 1秒和10秒,以区分消费能力的不同。当再次发送消息时,因消费端出现着处理消息耗时的不同,展示数据的时间也不同,但是消息的分配却没有变化。
cbb1ce4956bf9a0683fb6fed2b53c3f8.png需要进一步均衡的分配任务,按照消费能力高的分配多,消费能力低的分配少。
8e8bb76461c4a432595b02fa504b6dd0.png当消费能力不同时,可以将消费的任务均衡分配,这样来使得整体消费端的能力充分发挥。
负载能力
RabbitMQ提供了一个属性可以设置各消费端的能力,以此可以根据能力分配消息。
在消费端代码中更改下Qos(quality-of-service),即消费端最多接收未被ack的消息个数,举个例子:
-
如果输入1,当消费端接收到一个消息,但是没有应答(活还在干别再分配任务了),则消费端不会收到下一个消息,消息只会在队列中阻塞。
-
如果输入3,那么可以最多有3个消息不应答(可以同时干三个活),如果到达了3个,则当有分配给这个消费端的消息时只会在阻塞到队列中,消费端不会接收到消息。
对ConsumerA、B分别设置值prefetchCount为10和1。
ConsumerA: channel.BasicQos(0, prefetchCount:10, false);
ConsumerB: channel.BasicQos(0, prefetchCount:1, false);
当再次发送消息时,会因为因为A、B两端的消费能力不同而出现消息聚集侧重于一端
生产者发送一堆消息,两个消费者自身的消费能力不同,设置的能够消费的容量不同,这样分配得到的消息数量也不同。
生产者代码
此处并未做任何大的改变,只是将队列名更改为当前模式的名字以示区分。
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var queueName = "worker";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
while (true)
{
Console.WriteLine("消息内容(exit退出):");
var message = Console.ReadLine();
if (message.Trim().ToLower() == "exit")
{
break;
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
Console.WriteLine("消息内容发送完毕:" + message);
}
}
}
消费者代码
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var queueName = "worker";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Thread.Sleep(10000);
var message = ea.Body;
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
Exchange模式
发布订阅,路由和通配符这三种可以算为一种模式,区别仅仅是交互机类型不同。
-
发布订阅模式:使用fanout类型交换机
-
路由模式:使用direct类型交换机
-
通配符模式:使用topic类型交换机
生产者将消息及RoutingKey发送到指定交换机,消费者创建各自的消息队列并绑定到交换机,交换机根据路由规则匹配生产者发送的RoutingKey转发消息到相应队列中,其本身不存储消息。
Exchange类型
简要介绍这几种交换机类型,本身只是对路由规则的匹配方式不同。
-
fanout: 把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中。
-
direct: 把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
-
topic: 把消息路由到那些BindingKey和RoutingKey相匹配的队列中。
-
header: 不依赖RoutingKey的匹配规则,而是根据消息内容中的headers属性匹配(性能差,不实用,使用少)。
注意:BindingKey为交换机和队列绑定时指定的RoutingKey,发送消息时也会给定一个RoutingKey,两者会按照交换机类型的不同而匹配
发布订阅模式(fanout)
fanout模式下会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中。
92cc84886f66830d61699a39ab197938.png当生产者发送消息到指定交换机,该交换机会将消息路由到绑定的Queue1和Queue2,两个队列分别转发给其下绑定的消费者,从单个队列视角来看,便是Worker模式了。
生产者代码
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "publishsubscribe_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
while (true)
{
Console.WriteLine("消息内容(exit退出):");
var message = Console.ReadLine();
if (message.Trim().ToLower() == "exit")
{
break;
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine("消息内容发送完毕:" + message);
}
}
}
如上生产者端在worker模式的基础上,改动了几处
-
消息队列声明变成了交换机声明,其类型为fanout。
-
发送消息时由指定相应的队列名改成了空,而指定了交换机名称。
-
routingKey留空,fanout无需关心routingKey。
消费者代码
此处设置两个queue,分别为publishsubscribe_exchange_worker_1和publishsubscribe_exchange_worker_2
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "publishsubscribe_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
var queueName = exchangeName + "_worker_1";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000);
var message = ea.Body;
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
与Worker的消费者端相比,代码上也做了些调整,其余是保持一致的。
-
增加了交换机名称并声明了交换机且类型为fanout,这和生产者端保持一致了
-
将队列名和交换机名进行了绑定
-
routingKey留空,fanout无需关心routingKey。
执行过程
当启动生产者端和消费者端时,交换机和两个队列都声明完毕
2072c3227a823cd3c188d80b4d4ddaf5.png同时,点入交换机中,可以看到该交换机下绑定了两个队列
323dddf9f9e0c0525df826b160bea5ea.png这样一来,当有消息到达交换机,交换机可以根据消息名来路由到相应的队列。因此处设置的routekey是空的,两个队列绑定时用的routekey也是空的,因此两个队列都符合路由规则,则消息会同时存在于两个队列中。
5a7ce1071c5cffab95f7a483cab3f2a4.png路由模式(direct)
direct模式下会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
be99696c61af4a1395499542d1842742.png当生产者发送了一个消息且发送的RoutingKey为Warning时,交换机会根据该RoutingKey匹配并转发消息到Queue1和Queue2,两个队列都满足了路由规则,当RoutingKey为Info是,仅Queue2满足,则将消息转发给Queue2。
生产者代码
生产者端在worker模式的基础上,只需改动几处
-
交换机类型从fanout变更为direct
-
生产者发送消息时指定RoutingKey,原先是留空
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "routing_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
while (true)
{
Console.WriteLine("消息RoutingKey(warning or info):");
var routingKey = Console.ReadLine();
Console.WriteLine("消息内容(exit退出):");
var message = Console.ReadLine();
if (message.Trim().ToLower() == "exit")
{
break;
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine("消息内容发送完毕:" + message);
}
}
}
消费者代码
接收者端在发布订阅模式的基础上增加了交换机和队列时绑定的key,用于交换机路由规则时选择队列。
dba94405bcb401e11100754605d09ea2.png如下为Queue2下的消费者,为Queue2设置了info和warning两个RoutingKey用于交换机和队列绑定。
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "routing_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
var queueName = exchangeName + "_worker_1";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var routingKey1 = "warning";
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
var routingKey2 = "info";
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000);
var message = ea.Body;
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
执行过程
运行代码,交换机、队列及两者的绑定先完成,可以在管理页面中看到声明的信息。
f6a4ad89fe8f2b70a2cda7c7e52d7e47.png当生产者发送消息且RoutingKey为warning,两个队列都满足路由条件接收到消息,两个消费者都展示了消息。
当发送消息且RoutingKey为info,queue2队列满足路由条件接收了消息,一个消费者展示了消息。
e2323339318908e083d18b00b8146759.png通配符模式(topic)
topic模式会把消息路由到那些BindingKey和RoutingKey相匹配的队列中。
db004d1d1459794f8a8c57701d8cd74b.pngtopic类型与direct类型相似,但匹配规则上有所不同,direct需要完全匹配,topic可以设置通配符以达到局部匹配即可满足。
和direct不同的是,topic设定的RoutingKey(不论是BindingKey还是RoutingKey)都需要为带"."的字符串。比如a.b、c.d.e、fff.gggg.hhhh等,最多为 255 个字节。
在交换机和队列绑定时,给定的RoutingKey可以依照如下来设置。
-
:匹配0~N个单词
-
:匹配1个单词
举例说明下,比如两个RoutingKey分别为#.created和index.,当生产者发送消息时给定的RoutingKey为a.created、aa.created或是b.created等都满足#.created的规则,index.a、index.b或index.c等都满足index.*的规则。
生产者代码
在路由模式的基础上更改交换机类型为topic
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "topics_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
while (true)
{
Console.WriteLine("消息RoutingKey:");
var routingKey = Console.ReadLine();
Console.WriteLine("消息内容(exit退出):");
var message = Console.ReadLine();
if (message.Trim().ToLower() == "exit")
{
break;
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine("消息内容发送完毕:" + message);
}
}
}
消费者代码
接收者端在路由模式的基础上更改了交换机和队列绑定的key,可以方便满足多种情况下的需要。
9d0bddeb71ebf439df84a5de84cb1822.png如下为Queue2下的消费者,为Queue2设置了index.*和#.created.#两个RoutingKey用于交换机和队列绑定。
var connFactory = new ConnectionFactory
{
HostName = "xxx.xxx.xxx.xxx",
Port = 5672,
UserName = "rabbitmqdemo",
Password = "rabbitmqdemo@test",
VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
var exchangeName = "topics_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
var queueName = exchangeName + "_worker_2";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var routingKey1 = "index.*";
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
var routingKey2 = "#.created.#";
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Thread.Sleep(10000);
var message = ea.Body;
Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
}
}
执行过程
运行代码,交换机、队列及绑定关系,相应RoutingKey都在管理页面中展示
76cb58e987f01c4c50006443321f172a.png当生产者发送消息且RoutingKey为#.created,两个队列都满足路由条件接收到消息,两个消费者都展示了消息。
当生产者发送消息且RoutingKey为#.created.#,queue2队列满足了路由条件接收了消息,一个消费者展示了消息。
53cdfdbd115cf213c9cf9ca3813bd01b.png总结
对于在生产者和消费者间解耦,完成异步协作,消息队列可太方便了,暂不深入考虑三者间如何可靠传输,仅看消息队列提供的多种交换机模式,很大程度上满足实际使用中需要用到的很多功能。