RabbitMQ入门

2019-08-07  本文已影响0人  转身丶即天涯

回顾

上一章,我们学习了在Mac上安装RabbitMQ,Mac安装RabbitMQ
本章,学习一下如何使用RabbitMQ这个消息队列来发送消息。

前言

python 3.6
RabbitMQ 3.7.8
pika 1.1.0
在开始之前,我们需要了解一个设计模式——”生产者消费者“模式。

生产者消费者模式

让我们看一下维基百科中的定义。

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

数据流向

我画了草图,如下:


image.png

生产者将产生的数据发送给RabbitMQ,然后消费者再从RabbitMQ中把数据读取出来,做进一步处理。

当然,实际环境中并不会只有一个生产者和一个消费者,而是会有多个生产者和多个消费者。比如下图:


image.png

这里每个生产者或者消费者都是一个进程。

使用场景

一开始我也费解,原本两个函数就能解决的事情,为什么要引入一个消息队列呢?这岂不是增加了程序的复杂性么?
那是因为我很少接触多进程编程,引入消息队列时为了解决进程间同步问题,也可以说是进程间通信问题。

我们知道一个程序是以进程作为基本单位的,也知道各个进程拥有各自的内存资源,进程之间如果想进行通信,需要借助其他媒介(比如文件,数据库,消息队列)。

创建第一个消息队列

首先,需要保证RabbitMQ已经被启动,可以在命令行中输入rabbitmq-server启动RabbitMQ。

$ rabbitmq-server

如果看到下图,表示启动成功。


image.png

然后需要安装一个python第三方包,方便我们操纵RabbitMQ。

pip install pika

生产者和消费者

我们可以创建两个python脚本,分别用来表示生产者(send.py)和消费者(receive.py)。

生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World1.')

print("[x] sent 'Hello World.'")

connection.close()

首先,导入pika包,然后创建一个BlockingConnection连接,这个连接是连接到本地的RabbitMQ的。
然后创建一个频道(channel),并在这个频道中调用queue_declare()方法声明一个队列,并将队列名hello传入。
接下来,调用channel的basic_publish()方法将数据传递给队列hello。
生产者无法直接给队列hello传递消息,需要借助交换机,也就是这里的exchange参数,使用''(空字符串)表示默认交换机。
routing_key表示队列的名字,body表示数据的内容。
最后,调用connection的close()方法来关闭和RabbitMQ的连接。

到此为止,我们的数据已经发送给本地的RabbitMQ了。接下来我们需要去写一个消费者脚本来获取数据。

消费者
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("[x] received {}".format(body))


channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)

print("[x] waiting for messages. To exit press CTRL+C")
channel.start_consuming()

前面的步骤是一样,都是先建立连接,然后创建频道。
在第5行,我在频道中又一次声明了队列'hello',这是一种容错机制,当我们不知道是生产者脚本先运行还是消费者脚本先运行时,这样是一种推荐做法。
当然,我们现在明确的知道是生产者脚本先运行,而且已经声明了队列hello,所以不写这行也没事。

接下来,定义了一个回调函数callback,它的作用是在消费者从队列中成功读取到数据时调用的。
然后,调用channel.basic_consume()方法来获取数据,on_message_callback参数需要传入一个回调函数(就是刚刚我们写的callback函数),queue参数表示队列的名字,
auto_ack参数如果为True,”自动确认“模式会被开启。具体可以看这里

最后,调用channel.start_consuming()方法一直消费队列中的数据,直到你按下Ctrl + C来终止。

验收结果

为了方便查看,我使用pycharm运行send.py脚本,使用命令行的方式运行receive.py脚本。
先启动send.py。如果没报错,结果会打印如下内容。


image.png

在命令行中再启动receive.py脚本。


image.png

PS:这里能看到两条记录是因为我刚刚按错了,多执行了一次send.py脚本。

然后再执行一次send.py,再回到命令行窗口里你会发现,又多了一条数据。


image.png

到目前为止,我们已经成功的走通我们的一个消息队列。

上一篇 下一篇

猜你喜欢

热点阅读