Celery-分布式任务队列学习笔记
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
以上是celery自己官网的介绍
celery的应用场景很广泛
- 处理异步任务
- 任务调度
- 处理定时任务
- 分布式调度
好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。
Celery
安装
安装Celery
推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装
$ pip install celery
安装消息中间件
Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件
你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ
安装Redis
在Ubuntu系统下使用apt-get命令就可以
$ sudo apt-get install redis-server
如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可
$ pip install redis
能出现以下结果即为成功
redis 127.0.0.1:6379>
其他的redis知识这里不左介绍,如果有兴趣,可以自行了解
如果你使用RabbitMQ,也请安装RabbitMQ
安装RabbitMQ
$ sudo apt-get install rabbitmq-server
使用Celery
简单直接使用
可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/3')
@app.task
def add(x, y):
return x + y
我这里使用了redis作为中间件,这是可以按自己的习惯替换的
由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些,
但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。
单独配置配置文件
比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。
config.py内容为:
# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
'tasks.add': 'low-priority',
}
# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
'tasks.add': {'rate_limit': '10/m'}
}
配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)
$ python -m config
tasks.py内容为:
# coding=utf-8
from celery import Celery
app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')
@app.task
def add(x, y):
return x + y
还有一种同一设置配置的方式,不是很推荐
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
在app使用前先需要用以上方法批量更新配置文件。
在应用上使用
工程目录结构为
proj/
__init__.py
# 存放配置和启动celery代码
celery.py
# 存放任务
tasks.py
celery.py为:
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/3',
backend='redis://localhost:6379/4',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks.py为:
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
启动celery只需要在proj同级目录下:
$ celery -A proj worker -l info
在django中使用celery
我们的django的项目的目录结构一般如下
proj/
manage.py
myapp/
proj/
__init__py
settings.py
urls.py
wsgi.py
想要在django项目中使用celery,我们首先需要在django中配置celery
我们需要在与工程名同名的子文件夹中添加celery.py文件
在本例中也就是proj/proj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
# 括号里的参数为工程名
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
然后我们需要在同级目录下的init.py文件中配置如下内容
proj/proj/init.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下
proj/
manage.py
myapp1/
__init__.py
tasks.py
views.py
model.py
tests.py
myapp2/
__init__.py
tasks.py
views.py
model.py
tests.py
proj/
__init__py
settings.py
urls.py
wsgi.py
可能的一个tasks.py文件内容如下:
myapp1/tasks.py为:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def add(x, y):
# 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
time.sleep(5)
print(x+y)
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task修饰器可以让你创建task不需要app实体
在需要的地方调用相关任务即可,例如在myapp1/views.py中调用
from django.shortcuts import render
from .tasks import add
def index(request):
# 测试celery任务
add.delay(4,5)
return render(request,'index.html')
然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别
启动web应用服务器
$ python manage.py runserver
启动celery
$ celery -A proj worker -l info
然后访问浏览器就可以在启动celery的终端中看到输出
测试结果
扩展
- 如果你的项目需要在admin中管理调度,请使用django-celery-beat
- 使用pip安装django-celery-beat
$ pip install django-celery-beat
不要在使用django-celery,这个项目已经停止更新好好多年。。。。
- 在settings.py中添加这个app
INSTALLED_APPS = (
...,
'django_celery_beat',
)
- 同步一下数据库
$ python manage.py migrate
- 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
然后在就可以admin界面看到了。
- 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
- 使用pip安装django-celery-results
$ pip install django-celery-results
不要在使用django-celery,这个项目已经停止更新好好多年。。。。
- 在settings.py中添加这个app
INSTALLED_APPS = (
...,
'django_celery_results',
)
- 同步一下数据库
$ python manage.py migrate django_celery_results
- 配置后端,在settings.py中配置
# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'
# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'
基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档
注:
- 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
- 上述文字皆为个人看法,如有错误或建议请及时联系我