Celery基本使用
celery的简介
celery
是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task)
,利用多线程,如Eventlet,gevent等,它们能被并发地执行在单个或多个职程服务器(worker servers)
上。任务能异步执行(后台运行)或同步执行(等待任务完成)。
在生产系统中,celery能够一天处理上百万的任务。它的完整架构图如下:
image.png
组件介绍
-
Producer
:调用了Celery
提供的API
、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。 -
Celery Beat
:任务调度器,Beat
进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。 -
Broker
:消息代理,又称消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery
目前支持RabbitMQ
、Redis
、MongoDB
、Beanstalk
、SQLAlchemy
、Zookeeper
等作为消息代理,但适用于生产环境的只有RabbitMQ
和Redis
, 官方推荐RabbitMQ
。 -
Celery Worker
:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。 -
Result Backend
:任务处理完后保存状态信息和结果,以供查询。Celery
默认已支持Redis
、RabbitMQ
、MongoDB
、Django ORM
、SQLAlchemy
等方式。
image.png在客户端和消费者之间传输数据需要序列化和反序列化。 Celery 支出的序列化方案如下所示
准备工作
- 使用的
celery
的消息代理和后端存储数据库都使用redis
,序列化和反序列化选择msgpack
1. 安装Redis,安装msgpack
安装成功后我们可以看到redis初始化没有任何数据
image.png
2.用Python调用celery,需要安装Python模块
pip insatll celery redis msgpack
简单的使用
1.创建一个简单的工程,结构如下
image.png2.主程序app_test.py
from celery import Celery
// app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件
app = Celery('proj', include=['proj.tasks'])
// Celery配置存放进proj/celeryconfig.py文件,使用app.config_from_object加载配置
app.config_from_object('proj.celeryconfig')
if __name__ == '__main__':
app.start()
3.任务函数文件tasks.py
import time
from proj.app_test import app
# tasks.py只有一个任务函数add,让它生效的最直接的方法就是添加app.task这个装饰器
@app.task
def add(x, y):
time.sleep(1)
return x + y
4.配置文件celeryconfig.py
BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
5.调用文件diaoyong.py
from proj.tasks import add
import time
t1 = time.time()
// 调用了add函数五次,delay()用来调用任务
r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
// 利用ready()判断任务是否执行完毕
while not r.ready():
pass
//用result获取任务的结果
print(r.result)
t2 = time.time()
print('共耗时:%s' % str(t2-t1))
6.运行例子,进入项目根目执行
celery -A proj.app_test worker -l info
# -A参数指定创建的celery对象的位置,该proj.app_test指的是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,后面加worker表示该实例就是任务执行者;
# -Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;
# -l参数指定worker输出的日志级别;
image.png
7.运行调用文件
image.png后台输出:程序会先将任务分发出来,每个任务一个ID
,在后台统一处理,处理完后会有相应的结果返回,同时该结果也会储存之后台数据库。可以利用ready()
判断任务是否执行完毕,再用result
获取任务的结果。