爬虫架构|Celery+RabbitMQ快速入门(二)
在上一篇文章爬虫架构|Celery+RabbitMQ快速入门(一)中简单介绍了Celery和RabbitMQ的使用以及它们之间的合作流程。本篇文章将继续讲解它们是如何配合工作的。
一、Celery介绍和基本使用
Celery是一个基于Python开发的分布式异步消息任务队列,它简单、灵活、可靠,是一个专注于实时处理的任务队列,同时也支持任务调度。通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用Celery。举几个适用场景:
1)可以在 Request-Response 循环之外执行的操作:发送邮件、推送消息。
2)耗时的操作:调用第三方 API、视频处理(前端通过 AJAX 展示进度和结果)。
3)周期性任务:取代 crontab。
Celery有以下几个优点:
简单:一旦熟悉了Celery的工作流程后,配置和使用是比较简单的。
高可用:当任务执行失败或执行过程中发生连接中断,Celery 会自动尝试重新执行任务。
快速:一个单进程的Celery每分钟可处理上百万个任务。
灵活: Celery的大部分组件都可以被扩展及自定制。
二、选择Broker
Celery的基本架构和工作流程如下图2-1所示:
图2-1 Celery工作流程常用的Broker有RabbitMQ、Redis、数据库等,我们这里使用的是RabbitMQ,如下图2-2所示:
图2-2 Celery+Broker工作流程三、Celery安装使用
Celery是一个Python的应用,而且已经上传到了PyPi,所以可以使用pip或easy_install安装:
pip install celery
安装完成后会在PATH(或virtualenv的bin目录)添加几个命令:celery、celerybeat、celeryd 和celeryd-multi。我们这里只使用 celery 命令。
四、创建Application和Task
Celery的默认broker是RabbitMQ,仅需配置一行就可以:
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 没装的话请装一下,安装看这里http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
创建一个Celery Application用来定义任务列表。
实例化一个Celery对象app,然后通过@app.task 装饰器注册一个 task。任务文件就叫tasks.py:
from celery import Celery
app = Celery(__name__, broker='amqp://guest:guest@localhost:5672//')
@app.task
def add(x, y):
return x + y
五、运行 worker,启动Celery Worker来开始监听并执行任务
在 tasks.py 文件所在目录运行
$ celery worker -A tasks.app -l INFO
这个命令会开启一个在前台运行的 worker,解释这个命令的意义:
worker: 运行 worker 模块。
-A: –app=APP, 指定使用的 Celery 实例。
-l: –loglevel=INFO, 指定日志级别,可选:DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
其它常用的选项:
-P: –pool=prefork, 并发模型,可选:prefork (默认,multiprocessing), eventlet, gevent, threads.
-c: –concurrency=10, 并发级别,prefork 模型下就是子进程数量,默认等于 CPU 核心数
完整的命令行选项可以这样查看:
$ celery worker --help
六、调用Task
再打开一个终端, 进行命令行模式,调用任务。
from tasks import add
add.delay(1,2)
add.apply_async(args=(1,2))
上面两种调用方式等价,delay() 方法是 apply_async() 方法的简写。这个调用会把 add 操作放入到队列里,然后立即返回一个 AsyncResult 对象。如果关心处理结果,需要给 app 配置 CELERY_RESULT_BACKEND,指定一个存储后端保存任务的返回值。
七、在项目中的简单使用流程
1)RabbitMQ所在服务器,启动crontab设置 crontable -user user -e设置定时执行celery application应用。
python tasks.py day
2)在task.py文件里面启动一个叫做app的Celery Application,编写一个app.task函数来produce 任务到rabbitmq。
app = Celery()
app.config_from_object(celeryconfig)
3)在每个worker里面通过命令启动worker消费任务
$ celery worker -A tasks.app -l INFO