Python RabbitMQ原理和使用场景以及模式

2019-08-19  本文已影响0人  河码匠

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件。

一、RabbitMQ 原理简介

1. RabbitMQ 角色

2. 连接 RabbitMQ 方式

客户端通过 TCP 连接到 RabbitMQ Server。
连接成功后 RabbitMQ 会创建一个 AMQP 信道。
信道是创建在 TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

3. RabbitMQ 中的关键词

4. RabbitMQ 消息持久化

RabbitMQ 会将持久化的消息写入磁盘上的持久化日志文件,等消息被消费之后,RabbitMQ 会把这条消息标识为等待垃圾回收。

RabbitMQ 默认情况下是关闭消息持久化的。需要在创建队列的时候设置。

设置如下:

# durable=True 队列持久化
channel.queue_declare(queue='test', durable=True)

只做队列持久化是不行的还需要在加上消息持久化

channel.basic_publish(
    exchange="",
    routing_key="test", 
    body="hello world",
    properties=pika.BasicProperties(delivery_mode=2,) 
)

4. RabbitMQ 虚拟主机

虚拟主机是 RabbitMQ 创建的虚拟消息服务器。为了在一个 RabbitMQ 上实现多用户隔离。为此提供了一个虚拟主机(virtual hosts - vhosts)的概念。

命令 说明 参数
rabbitmqctl list_vhosts 列举所有虚拟主机
rabbitmqctl add_vhost <vhost_name> 添加虚拟主机 vhost_name 创开虚拟主机的名称
rabbitmqctl delete_vhost <vhost_name> 删除虚拟主机 vhost_name 删除虚拟主机的名称
rabbitmqctl add_user <username> <password> 添加用户 username 用户名
password 密码
rabbitmqctl set_user_tags <username> <tag> 设置用户标签 username 用户名
tag 标签 如:administrator, monitoring, management
rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> 设置权限(如:队列交换机的创建和删除、发布消息、读取消息等) user 用户名
conf 正则表达式匹配置资源能够被用户访问。
write 正则表达式匹配置资源能够被用户读。
read 正则表达式匹配置资源能够该用户访问
示例:rabbitmqctl set_permissions ceshi ".*" ".*" ".*" 给 ceshi 用户最高权限

二、RabbitMQ 模式

前三个说的是 RabbitMQ 的 Exchange 的类型。RPC 是消费者和生产者互通的一种方式。

1. Direct 直连交换机

当一个绑定了 routing_key = 1 的消息被发送给直连交换机时,交换机会把消息发送给绑定了routing_key = 1的队列。

直连交换机经常用来循环分发任务给多个消费者。然后消息的负载均衡是发生在消费者之间的,而不是队列之间。
如下例:

消息生产者

import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    credentials=pika.PlainCredentials('test', 'test'),
)

# 创建 MQ 连接
conn = pika.BlockingConnection(config)
channel = conn.channel()

# 在频道中创建一个队列
channel.exchange_declare(exchange='ceshi', type='direct')

# 发送消息到指定队列
# exchange 指定交换器
# routing_key 设置路由键
# body 发送的内容
channel.basic_publish(
    exchange='ceshi',
    routing_key='1',
    body='Hello World!'
)

conn.close()

消息消费者

import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    credentials=pika.PlainCredentials('test', 'test'),
)

# 创建 MQ 连接
conn = pika.BlockingConnection(config)
channel = conn.channel()

# 如果使用exchange, 这里检测 exchange 是否存在,如不存在创建。存在检测是否正确且是否符合 exchange_type 类型
channel.exchange_declare(exchange='ceshi', exchange_type='direct')

# 在频道中创建一个队列
channel.queue_declare(queue='hello')

# 将队列绑定到指定的 exchange
# routing_key 类似密钥,只接收 routing_key 正确的信息
channel.queue_bind(exchange='ceshi', queue='hello', routing_key='1')

# 回调函数四个必须的参数 body 是传入的内容
# channel: BlockingChannel
# method: spec.Basic.Deliver
# properties: spec.BasicProperties
# body: str or unicode
def callback(channel, method, properties, body):
    print channel
    print method
    print properties
    print body

# 指定队列调用的函数
# no_ack 参数 True 时处理完成后没有返回信息。False 时在处理完后应答
channel.basic_consume(
    callback,
    queue='hello',
    no_ack=False
)
print 'waiting ...'
channel.start_consuming()

\color{red}{注意}:在修改消费者的routing_key后,需要重新创建队列。

2. Fanout 扇型交换机(订阅者模式)

他回将消息发送给绑定到它身上的所有队列,而不理会绑定的路由键。

如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。

扇型交换机实现消息的广播。

以下几个应用场景

  • 体育比赛用它来给手机客户端发送比分数据
  • 广播各种状态和产品的更新
  • 在群聊时分发消息给群聊中的用户

消息生产者

#!coding=utf-8
import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    credentials=pika.PlainCredentials('test', 'test'),
)

conn = pika.BlockingConnection(config)
channel = conn.channel()

# 修改 type 为 fanout
channel.exchange_declare(exchange='ceshi2', type='fanout')

channel.basic_publish(
    exchange='ceshi2',
    routing_key='',
    body='Hello World!'
)

conn.close()

在生产者中只需要修改 exchange_declare 他的 typefanout 即可

消息消费者

#! coding=utf-8
import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    credentials=pika.PlainCredentials('test', 'test'),
)

conn = pika.BlockingConnection(config)
channel = conn.channel()
channel.exchange_declare(exchange='ceshi2', exchange_type='fanout')

# 订阅者模式
# 生成随机 queue_name
# 订阅者之间的 queue name 不能重复
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='ceshi2', queue=queue_name, routing_key='')

def callback(channel, method, properties, body):
    print channel
    print method
    print properties
    print body

channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=False
)
print 'waiting ...'
channel.start_consuming()

3. Topic 主题交换机(模糊匹配)

通过发送者和接收者之间的 routing_key 相互匹配,将消息路由给一个或多个队列。

主题交换机通常用来实现消息的多路由广播

以下应用场景

  • 发布不同分类或者不同标签的新闻(例如,发送游戏类新闻和体育类新闻)
  • 系统中不同种类服务的调用(例如:发布系统和付费系统的调用)

例如:
接收者(消费者) routing_key='a.*'
发送者(生产者) routing_key = 'a.b.c.d'
结果:匹配失败

因为:
* 表示匹配一个单词
# 表示匹配0个或多个单词

消息生产者

#!coding=utf-8
import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    credentials=pika.PlainCredentials('test', 'test'),
)

conn = pika.BlockingConnection(config)
channel = conn.channel()

# 修改 type 为 topic
channel.exchange_declare(exchange='ceshi3', type='topic')

channel.basic_publish(
    exchange='ceshi3',
    routing_key='a.b.c.d',
    body='Hello World!'
)

conn.close()

消息消费者

#! coding=utf-8
import pika

config = pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    credentials=pika.PlainCredentials('test', 'test'),
)

conn = pika.BlockingConnection(config)

channel = conn.channel()

channel.exchange_declare(exchange='ceshi3', exchange_type='topic')
channel.queue_declare(queue='hello')

# routing_key='a.*' 此时是无法接受到信息的,因为生产者发送的是 a.* a. 后面一个单词。修改 routing_key='a.#' 即可接受成功
channel.queue_bind(exchange='ceshi3', queue='hello', routing_key='a.*')


def callback(channel, method, properties, body):
    print channel
    print method
    print properties
    print body

channel.basic_consume(
    callback,
    queue='hello',
    no_ack=False
)
print 'waiting ...'
channel.start_consuming()

4. RPC

生产者发送消息给消费者,并接收消费者处理完的结果
原理:

  1. 生产者会创建一个新的队列,用来接收消费者返回的信息。
  2. 生产者在发送消息的同时,还会发送 1步骤中创建的队列名和一个 correlation_id 用来验证
  3. 当消费者处理完数据后,会把结果和correlation_id发送到 1步骤创建的队列中去
  4. 生产者会使用一个循环 while 来监测返回结果 self.response
  5. 生产者获取到数据后比对 correlation_id 是否一致,然后结束此次发送流程

消息生产者

#! coding=utf-8
import pika
import uuid


class RpcClient(object):
    def __init__(self):
        config = pika.ConnectionParameters(
            host='127.0.0.1',
            credentials=pika.PlainCredentials('test', 'test'),
        )

        self.conn = pika.BlockingConnection(config)
        self.channel = self.conn.channel()

        # 在频道中创建一个队列
        result = self.channel.queue_declare(exclusive=True)

        # 生成队列名
        self.queue_name = result.method.queue

        # 指定队列要调用的函数
        self.channel.basic_consume(
            self.on_request,
            no_ack=True,
            queue=self.queue_name
        )

    def on_request(self, channel, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4)

        # 发送消息到指定队列
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc',
            # 消费者返回处理结果需要的队列信息(reply_to)和相关 id(correlation_id).
            # 以便消费者知道返回给那个生产者
            properties=pika.BasicProperties(
                reply_to=self.queue_name,
                correlation_id=self.corr_id
            ),
            body=message
        )

        while self.response is None:
            # 是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态
            self.conn.process_data_events()
        return self.response

fibonacci_rpc = RpcClient()

print "等待处理结果"

response = fibonacci_rpc.call('Hello World!')
print "处理完返回信息: %s" % response

消息消费者

#! coding=utf-8
import pika
import time


config = pika.ConnectionParameters(
    host='127.0.0.1',
    credentials=pika.PlainCredentials('test', 'test'),
)

conn = pika.BlockingConnection(config)
channel = conn.channel()

channel.exchange_declare(exchange='rpc')
channel.queue_declare(queue='rpc')


def on_request(channel, method, props, body):
    print '处理中, 收到内容:%s' % body

    # 发送消息到指定队列
    # exchange 指定 exchange
    # routing_key 设置为队列的名称
    # body 发送的内容
    channel.basic_publish(
        exchange='',
        # props.reply_to 是生产者发送过来 接收处理结果的 指定队列
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body='处理完成'
    )
    # 告知 rabbit 消息已经处理完
    channel.basic_ack(delivery_tag=method.delivery_tag)

# 使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,只有工作者完成任务之后,才会再次接收到任务
channel.basic_qos(prefetch_count=1)

# 接收的结果调用 on_request 函数处理
channel.basic_consume(on_request, queue='rpc')
print("waiting....")

# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

conn.close()

三、RabbitMQ 使用场景

1. 任务相互依赖

例:

三个任务,任务1、任务2、任务3。他们之间有相互的制约。执行任务1的前提是要有任务2的结果。执行任务2需要任务3的结果。

一般会使用crontab来做计划任务。预估一下每个任务的完成时间。然后制定任务。当数量变大处理时间变成,就需要经常修改crontab任务。

使用 RabbitMQ 后每个任务结束时只需要发送一个结束信息即可

如:任务2订阅任务3的信息。当任务3完成后发送一个完成消息。任务2接收到完成消息后开始执行,在执行结束后发送任务2完成消息。任务1订阅任务2的消息,然后执行。

优点:

2. 下发任务后不关心结果

例:

有三个用户A、B、C 他们分别发送文章,但后台会根据用户的级别做不同的操作。
A 是普通用户,系统发布。
B 是 VIP 用户,系统发布和推荐给关注这部分内容的客户。
C 是黑卡客户,系统发布、推荐给关注这部分内容的客户、在这个分类中置顶这篇文章。
文章发布服务只关心是否成功,剩下的操作都不关心。可以使用 RabbitMQ 服务将其他操作分离。

优点:

3. 任务执行时间很长还需要等待结果

例:

有一个操作 A。用户执行这个任务后,又非常需要结果。

解决方法:
  1. 用户调用操作 A。
  2. 操作 A 直接返回调用成功。此时只是调用成功。
  3. 操作 A 告诉后台应该执行什么程序。
  4. 后台执行完成后,将完成消息发送给 RabbitMQ。
  5. 用户订阅 RabbitMQ 中的操作 A 的消息。

为什么不是后台直接将结果发送给用户呢。因为一旦增加了订阅用户,就需要修改后台程序,这样很恶心。

四、开启 WEB 服务

RabbitMQ 自带管理后台,安装后需要配置开启

进入 RabbitMQ 安装目录中的 sbin 目录执行

rabbitmq-plugins enable rabbitmq_management

http://localhost:15672/

用户名密码均为guest

上一篇 下一篇

猜你喜欢

热点阅读