python

RabbitMQ

2020-06-14  本文已影响0人  wit92
介绍

RabbitMQ是一个消息代理。

它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

一般提到RabbitMQ和消息,都用到一些专有名词。

生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示:

Producing

队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列(queue)中。队列(queue)没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能够从一个队列(queue)中获取数据。队列可以用下图标识:

Queue

注意:

一般生产者,消费者和代理不必部署在同一台机子上。注意:一般生产者,消费者和代理不必部署在同一台机子上。

1.Hello World 示例

简介
我们的“Hello world”不会很复杂——仅仅发送一个消息,然后获取它并输出到屏幕。这样以来我们需要两个程序,一个用作发送消息,另一个接受消息并打印消息内容。

大体的设计如下所示:

img

**(1)Sending 发送端****

img

queue 的定义具有幂等性(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相等),因此定义的queue已经存在,不会重复定义,且不能修改。

(2)Receiving 接收端

接收端会从 RabbitMQ server 拉取消息,因此接收端 要监听消息,然后打印输出

img

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

1.生产者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
import pika
# ######################### 生产者 #########################
credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
#创建频道
channel = connection.channel()
# 声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。如果队列不存在,则创建
channel.queue_declare(queue='hello')
#exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
#向队列插入数值 routing_key是队列名 body是要插入的内容

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print("开始队列")
#缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
connection.close()

2.消费者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping

import pika

# ########################## 消费者 ##########################
credentials = pika.PlainCredentials('admin', 'admin')
# 连接到rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
channel = connection.channel()

# 声明消息队列,消息将在这个队列中进行传递。如果队列不存在,则创建
channel.queue_declare(queue='wzg')


# 定义一个回调函数来处理,这边的回调函数就是将信息打印出来。
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 # no_ack=True表示在回调函数中不需要发送确认标识

print(' [*] Waiting for messages. To exit press CTRL+C')

# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()
2.Work Queues

Work Queues(工作队列)

大体结构如下所示:

img

在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。
本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。

使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。

默认,RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。

(1)Message acknowledgment消息不丢失的方法

执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了?
当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。

但是我们不想丢失任何 task。如果一个Worker死了,我们想把任务分发给其他的Worker。

为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。

如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。

默认Message acknowLedgments 是打开的,之前的例子,我们是显式的关闭了(设置 noAck=true)。

Forgotten acknowledgment 遗失acknowledgment
丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。
为了 调试这种错误, 你可以使用rabbitmqctl来打印出 messages_unacknowledged 的message信息

生效方法:channel.basic_consume(consumer_callback, queue, no_ack=False, exclusive=False, consumer_tag=None, arguments=None)  

  即no_ack=False(默认为False,即必须有确认标识),在回调函数consumer_callback中,未收到确认标识,那么,RabbitMQ会重新将该任务添加到队列中。

生产者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
import pika
# ######################### 生产者 #########################
credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
#创建频道
channel = connection.channel()
# 声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。如果队列不存在,则创建
channel.queue_declare(queue='hello')
#exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
#向队列插入数值 routing_key是队列名 body是要插入的内容

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print("开始队列")
#缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
connection.close()

消费者代码:

import pika
credentials = pika.PlainCredentials('admin', 'admin')
# 链接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
# 创建频道
channel = connection.channel()
# 如果生产者没有运行创建队列,那么消费者创建队列
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print
    'ok'
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 主要使用此代码


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
(2)Message durability 持久化存储

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储

channel.queue_declare(queue='wzg', durable=True) # 声明队列持久化

Ps: 但是这样程序会执行错误,因为‘wzg’这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。因此需要重新定义一个队列

channel.queue_declare(queue='test_queue', durable=True) # 声明队列持久化

注意:如果仅仅是设置了队列的持久化,仅队列本身可以在rabbit-server宕机后保留,队列中的信息依然会丢失,如果想让队列中的信息或者任务保留,还需要做以下设置

channel.basic_publish(exchange='',
                      routing_key="test_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # 使消息或任务也持久化存储
                      ))
   消息队列持久化包括3个部分:
  (1)exchange持久化,在声明时指定durable => 1
  (2)queue持久化,在声明时指定durable => 1
  (3)消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

   如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

Fair dispatch 公平调度

也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。

这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。


img
3.Publish/Subscribe

我们把一个message分发给多个consumer。这种模式叫“publish/subscribe” 发布、订阅模式。

Exchanges(交换机)

RabbitMQ的消息模型的核心思想是, 生产者没有直接向队列发送任何消息

实际上,经常生产者甚至不知道一个消息将传递给任何队列。事实上,Producer只能发送message给exchange。exchange 很简单,一方面它从producers 接收messages, 另一方面,它把messages 推送给queues。

因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。

[图片上传失败...(image-4302d5-1592213736868)]

exchange 有以下几种:direct, topic, headers 和 fanout。这一节,我们主要使用最后一种——fanout。

(1)模式1 Fanout

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式
2.这种模式不需要routing_key(即使指定,也是无效的)
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个     Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

注意:

这个时候必须先启动消费者,即订阅者。因为随机队列是在consumer启动的时候随机生成的,并且进行绑定的。producer仅仅是发送至exchange,并不直接与随机队列进行通信。

生产者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
# rabbitmq 发布者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='logs_fanout',
                         type='fanout')

message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='logs_fanout',  # 指定exchange
                      routing_key='',  # fanout下不需要配置,配置了也不会生效
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消费者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping

#  rabbitmq 订阅者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
channel = connection.channel()

# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='logs_fanout',
                         type='fanout')

# 随机创建队列
result = channel.queue_declare(exclusive=True)  # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='logs_fanout',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


# 从队列获取信息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
(2)模式2direct

fanout类型exchange 不够灵活,它只能盲目的分发。
因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。

大体结构如下所示:

img

我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。
因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。

1.一般情况可以使用rabbitMQ自带的Exchange:””  (该Exchange的名字为空字符串), 也可以自定义Exchange 
2.这种模式下不需要将Exchange进行任何绑定(bind)操作。当然也可以进行绑定。可以将不同的routing_key与不   同的queue进行绑定,不同的queue与不同exchange进行绑定
3.消息传递时需要一个“routing_key”
4.如果消息中中不存在routing_key中绑定的队列名,则该消息会被抛弃。
 如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

消费者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
# 消费者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
channel = connection.channel()
# 定义exchange和类型
channel.exchange_declare(exchange='direct_test',
                         type='direct')

# 生成随机队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = ['error', ]
# 将随机队列与routing_key关键字以及exchange进行绑定
for severity in severities:
    channel.queue_bind(exchange='direct_test',
                       queue=queue_name,
                       routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


# 接收消息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

生产者代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
# 发布者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.103',5672,'/',credentials))
channel = connection.channel()
# 定义交换机名称及类型
channel.exchange_declare(exchange='direct_test',
                         type='direct')

severity = 'info'
message = '123'
# 发布消息至交换机direct_test,且发布的消息携带的关键字routing_key是info
channel.basic_publish(exchange='direct_test',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

Multiple bindings

img

多个队列绑定同样的key是合法的。在这种情况下,direct 类型的exchage的行为和fanout表现的一样。

(3)模式3 Topic

message 发送到一个topic类型的exchange不包含任意的routing_key——而是一系列"."分隔的word。这些word可以任意,但是通常是一些连接message的一些特性。例如"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。但是最多255个字节。

[图片上传失败...(image-dc8aa0-1592213736868)]

路由键模糊匹配,其实是路由键(routing_key)的扩展,就是可以使用正则表达式,和常用的正则表示式不同,这里的话“#”表示所有、全部的意思;“*”只匹配到一个词。

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),   Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。
2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个            routing_key  为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法 与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

  具体代码这里不在多余写,参照第二种模式的就可以,唯一变动的地方就是exchange type的声明,以及进行绑定和发送的时候routing_key使用正则模式即可。

RabbitMQ 实战(一)Ubuntu 16.04 安装 RabbitMQ

https://blog.csdn.net/nextyu/article/details/79250174

上一篇 下一篇

猜你喜欢

热点阅读