超详细的RabbitMQ快速入门!!你不拿走吗?

2021-07-26  本文已影响0人  张清柏

什么是mq

MQ,全称是Message Queue,是基于数据结构中“先进先出”的一种数据结构,指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

mq的使用场景

在技术小虫的工作中,在以下场景中用到过

mq有哪些产品和对比

image.png

为什么是rabbitmq

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式。

安装启动(ubuntu 18) 参考文章

#先查看一下我的版本号
root@guofu:~# cat /etc/issue
Ubuntu 18.04.5 LTS \n \l

#从前面的mq对比中已经说了,rabbitmq是erlang实现的,所以需要安装erlang
   26  sudo apt-get install erlang-nox
# 添加公钥
   27  wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新软件包
   28  sudo apt-get update
# 安装rabbitmq ,安装完毕自动启动
   29  sudo apt-get install rabbitmq-server
# 查看rabbitmq的运行状态  service rabbitmq-server status  也可以查看
   30  systemctl status rabbitmq-server
   info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago

#服务的启动、停止、重启
   31  sudo service rabbitmq-server stop
   32  sudo service rabbitmq-server start
   33  sudo service rabbitmq-server 

# 安装可视化的web操作页面
   34  sudo rabbitmq-plugins enable rabbitmq_management
   35  sudo service rabbitmq-server restart
   36  curl  http://localhost:15672

至此,rabbitmq安装完毕,web页面也可以访问了。默认用户名和密码是guest/guest,但是,rabbitmq默认会创建guest用户,但是只能服务器本机登录,建议创建其他新用户,授权,用来做其他操作。所以我们接下来开始创建一个新的用户

# 查看所有用户
   38  sudo rabbitmqctl list_users
#增加用户admin 密码是passwd(根据需求自定义即可)
   39  sudo rabbitmqctl add_user admin  passwd
# 给普通用户分配管理员角色
   40  sudo rabbitmqctl set_user_tags admin administrator
#赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源,也是添加远程访问权限
   41  sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'

使用admin远程登录


image.png

配置文件解读

rabbitmq-env.conf rabbitmq的环境变量

root@guofu:~# cd /etc/rabbitmq/
root@guofu:/etc/rabbitmq# ls
enabled_plugins  rabbitmq-env.conf
root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
# Defaults to rabbit. This can be useful if you want to run more than one node
# per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
# combination. See the clustering on a single machine guide for details:
# http://www.rabbitmq.com/clustering.html#single-machine
#NODENAME=rabbit  --节点名称,如果服务是集群的形式,每个节点的名称必须唯一

# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
#NODE_IP_ADDRESS=127.0.0.1 --节点的ip地址

# Defaults to 5672.
#NODE_PORT=5672 --节点的端口号

# Default rabbitmq-server wait timeout.

mq服务器的架构

相关参考
相关参考

image.png image.png
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
Creating vhost "test_vhost"
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
/
test_vhost

# 查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
admin   [administrator]
guest   [administrator]

# 分配访问权限  set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host  admin ".*" ".*" ".*"


从web页面可以看到,exchange可以选择的有四种,持久化方式有两种,一种是内存,一种是硬盘

image

生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑。需要我们自己创建交换器并进行绑定,创建多个队列进行绑定即可,若一个消费者绑定多个队列则进行轮询,因为mq有阅后即焚的特点,只能保证一个消费者阅读接受。常用于群发消息。

image

生产者将消息发送到交换机信息携带具体的路由key,交换机的类型是direct,将接收到的信息中的routingKey,比对与之绑定的队列routingkey。消费者监听一个队列,获取消息,执行消费逻辑。一个队列可以绑定一个routingKey也可以绑定多个。在消息进行路由时会携带一个routingKey寻找对应的队列。

image

生产者发送消息,消息中带有具体的路由key,交换机的类型是topic,队列绑定交换机不在使用具体的路由key而是一个范围值,例如: .yell.,hlll.iii,jjj.#。其中* 表示一个字符串(不能携带特殊字符)#表示任意

队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,

消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃

all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

接下来我们根据上面的exchang的不同类型做一个演示
# 创建vhost
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
Creating vhost "guofu_vhost"
#查看vhost列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
guofu_vhost
/
test_vhost
# 创建用户和密码
root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
Creating user "guofu"
#查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
vhost1  []
admin   [administrator]
guofu   []
guest   [administrator]
# 给用户设置角色,否则远程登录不了
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
Setting tags for user "guofu" to [administrator]
#给用户 vhost的权限,3个* 代表 配置 读 写的权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
Setting permissions for user "guofu" in vhost "guofu_vhost"
# 查看用户权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
Listing permissions for user "guofu"
guofu_vhost     .*      .*      .*

配置完毕后,我们在页面也可以看到,已经生效了


image.png
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
    //交换机
    var exchange="guofu_exchange"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )

    //定义消息
    msgBody:="i am a msg3"
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        "", //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err !=nil{
        panic(err)
    }
}


package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
    //交换机
    var exchange="guofu_exchange"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )

    //定义消息
    msgBody:="i am a msg3"
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        "", //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err !=nil{
        panic(err)
    }
}

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //交换机
    var exchange = "guofu_exchange_test"
    var queue = "guofu_queue_test"
    var key = ""
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )

    //试探性创建队列
    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(queue, key, exchange, false, nil)

    //  消费队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d:=range msg{
        fmt.Println(string(d.Body))
        d.Ack(false)

    }




}

image.png
package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //交换机
    var exchange = "direct_guofu_exchange"
    var queue = "direct_guofu_queue"
    var key = "direct_key"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //试探性创建队列
    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(queue, key, exchange, false, nil)

    //  消费队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d:=range msg{
        fmt.Println(string(d.Body))
        d.Ack(false)

    }




}


image.png
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
    var exchange = "direct_guofu_exchange"
    var key = "direct_key"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
        )

    //定义消息
    msgBody:="i am a direct"
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        key, //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })


    if err !=nil{
        panic(err)
    }
}

image.png

如代码所示,我创建了三个队列,绑定的key 分别是 #.animal.#,#.plant.#,yellow.#,

    var exchange = "topic_guofu_exchange"
    var queue = "topic727_yellow"
    var key = "yellow.#"

    var exchange = "topic_guofu_exchange"
    var queue = "topic727_animal"
    var key = "#.animal.#"


    var exchange = "topic_guofu_exchange"
    var queue = "topic727_plant"
    var key = "#.plant.#"

那么当我推送消息的时候,如果我topic绑定的路由键 是 yellow.animal.plant ,那么推送的时候 三个消息队列都会被匹配。我们来看一下


执行前
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
    var exchange = "topic_guofu_exchange"
    var key = "yellow.animal.plant "
    var queue = "topic727"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )

    //试探性创建队列
    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(queue, key, exchange, false, nil)

    //定义消息
    msgBody := key
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err = ch.Publish(
        exchange, //exchange
        key,      //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err != nil {
        panic(err)
    }
}

上一篇 下一篇

猜你喜欢

热点阅读