Golang 入门资料+笔记

# RabbitMQ使用总结(二)

2020-06-29  本文已影响0人  littlejian
rabbitmq.jpeg

在上一节中我们主要介绍了为什么要使用消息队列、常见的消息队列对比分析以及初步认识了RabbitMQ的整体架构和基本概念。那么在这一节中,主要是从代码的角度出发介绍一下如何使用RabbitMQ以及讲一下在实际项目中踩的坑。

这里我用的是Go语言,连接驱动采用的是开源的库amqp

github地址:https://github.com/streadway/amqp

生产者

  1. 建立连接
// 建立连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
  1. 创建channel
// 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
  1. 创建队列

    // 创建队列
     q, err := ch.QueueDeclare(
         /*name*/ "queue", // 队列名称
         /*durable*/ false, // 是否持久化
         /*autoDelete*/ false, // 是否自动删除
         /*exclusive*/ false, // 排他
         /*noWait*/ false, // 是否等待服务器确认
         /*args*/ nil, // 其他配置
     )
     failOnError(err, "Failed to declare a queue")
    

    参数说明:

    • exclusive

      排他队列只对首次创建它的连接可见,排他队列是基于连接 (Connection) 可见的,并且该连接内的所有信道 (Channel) 都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
      同一连接中不允许建立同名的排他队列的
      这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
      非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

    • autoDelete

      为 true 则设置队列为自动删除。
      自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
      不能把这个参数错误地理解为: 当连接到此队列的所有客户端断开时,这个队列自动删除,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

    • noWait

      当 noWait 为 true 时,声明时无需等待服务器的确认。
      该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。

  2. 创建交换机

    // 创建交换机
    ch.ExchangeDeclare(
       /*name*/ "exchange", // 交换机名称
       /*kind*/ "fanout", // 交换机类型
       /*durable*/ true, // 是否持久化
       /*autoDelete*/ false, // 是否自动删除
       /*internal*/ false, // 是否是内置交换机
       /*noWait*/ false, // 是否等待确认
       /*args*/ nil) // 其他配置
    failOnError(err, "Failed to declare a exchange")
    

    参数说明:

    • internal:

      内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,
      只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,
      内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

  3. 绑定交换机和队列

    // 绑定交换机和队列
     err = ch.QueueBind(
         /*name*/ q.Name, // 队列名称
         /*key*/ "", // Routing Key 因为采用的是fanout模式,所以这里为空
         /*exchange*/ "exchange", // 交换机名称
         /*noWait*/ false, // 是否等待确认
         /*args*/ nil) // 其他配置
     failOnError(err, "Failed to bind")
    
  4. 发送消息

    // 发送消息
     body := "Hello World!"
     err = ch.Publish(
         /*exchange*/ "exchange", // 交换机名称
         /*key*/ "", // routing key
         /*mandatory*/ false, // 是否为无法路由的消息进行返回处理
         /*immediate*/ false, // 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
         amqp.Publishing{
             ContentType: "text/plain",
             Body:        []byte(body),
             DeliveryMode: 2,  // 2代表着消息持久化,1代表否
         })
     log.Printf(" [x] Sent %s", body)
     failOnError(err, "Failed to publish a message")
    

    参数说明:

    • mandatory

      消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,
      设置为 true 表示将消息返回到生产者,否则直接丢弃消息。

    • immediate

      参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

如果在客户端提前创建了交换机和队列并且也绑定在一起了,那么可以省略3、4、5步骤,直接发送消息即可。

发送了一条消息,如果这条消息还没有被消费掉,那么在客户端就能看到这条消息:

[图片上传失败...(image-12925a-1593443389768)]

消费者

消费者和生产者步骤类似,这里就贴一下demo:

package main

import (
   "log"

   "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
   if err != nil {
      log.Fatalf("%s: %s", msg, err)
   }
}

func main() {
   // 建立连接
   conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
   failOnError(err, "Failed to connect to RabbitMQ")
   defer conn.Close()

   // 创建channel
   ch, err := conn.Channel()
   failOnError(err, "Failed to open a channel")
   defer ch.Close()

   // 创建队列
   q, err := ch.QueueDeclare(
      "queue", // 队列名称
      true,    // 是否持久化
      false,   // 是否自动删除
      false,   // 排他
      false,   // 是否等待确认
      nil,     // 其他配置
   )
   failOnError(err, "Failed to declare a queue")

   msgs, err := ch.Consume(
      q.Name, // 队列名称
      "",     // consumer
      true,   // 是否自动ack确认
      false,  // 排他
      false,  // no-local 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
      false,  // 是否等待确认
      nil,    // 其他配置
   )
   failOnError(err, "Failed to register a consumer")

   forever := make(chan bool)
   go func() {
      for d := range msgs {
         log.Printf("Received a message: %s", d.Body)
      }
   }()

   log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
   <-forever
}

go run一下就能接收生产者发送的消息


image-20200627000417465.png

持久化

RabbitMQ 持久化包含3个部分

这里需要注意以下几点:

我之前就是因为没有指定message 持久化,重启了RabbitMQ就发送数据不见了......

数据丢失了怎么办

我们使用消息队列总不能把数据弄丢了吧,这是基本原则。

这里数据丢失主要分三种情况:生产者弄丢了数据、RabbitMQ弄丢了数据以及消费者弄丢了数据。

总结

本节主要是分享了RabbitMQ在Go里的基本使用以及我们如何去处理消息丢失的一个问题。

当然了还有其他很多更深层次的问题,比如说我们如何去确保消息队列的高可用、如何去确保消息的顺序性、遇到消息积压等等等等。因为呢本人对于RabbitMQ也没有更深入的了解,后续如果发现比较值得学习的地方再来给大家分享。

上一篇下一篇

猜你喜欢

热点阅读