Golang RabbitMQ快速入门教程 --- 2022-0

2022-05-19  本文已影响0人  一位先生_

Golang RabbitMQ 最简单的队列模式,只有一个消息生产者,一个消息消费者。

image.png

说明:
P 代表生产者 , C 代表消费者,红色代表队列。

提示:如果不了解RabbitMQ,请先阅读rabbitmq基础概念章节。

1.安装依赖包

go get github.com/streadway/amqp

导入依赖包

import (
  "github.com/streadway/amqp"
)

2.发送消息

下面分步骤演示消息生产者如何完成消息推送

2.1. 连接RabbitMQ Server

// 连接RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

连接地址说明:

amqp://账号:密码@RabbitMQ地址:端口/

2.2. 创建Channel

大部分操作都是在Channel(信道 )完成的。

ch, err := conn.Channel()
defer ch.Close()

2.3. 声明队列

代表我们需要读写哪个队列

q, err := ch.QueueDeclare(
  "hello", // 队列名字
  false,   // 消息是否持久化
  false,   // 不使用的时候删除队列
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)

2.4. 推送消息

// 消息内容
body := "Hello World!"

// 推送消息
err = ch.Publish(
  "",     // exchange(交换机名字),这里忽略
  q.Name, // 路由参数,这里使用队列名字作为路由参数
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // 消息内容
  })

2.5.发送消息完整代码

package main

// 导入包
import (
    "log"
    "github.com/streadway/amqp"
)

// 错误处理
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接RabbitMQ
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 创建信道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明要操作的队列
    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

   // 要发送的消息内容
    body := "Hello World!"

    // 发送消息
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

3.接收消息

接收消息的前面三个步骤跟发送消息一样,分别对应2.1、2.2、2.3章节。
完整的接收消息代码如下:

package main

// 导入包
import (
    "log"
    "github.com/streadway/amqp"
)

// 错误处理
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接RabbitMQ
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 创建信道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明要操作的队列
    q, err := ch.QueueDeclare(
        "hello", // 队列名需要跟发送消息的队列名保持一致
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 创建消息消费者
    msgs, err := ch.Consume(
        q.Name, // 队列名
        "",     // 消费者名字,不填,则自动生成一个唯一ID
        true,   // 是否自动提交消息,即自动告诉rabbitmq消息已经处理成功。
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    // 循环拉取队列中的消息
    for d := range msgs {
        // 打印消息内容
        log.Printf("Received a message: %s", d.Body)
    }
}
上一篇下一篇

猜你喜欢

热点阅读