使用RabbitMQ

2019-03-28  本文已影响0人  米来MiLai

概念

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang框架上的。

应用场景

场景一:支付的通知
生产者:微信支付完成之后在其回调方法中调用一个服务接收消息,这个服务作为生产者。
消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。
场景二:注册的短信或者邮件通知
生产者:注册成功之后的回调中,发送注册成功信息到队列生产者。
消费者:应用程序不断的获取队列中的消息,获取到就发送短信后者邮件。


image.png

应用 .Net Core with RabbitMQ

  1. Erlang运行环境,并配置环境变量 ERLANG_HOME:D:\Program Files\erl10.3
  2. RabbitMQ-win64版,并配置环境变量 RABBITMQ_SERVER:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.13,增加path变量 %RABBITMQ_SERVER%\sbin
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
rabbitmqctl status
rabbitmqctl status
rabbitmqctl list_users
rabbitmqctl add_user  admin  admin
rabbitmqctl set_permissions  admin  ".*"  ".*"  ".*"
rabbitmqctl set_user_tags admin administrator
rabbitmqctl delete_user guest
rabbitmqctl change_password {username}  {newpassowrd}

Run

3.1.消息的发送和接收

生产者
static void Main (string[] args) {
      var factory = new ConnectionFactory ();
      factory.HostName = "localhost";
      factory.UserName = "admin";
      factory.Password = "admin";

      using (var connection = factory.CreateConnection ()) {
        using (var channel = connection.CreateModel ()) {
          channel.QueueDeclare ("hello", true, false, false, null);
          string message = "Hello,RabbitMQ!";
          //5. 构建byte消息数据包
          var body = Encoding.UTF8.GetBytes (message);
          //6. 发送数据包
          channel.BasicPublish ("", "hello", null, body);
        }
      }
    }
消费者
static void Main (string[] args) {
            //1. 实例化连接工厂
            var factory = new ConnectionFactory ();
            factory.HostName = "localhost";
            factory.UserName = "admin";
            factory.Password = "admin";
            //2. 建立连接
            using (var connection = factory.CreateConnection ()) {
                //3. 创建信道
                using (var channel = connection.CreateModel ()) {
                    //4. 申明队列
                    channel.QueueDeclare ("hello", true, false, false, null);
                    //5. 构造消费者实例
                    var consumer = new EventingBasicConsumer (channel);
                    //6. 绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString (body);
                        Console.WriteLine ("已接收: {0}", message);
                    };
                    //7. 启动消费者
                    channel.BasicConsume ("hello", false, consumer);
                    Console.ReadLine ();
                }
            }
        }

运行结果


image.png

3.2 循环调度

3.3 消息确认

//6. 绑定消息接收后的事件委托
consumer.Received += (model, ea) => {
          var body = ea.Body;
          var message = Encoding.UTF8.GetString (body);
          Console.WriteLine ("已接收: {0}", message);
          channel.BasicAck (ea.DeliveryTag, false);//消息确认
          };
channel.BasicConsume ("hello", false, consumer); //channel.basicConsume(QUEUE_NAME, autoAck, consumer);

消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到。在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。


有10条消息在队列中 未应答10条消息 应答后,消息被消化
channel.basicQos(1);此时如果没有应答的话,消费者将不再继续获取

注意:如果都没有手动应答,在没有指定获取消息的条数时,消费者可以获取所有消息,当指定时,只能获取指定条,下次就只能等待了,没法继续获取下一条了

// requeue:重新入队列,false:直接丢弃,相当于告诉队列可以直接删除掉
channel.basicReject(envelope.getDeliveryTag(), false);
当前消息会被消耗掉
被拒绝后消息删除

3.4 消息持久化

消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。

3.5 公平分发

关闭客户端后 27号消息丢失
最后补上

4 Exchange

4.1 direct

只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

* 将交换器与队列通过路由键绑定*/
 channel.queueBind(QUEUR_NAME, EXCHANGE_NAME, ROUTING_KEY);
image.png

4.2 topic

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'','#'.
其中'
'表示匹配一个单词, '#'则表示匹配没有或者多个单词

image.png

4.3 fanout

此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。
channel.ExchangeDeclare("weilai","fanout");
image.png

4.4 header

此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数
channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty,  arguments: aHeader);
上一篇下一篇

猜你喜欢

热点阅读