RabbitMQ 学习笔记3 - 使用amqp库连接Rabbit

2020-04-02  本文已影响0人  张云飞Vir

0. 背景

使用Go 操作RabbitMQ 收发消息,可以 使用Go RabbitMQ客户端库 连接 RabbitMQ 来实现。

1. amqp 类库介绍

amqp 类库 是使用Go 操作 RabbitMQ 的一个 Go RabbitMQ客户端

在安装好 RabbitMQ 服务端后,就可以使用 Go 开发客户端程序来连接RabbitMQ,发送消息,或者取消息。

开始之前
要连接使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,我的另一篇文章
做了介绍,本文末也有一些 AMQP的一些资源。

一些基本概念:

消息队列

本文编写两个示例:

2. 开始使用

2.1 发送端

新建一个文件 send.go,编写 go 代码。

步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")

(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。

(3)声明一个队列
q, err := ch.QueueDeclare( ... )

(4)创建消息
msg := amqp.Publishing{ ... }

(5)发布消息
err = ch.Publish( ... )

代码如下:

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接 RabbitMQ
    conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
    failOnError(err, "连接失败")
    defer conn.Close()

    // 建立一个 channel ( 其实就是TCP连接 )
    ch, err := conn.Channel()
    failOnError(err, "打开通道失败")
    defer ch.Close()

    // 创建一个名字叫 "hello" 的队列
    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "创建队列失败")

    // 构建一个消息
    body := "Hello World!"
    msg := amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    }

    // 构建一个生产者,将消息 放入队列
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        msg)
    log.Printf(" [x] Sent %s", body)
    failOnError(err, "Failed to publish a message")
}

2.2 接收端

步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")

(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。

(3)声明一个队列
q, err := ch.QueueDeclare( ... )

(4)构建一个消费者
msgChan, err := ch.Consume( ... )

(5)不断的读取消息

for d := range msgChan {
log.Printf("收到消息: %s", d.Body)
}

新建一个文件 receive.go,编写 go 代码。

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接 RabbitMQ
    conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
    failOnError(err, "连接失败")
    defer conn.Close()

    // 建立一个 channel ( 其实就是TCP连接 )
    ch, err := conn.Channel()
    failOnError(err, "打开通道失败")
    defer ch.Close()

    // 创建一个名字叫 "hello" 的队列
    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "创建队列失败")

    // 开启一个 消费者
    // 返回值是 ch 类型
    msgChan, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "注册消费者 ,失败")

    //帮助阻塞
    forever := make(chan bool)

    // 开启一个 go程
    go func() {
        for d := range msgChan {
            log.Printf("收到消息: %s", d.Body)
        }
    }()

    log.Printf(" 等待消息...")
    <-forever
}

2.3 执行

方法:

执行后的效果截图:


image.png

3. 可能遇到的问题

遇到 “Reason: "username or password not allowed"”
缺少权限,可能账户密码错误,也可能使用了 guest 账户未处理远程连接。考虑新建一个高权限的用户。
新建账户的方法请参考我的另一篇文章

遇到 “no access to this vhost”
为 admin 赋予权限,使之可以访问 vhost
下面的指令 为 admin 赋予权限,使得可以访问 vhost 名字为 / 的资源。

rabbitmqctl set_permissions -p / admin "." "." "."
说明:
/ 是个 vhost 资源名称
"." "." ".
" 标识权限的类型,和读写权限。

4. 参考

https://www.rabbitmq.com/tutorials/tutorial-one-go.html

https://godoc.org/github.com/streadway/amqp

AMQP的一些资源
http://www.rabbitmq.com/tutorials/amqp-concepts.html
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/amqp-0-9-1-reference.html

END

上一篇下一篇

猜你喜欢

热点阅读