Using native Celery with Django

2019-02-22  bigyuang

Package versions:
django: 2.0.6
celery: 4.2.1
flower: 0.9.2
redis: 2.10.6
eventlet: 0.24.1
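References:
celery: 4.2.1 (documentation)
flower: 0.9.2 (documentation)
The road to distributed async with Django and Celery (1): getting started
Fetching results asynchronously with Celery's backend
Fixing "Celery ValueError: not enough values to unpack (expected 3, got 0)"
Using the asynchronous task queue Celery in Django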

celery_django_demo
├─celery_demo
│   ├─...
│   ├─tasks.py
│   └─views.py
└─celery_django_demo
    ├─...
    └─celery.py

celery.py

# Celery configuration file
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django_demo.settings')

app = Celery(
    'celery_django_demo',
    broker='redis://127.0.0.1:6379/2',
    backend='redis://127.0.0.1:6379/3',
    include=['celery_demo.tasks'],
)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    CELERY_ACKS_LATE=True,
    CELERY_ACCEPT_CONTENT=['pickle', 'json'],
    CELERYD_FORCE_EXECV=True,
    CELERYD_MAX_TASKS_PER_CHILD=500,
    BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,   # how long (in seconds) a task result is kept in the backend before it expires
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT = True
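
Celery 4 documents lowercase setting names alongside the older uppercase ones used above. As a rough reference, the sketch below maps the keys from this file to what I understand to be their lowercase counterparts. `OLD_TO_NEW` and `to_new_style` are hypothetical names made up for illustration and are not part of Celery. `CELERYD_FORCE_EXECV` is left out because I am not aware of a lowercase counterpart for it.

```python
# Hypothetical helper (not a Celery API): rename the uppercase keys used in
# celery.py to the lowercase names documented for Celery 4.
OLD_TO_NEW = {
    'CELERY_ACKS_LATE': 'task_acks_late',
    'CELERY_ACCEPT_CONTENT': 'accept_content',
    'CELERYD_MAX_TASKS_PER_CHILD': 'worker_max_tasks_per_child',
    'BROKER_HEARTBEAT': 'broker_heartbeat',
    'CELERY_TASK_RESULT_EXPIRES': 'result_expires',
}


def to_new_style(old_settings):
    """Return a copy of old_settings with known keys renamed.

    Keys that are not in OLD_TO_NEW are kept unchanged.
    """
    return {OLD_TO_NEW.get(key, key): value
            for key, value in old_settings.items()}


new_settings = to_new_style({
    'CELERY_ACKS_LATE': True,
    'CELERY_ACCEPT_CONTENT': ['pickle', 'json'],
    'CELERYD_MAX_TASKS_PER_CHILD': 500,
    'BROKER_HEARTBEAT': 0,
    'CELERY_TASK_RESULT_EXPIRES': 3600,
})
```

The resulting dict could then be applied with `app.conf.update(**new_settings)` in place of the two uppercase `update` calls, assuming the project does not also define the uppercase names elsewhere.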

tasks.py

from __future__ import absolute_import, unicode_literals
import json
import logging
import traceback
import time
from celery import shared_task

logger = logging.getLogger(__name__)

# Note: dispatch() below calls json.dumps on the parameters, so each task
# receives param_dict as a JSON string rather than a dict.
@shared_task
def celery_demo_task(param_dict):
    logger.info('foot task start! param_dict:%s', param_dict)
    time.sleep(5)  # simulate slow work
    return 'finished'

@shared_task
def celery_demo_task2(param_dict):
    logger.info('foot task start! param_dict:%s', param_dict)
    time.sleep(5)  # simulate slow work
    return 'finished'

@shared_task
def celery_demo_task3(param_dict):
    logger.info('foot task start! param_dict:%s', param_dict)
    time.sleep(5)  # simulate slow work
    return 'finished'

def do_work(user):
    dispatch(celery_demo_task, {'x': user})
    dispatch(celery_demo_task3, {'z': user})
    sub_work(user)

def sub_work(user):
    dispatch(celery_demo_task2, {'y': user})


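# Dispatch a task to the broker
def dispatch(task, param_dict):
    # The parameters are sent as a single JSON string argument.
    param_json = json.dumps(param_dict)
    try:
        task.apply_async(
            [param_json],
            # Retry publishing the message if the broker connection fails.
            retry=True,
            retry_policy={
                'max_retries': 1,
                'interval_start': 0,
                'interval_step': 0.2,
                'interval_max': 0.2,
            },
        )
    except Exception:
        # Log the full traceback, then let the caller decide how to respond.
        logger.error(traceback.format_exc())
        raise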
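
The `retry_policy` passed to `apply_async` controls how sending the message to the broker is retried when the connection fails; it does not re-run a task that raised an error. Going by the Celery documentation's description of the four keys, the waits between attempts can be modelled roughly as below. This is a simplified sketch of that description, not Celery's or kombu's actual code, and `publish_retry_delays` is a hypothetical name.

```python
def publish_retry_delays(max_retries, interval_start, interval_step, interval_max):
    """Approximate the wait (in seconds) before each publish retry.

    Simplified model: the first retry waits interval_start, each later retry
    adds interval_step, and no wait exceeds interval_max.
    """
    delays = []
    for attempt in range(max_retries):
        delays.append(min(interval_start + attempt * interval_step, interval_max))
    return delays


# The policy used in dispatch(): a single retry with no wait.
policy = {
    'max_retries': 1,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
delays = publish_retry_delays(**policy)
```

Under this model the policy in `dispatch` allows one immediate retry and then gives up, which keeps the web request from blocking for long when Redis is unreachable.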

views.py

import traceback
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import do_work


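@csrf_exempt
def hello(request):
    if request.method == 'GET':
        try:
            user = request.GET.get('username')
            do_work(user)
            return JsonResponse({'code': 0, 'msg': 'success'})
        except Exception:
            return JsonResponse({'code': -1, 'msg': traceback.format_exc()})
    # Without this, any other HTTP method would return None, which Django rejects.
    return JsonResponse({'code': -1, 'msg': 'method not allowed'}, status=405)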

Prerequisite: configure settings.py and urls.py
Start Django: python manage.py runserver
Start Celery: celery -A celery_django_demo worker -l info -P eventlet
Web-based monitoring: celery flower --broker=redis://127.0.0.1:6379/2

Open Flower in a browser at http://localhost:5555/, then send a GET request with Postman (or directly from the browser) to http://127.0.0.1:8000/hello?username=xxx. The Celery tasks can then be watched as they run in Flower.
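
The same GET request can also be sent from a script instead of Postman. The following is a minimal sketch using only the standard library; it assumes the Django dev server is listening on 127.0.0.1:8000 as above, and `build_hello_url` and `call_hello` are hypothetical helper names.

```python
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumed address of the Django dev server started with runserver.
BASE_URL = 'http://127.0.0.1:8000/hello'


def build_hello_url(username):
    """Build the request URL with the username as a query parameter."""
    return BASE_URL + '?' + urlencode({'username': username})


def call_hello(username):
    """Send the GET request and return the response body as text.

    Only works while the Django server is running.
    """
    with urlopen(build_hello_url(username)) as response:
        return response.read().decode('utf-8')


url = build_hello_url('xxx')
```

Calling `call_hello('xxx')` while the server and worker are running should queue the three demo tasks, which then show up in Flower.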

Note:
After adding, removing, or modifying a task, restart Celery so the worker picks up the change.

----------------------------------------------------Corrections welcome----------------------------------------------------
