django+celery+rabbitmq实践
看到标题,相信大家就知道这个帖子要讲啥了……如果你希望在django中使用celery执行异步任务,用MQ(rabbimq,下同)做消息中间件,那么此贴完全可以满足你,包括celeryp、MQ配置,以及MQ路由配置等,相信你会喜欢。
简单说明一下:
1,django:web框架
2,celery: 用于创建执行异步任务
3,RabbitMQ:消息队列,主要用于消息存储
对于celery,rabbimq安装没啥好说的, pip直接装就好了,配置与启动也不赘述。关键环节简要说明如下:
一,在django中配置和使用celery
配置大致如下:
(1),工程目录下,创建celery.py, 内容编辑如下:
#!/usr/bin/python#!coding=utf-8
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
app = Celery('project')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
'''
autodiscover_tasks instructs celery to auto-discover all asynchronous tasks for all the applications listed under `INSTALLED_APPS`,
Celery will look for definitions of asynchronous tasks within a file named `tasks.py` file in each of the application directory.
'''
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request)) 另外。django settings文件下配置:
BROKER_URL= “”CELERY_RESULT_BACKEND =“”CELERYD_MAX_TASKS_PER_CHILD = 200 #每个worker执行了多少任务就会死掉
CELERYD_CONCURRENCY = 20 # celery worker的并发数.不是worker也越多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_PREFETCH_MULTIPLIER = 3 # celery worker 每次去rabbitmq取任务的数量
CELERY_CREATE_MISSING_QUEUES = True
CELERY_TASK_RESULT_EXPIRES = 3600 # 任务执行结果的超时时间
……
(2), 各app根目录下创建tasks.py
app相关的task均放置在tasks.py中,并用装饰器shared_task装饰,示例如下:
@shared_task(
# ignore_result=True, #忽略返回值
name="*****", #任务名称
exchange="***", #推送到MQ 的exchange分组
routing_key="***" # key
)
def get_jid_result(*args, **kwargs): pass 任务调度方法:
1)后台定时执行task,可安装djcelery模块,在django后台调度已注册的任务,关于djcelery的使用方法,在此不详细介绍。
2)触发调度,在需要调用task的d地方,直接调用各task的delay()方法即可(当然要先import),如:
get_jid_result.delay(*args, **kwargs)
二, celery woker
工程目录下执行sudo -u apache bash -c 'celery -A patools worker -l info (注意,此处使用apache用户执行,celery建议不要使用superuser 启动),启动celery worker。
建议:为确保celery worker进程平稳运行, 可以使用superivsor守护该进程的执行, 配置示例如下:
[*****]
directory=*****(工程根目录)
command=sudo -u apache bash -c 'celery -A prj worker -l info' (其中prj为工程名)
autostart=true
autorestart=true
stopwaitsecs=10
startretries=5
stderr_logfile=/var/log/***_in.log
stdout_logfile=/var/log/***_out.log
进入supervisorctl, 启动相应进程,OK!
三,中间件rabbitmq
对于中间件的选择,redis,MQ都可以,差异不作详细说明,视应用情况而选择。实践中采用rabbimq作为中间件,在此,对rabbimq路由做简单说明。
django settings配置,如celery配置部分, 另外,配置消息路由:
CELERY_DEFAULT_QUEUE = 'default'CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'# routing task
CELERY_IMPORTS = (
# 注册各应用tasks文件在此,注意,必须确保注册模块存在,否则将导致worker进程报错
"app1.tasks",
"app2.tasks",
# other apps
)
#交换器定义,示例如下:
default_exchange = Exchange('default', type='direct')
# bulkops related exchange
bulkops_exchange = Exchange("bulkops_exchange", type='direct')
# alphaops related exchange
Alphaops_exchange = Exchange("Alphaops_exchange", type='direct')# queue绑定# queue绑定,示例如下
CELERY_QUEUES = {
"default": { # 默认队列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "/"
},
"bulkops": { # 默认队列
"exchange": 'bulkops_exchange',
"exchange_type": "direct",
"routing_key": "bulkops.task"
},
# register other app-queue here
}
# 路由定义, 示例如下:
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None): #为任务指定route
# 示例1
if task.startswith('bulkops.tasks'):
return {
'exchange':'bulkops_exchange',
'exchange_type': 'direct',
'routing_key':'bulkops.task',
}
# 示例2
elif task.startswith('Alphaops.tasks'):
return {
'exchange': 'Alphaops_exchange',
'exchange_type': 'direct',
'routing_key': 'Alphaops.task'
}
else: # 默认被放到默认队列
return None
CELERY_ROUTES = (MyRouter(),)
以上是我在搭建基于Django框架的过程中对celery及MQ配置应用实践的小结,希望对有相关需求的同志们有些许的帮助,不足之处多多批评指正。