RabbitMQ实现任务队列

2019-08-08  本文已影响0人  转身丶即天涯

回顾

上一章学习了如何使用pika第三方库连接RabbitMQ,并通过RabbitMQ传递消息,实现了简易的生产者消费者模式。
上一章传送门:RabbitMQ入门

文章是我学习记录用的,可以看rabbitmq中文文档
,他们解释的更专业。

任务队列

首先我们知道了什么是队列,那么什么是任务队列呢?
任务队列,也有叫工作队列的。一般情况下,我们会将耗时或者大量占用资源的操作封装成任务,然后把任务发送给任务队列(其实这里的任务队列就是RabbitMQ),然后再由工作进程(或者叫做worker)将任务取出并执行,执行结束后触发回调函数,来告知RabbitMQ任务执行完毕。

PS:上面说的概念有点乱,不过没关系,知道是什么就可以。

准备工作

现在假设有这样一个需求,我们用一个字符串来表示任务,后面每个‘.’来表示耗时1秒,比如'task1...',代表任务名为task1,耗时3秒。

准备两个脚本,send.py和receive.py,分别表示生产者和消费者。
send.py

import sys
import pika


task = sys.argv[1] or "Hello World."

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()
channel.queue_declare(queue='task_queue')
channel.basic_publish(exchange='', routing_key='task_queue', body=task)

print("sent to [task_queue]: {}".format(task))

这里的task,我是用命令行参数的形式传递进去的。稍后会在使用时说明。

receive.py

import time
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare('task_queue')

def callback(ch, method, properties, body):
    print("Recevied: {}".format(body))
    time.sleep(body.count('.'))
    print("{} done.".format(body))

channel.basic_consume(on_message_callback=callback, queue='task_queue')

print("waiting for task of task-queue. To exit press CTRL+C.")
channel.start_consuming()

模拟工作队列

写好了脚本之后,我们可以开始探索工作队列了。
现在,打开终端,并开启三个窗口。


image.png

PS:这里顺便说个Mac上的小技巧,按住cmd+空格会调出Sportlight搜索框,输入ter弹出的第一项就是终端,直接回车就能打开终端了。
然后按cmd+t快捷键来新开一个命令窗口,cmd+w来关闭当前命令窗口。

启动RabbitMQ

在第一个命令行窗口,你需要先启动RabbitMQ。执行rabbitmq-server命令即可。

在第二个命令行窗口,执行python send.py task1...


image.png

在第三个命令行窗口,执行python receive.py


image.png

哎,为什么会报错呢,看到最后一行:time.sleep(body.count('.'))报错了。

经过断点调试才发现,body是bytes类型,而count方法是str对象的。所以,恍然大悟,得做解码处理。

time.sleep(body.decode('utf-8').count('.'))

然后,就可以正常运行了,结果如下图:


image.png

这就说明,消费者已经成功处理了‘task1...’


一些常见问题的解决思路

上一篇下一篇

猜你喜欢

热点阅读