django定时器_djcelery+mq的使用

2021-01-22  本文已影响0人  猪儿打滚

环境

python 3.6
django 2.1.8

下载安装

celery==3.1.15
django-celery==3.3.1
flower==0.9.3

代码步骤

-1、配置 settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'djcelery',  # register the djcelery app
    ...
]


# Django integration for Celery: per the original author it removes the need to
# configure the Django environment inside Celery and lets tasks be managed from
# the Django admin.
import djcelery

## djcelery configuration follows

# When djcelery.setup_loader() runs, Celery looks at the tasks.py file of every
# app listed in INSTALLED_APPS, finds the functions marked as tasks and
# registers them as Celery tasks.
djcelery.setup_loader()

CELERY_ENABLE_UTC = True
# CELERY_ENABLE_UTC = False

# CELERY_TIMEZONE = 'Asia/Shanghai'
# NOTE(review): TIME_ZONE is not defined in this fragment; it is presumably set
# earlier in settings.py - confirm.
CELERY_TIMEZONE = TIME_ZONE

# AMQP broker URL (guest account on the local host).
BROKER_URL = 'amqp://guest@localhost//'

# Use djcelery's database-backed scheduler for celery beat.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'  # store task metadata in the database

CELERY_ACCEPT_CONTENT = ['application/json']

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

# Expiry time of task results. Original note: with this setting commented out,
# task results are not cleaned up periodically - TODO confirm against the
# Celery defaults.
# CELERY_TASK_RESULT_EXPIRES = 86400

# Celery worker concurrency: 1 when DEBUG is truthy, otherwise 10.
# NOTE(review): DEBUG is not defined in this fragment; presumably set earlier
# in settings.py - confirm.
CELERYD_CONCURRENCY = 1 if DEBUG else 10

CELERYD_MAX_TASKS_PER_CHILD = 100  # number of tasks a worker executes before it is destroyed
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# Point Django at the project's settings module unless the variable is already set.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestPaltForm.settings')

# The project name is passed as the Celery application name.
app = Celery('TestPaltForm')

# Read the Celery configuration from the Django settings.
app.config_from_object('django.conf:settings')

# The lambda defers reading settings.INSTALLED_APPS until Celery calls it.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
from djcelery.models import PeriodicTask
from django.db import models


class TaskExtend(models.Model):
    """
    Extra metadata attached one-to-one to a djcelery ``PeriodicTask``.

    The related task exposes this row through the reverse accessor
    ``taskextend``; deleting the task cascades to this row.
    """
    # Set once when the row is inserted.
    create_time = models.DateTimeField(verbose_name='创建时间', auto_now_add=True)
    # Refreshed on every model ``save()``.
    update_time = models.DateTimeField(
        verbose_name='更新时间',
        help_text='更新时间',
        auto_now=True,
    )
    # Recipient list kept as text; the default is the string '[]'.
    email_list = models.CharField(verbose_name='邮箱列表', default='[]', max_length=2048)
    # Name of the user who created the task.
    author = models.CharField(verbose_name='创建人', default='', max_length=100)
    # Project selected for the task, stored as a plain integer.
    project = models.IntegerField(verbose_name='任务所选项目', default=0)
    periodic_task = models.OneToOneField(
        to=PeriodicTask,
        related_name='taskextend',
        on_delete=models.CASCADE,
    )
"""
Author: LZL
"""
from django_filters import rest_framework as filters

from djcelery.models import PeriodicTask


class PeriodicTaskFilter(filters.FilterSet):
    """
    Query filters for ``PeriodicTask`` listings.

    ``name``, ``description`` and ``author`` use a ``contains`` lookup;
    ``create_time`` and ``update_time`` are date ranges. ``author`` and
    ``create_time`` are read from the related ``taskextend`` row.
    """
    name = filters.CharFilter(lookup_expr='contains', field_name='name')
    description = filters.CharFilter(lookup_expr='contains', field_name='description')
    author = filters.CharFilter(lookup_expr='contains', field_name='taskextend__author')
    create_time = filters.DateFromToRangeFilter(field_name='taskextend__create_time')
    # The task's own ``date_changed`` column backs the update-time range.
    update_time = filters.DateFromToRangeFilter(field_name='date_changed')

    class Meta:
        fields = '__all__'
        model = PeriodicTask
"""
Author: LZL
"""
from rest_framework import serializers
from djcelery.models import PeriodicTask

from apps.timedtask.models import TaskExtend


class TaskExtendSerializer(serializers.ModelSerializer):
    """
    Serializer exposing every field of ``TaskExtend``.
    """

    class Meta:
        fields = '__all__'
        model = TaskExtend


class PeriodicTaskSerializer(serializers.ModelSerializer):
    """
    Serializer for ``PeriodicTask`` with two extra read-only fields.

    ``task_extend`` nests the related ``TaskExtend`` row and ``crontab_time``
    exposes the string form of the related crontab schedule.
    """
    # Nested, read-only view of the one-to-one extension row.
    task_extend = TaskExtendSerializer(read_only=True, source='taskextend')
    # Uses the crontab's __str__ instead of a nested CrontabSchedule serializer.
    # According to the original author's note, that string is built as
    #   '{0} {1} {2} {3} {4} (m/h/d/dM/MY)'
    # from minute, hour, day_of_week, day_of_month and month_of_year.
    crontab_time = serializers.ReadOnlyField(source='crontab.__str__')

    class Meta:
        fields = '__all__'
        model = PeriodicTask
"""
Author: LZL
"""
from __future__ import absolute_import

import os
from datetime import datetime

from celery import shared_task
from djcelery.models import PeriodicTask

from TestPaltForm import settings
from ..testcases.models import TestCases
from ..testcases.serializers import TestcaseEnvSerializer
from ..envs.models import Envs
from utils import common


def run_task_by_cases(func, periodic_args=None):
    """
    Run a scheduled task at test-case granularity.

    The business logic is omitted in this listing, so only the signature and
    this docstring are shown.

    :param func: the scheduled-task function that is calling this helper
    :param periodic_args: undocumented in the original; ``periodic_run``
        passes its ``task_args`` payload here
    :return:
    """
    # business code omitted


@shared_task
def periodic_run(task_args):
    """
    Shared Celery task that hands its payload to ``run_task_by_cases``.

    Example payload::

        task_args = {
            "case_list": [18, 19],
            "env": 4,
            "project": 10,
            "periodic": 10,
            "name": "scheduled task 1",
            "description": "description of scheduled task 1",
            "receivers": ["admin@admin.com", "test@qq.com"]
        }

    :param task_args: payload forwarded unchanged as ``periodic_args``
    :return: whatever ``run_task_by_cases`` returns
    """
    outcome = run_task_by_cases(periodic_run, periodic_args=task_args)
    return outcome

import json
import logging

from rest_framework.response import Response
from rest_framework import generics, status
from djcelery.models import PeriodicTask, CrontabSchedule, PeriodicTasks
from rest_framework.views import APIView

from .serializers import PeriodicTaskSerializer, TaskExtendSerializer
from .filtersets import PeriodicTaskFilter
from .models import TaskExtend
from .tasks import periodic_run


class PeriodicTaskView(generics.ListCreateAPIView):
    """
    List periodic tasks and create new ones.
    """
    queryset = PeriodicTask.objects.all().order_by('-date_changed')
    serializer_class = PeriodicTaskSerializer
    filterset_class = PeriodicTaskFilter

    def post(self, request, *args, **kwargs):
        """
        Create a periodic task together with its ``TaskExtend`` row.

        Steps:
        1. Validate and pull ``project``, ``email_list`` and ``author`` out of
           the request body.
        2. Get or create the crontab schedule when a ``crontab`` dict is sent
           (a task without ``crontab`` is one that is run manually).
        3. Save the task and its extension row in a single transaction.

        The request body should not carry date-type fields.

        :param request: DRF request; the body must contain ``project``,
            ``email_list`` and ``author`` plus the ``PeriodicTask`` fields
        :param args: unused
        :param kwargs: unused
        :return: 200 with the serialized task; 400 when the body is malformed
            or fails serializer validation; 500 when saving fails
        """
        # Imported locally because this module's import block does not bring in django.db.
        from django.db import transaction

        invalid_body = {'message': '请求参数缺失或格式错误'}

        # Work on a plain copy so popping keys does not touch request.data.
        data = json.loads(json.dumps(request.data))
        if not isinstance(data, dict):
            return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)

        # These keys belong to TaskExtend, not to PeriodicTask, so they are
        # removed before the payload reaches the serializer. A missing or
        # non-numeric value is a client error, not a server error.
        try:
            project = int(data.pop('project'))
            email_list = data.pop('email_list')
            author = data.pop('author')
        except (KeyError, TypeError, ValueError):
            return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)

        # crontab dict sent by the front end, e.g.
        # {'minute': '1', 'hour': '1', 'day_of_week': '*',
        #  'day_of_month': '*', 'month_of_year': '*'}
        crontab_time = data.get('crontab')
        if crontab_time:
            if not isinstance(crontab_time, dict):
                return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)
            crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
            data['crontab'] = crontab.id  # the serializer expects the schedule id
        data['task'] = 'apps.timedtask.tasks.periodic_run'  # could be resolved dynamically later

        serializer = PeriodicTaskSerializer(data=data)
        if not serializer.is_valid():
            return Response({'message': '创建失败,已有同名任务', 'detail': serializer.errors},
                            status=status.HTTP_400_BAD_REQUEST)

        try:
            # Atomic so that a failure while creating the extension row does
            # not leave a periodic task without its TaskExtend.
            with transaction.atomic():
                task = serializer.save()
                PeriodicTasks.changed(task)  # required so celery beat reloads its schedule
                TaskExtend.objects.create(
                    email_list=email_list,
                    author=author,
                    periodic_task_id=task.pk,
                    project=project,
                )
        except Exception as exc:
            # Boundary handler: record the real cause instead of discarding it.
            logging.getLogger(__name__).exception('Failed to create periodic task')
            return Response({'message': '任务创建失败', 'detail': str(exc)},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return Response(serializer.data)


class PeriodicTaskDetailView(generics.RetrieveUpdateDestroyAPIView):
    """
    Retrieve, update or delete a single periodic task.
    """
    queryset = PeriodicTask.objects.all().order_by('-date_changed')
    serializer_class = PeriodicTaskSerializer

    @staticmethod
    def _parse_enabled(raw):
        """
        Convert a request value to ``True``/``False``.

        :param raw: value of the ``enabled`` key from the request body
        :return: the boolean, or ``None`` when ``raw`` is not a recognisable flag
        """
        if isinstance(raw, bool):
            return raw
        if isinstance(raw, int) and raw in (0, 1):
            return bool(raw)
        if isinstance(raw, str):
            text = raw.strip().lower()
            if text in ('true', 't', '1'):
                return True
            if text in ('false', 'f', '0'):
                return False
        return None

    def put(self, request, *args, **kwargs):
        """
        Edit a periodic task and its ``TaskExtend`` row.

        The task is identified by the ``pk`` captured from the URL; an ``id``
        in the body is ignored so a request cannot modify a task other than
        the one it addresses. The extension row is likewise located through
        the task rather than through a client-supplied id.

        :param request: DRF request; the body must contain ``project``,
            ``task_extend`` (a dict) and ``email_list`` plus the
            ``PeriodicTask`` columns to update
        :param args: unused
        :param kwargs: must contain ``pk``
        :return: 200 on success; 400 when the body is malformed, the task does
            not exist, or the update fails
        """
        # Imported locally because this module's import block does not bring in django.db.
        from django.db import IntegrityError, transaction

        invalid_body = {'message': '请求参数缺失或格式错误'}

        # Work on a plain copy so popping keys does not touch request.data.
        data = json.loads(json.dumps(request.data))
        if not isinstance(data, dict):
            return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)
        try:
            project = int(data.pop('project'))
            task_extend = data.pop('task_extend')
            email_list = data.pop('email_list')
        except (KeyError, TypeError, ValueError):
            return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)
        if not isinstance(task_extend, dict):
            return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)

        task_extend['project'] = project  # update the owning project
        task_extend['email_list'] = email_list
        task_extend.pop('id', None)  # the row is located via the task, never re-keyed

        periodic_id = kwargs.get('pk')
        data.pop('id', None)  # the URL decides which task is edited

        # Get or create the crontab schedule; a task without crontab is run manually.
        crontab_time = data.get('crontab')
        if crontab_time:
            if not isinstance(crontab_time, dict):
                return Response(invalid_body, status=status.HTTP_400_BAD_REQUEST)
            crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
            data['crontab'] = crontab.id

        # Check existence before writing anything. 400 is kept for backward
        # compatibility with the original response.
        if not PeriodicTask.objects.filter(id=periodic_id).exists():
            return Response({'message': '定时任务不存在'}, status=status.HTTP_400_BAD_REQUEST)

        try:
            # Atomic so the task and its extension row change together.
            with transaction.atomic():
                PeriodicTask.objects.filter(id=periodic_id).update(**data)
                task = PeriodicTask.objects.get(id=periodic_id)
                # queryset.update() does not call save(), so notify beat explicitly.
                PeriodicTasks.changed(task)
                TaskExtend.objects.filter(periodic_task_id=periodic_id).update(**task_extend)
        except IntegrityError:
            return Response({'message': '修改失败,已有同名任务'}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as exc:
            # Anything else is not a name clash; log it and say so.
            logging.getLogger(__name__).exception('Failed to update periodic task %s', periodic_id)
            return Response({'message': '修改失败', 'detail': str(exc)},
                            status=status.HTTP_400_BAD_REQUEST)
        return Response({'message': '任务修改成功'}, status=status.HTTP_200_OK)

    def patch(self, request, *args, **kwargs):
        """
        Partial update that only toggles ``enabled``.

        :param request: DRF request whose body carries ``enabled``
        :param args: unused
        :param kwargs: must contain ``pk``
        :return: 200 on success; 400 when ``enabled`` is missing or not a
            boolean-like value; 404 when the task does not exist; 500 when
            saving fails
        """
        enabled = self._parse_enabled(request.data.get('enabled'))
        if enabled is None:
            return Response(invalid_flag_response(), status=status.HTTP_400_BAD_REQUEST) \
                if False else Response({'message': '请求参数缺失或格式错误'},
                                       status=status.HTTP_400_BAD_REQUEST)

        try:
            task = PeriodicTask.objects.get(pk=kwargs.get('pk'))
        except PeriodicTask.DoesNotExist:
            return Response({'message': '定时任务不存在'}, status=status.HTTP_404_NOT_FOUND)

        try:
            task.enabled = enabled
            task.save()
            PeriodicTasks.changed(task)  # required so celery beat reloads its schedule
        except Exception:
            logging.getLogger(__name__).exception('Failed to toggle periodic task %s', task.pk)
            # 'msg' is the original key; 'message' matches the other responses.
            return Response({'message': '任务状态修改失败', 'msg': '任务状态修改失败'},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return Response({'message': '{}成功'.format('启用' if enabled else '禁用')},
                        status=status.HTTP_200_OK)


class RunPeriodicTaskView(APIView):
    """
    Immediately run a stored periodic task once.
    """

    def post(self, request, *args, **kwargs):
        """
        Queue one run of the periodic task identified by ``pk``.

        :param request: DRF request (the body is not read)
        :param args: unused
        :param kwargs: must contain ``pk``, the id of the task record
        :return: 200 once the run is queued; 404 when the task does not exist;
            400 when the stored arguments cannot be parsed as a list
        """
        import ast

        # id of the task record
        task_id = kwargs.get('pk')
        try:
            raw_args = PeriodicTask.objects.get(pk=task_id).args
        except PeriodicTask.DoesNotExist:
            resp = {
                'message': '所运行任务不存在,id:{}'.format(task_id)
            }
            return Response(resp, status=status.HTTP_404_NOT_FOUND)

        # SECURITY: the original called eval() on this string. The value is
        # stored from client-supplied data (the create endpoint passes ``args``
        # through the serializer), so eval() allowed arbitrary code execution.
        # It is parsed as JSON instead; ast.literal_eval is a fallback for rows
        # that were stored as a Python literal. Neither evaluates expressions.
        # An empty value is treated as "no arguments".
        try:
            task_args = json.loads(raw_args or '[]')
        except ValueError:
            try:
                task_args = ast.literal_eval(raw_args)
            except (ValueError, SyntaxError):
                task_args = None
        if not isinstance(task_args, (list, tuple)):
            return Response({'message': '任务参数格式错误'}, status=status.HTTP_400_BAD_REQUEST)

        # The argument list's first element is the task_info payload.
        periodic_run.delay(*task_args)
        return Response({'message': '任务开始运行,请稍后查询结果...'})
"""
Author: LZL
"""
from django.urls import path

from .views import PeriodicTaskView, PeriodicTaskDetailView, RunPeriodicTaskView

urlpatterns = [
    # Periodic (scheduled) tasks
    path('periodic/', PeriodicTaskView.as_view()),
    path('periodic/<int:pk>/', PeriodicTaskDetailView.as_view()),
    path('run_periodic/<int:pk>/', RunPeriodicTaskView.as_view()),  # manually run a (scheduled) task
]

生成的数据表

数据表

启动

启动worker:celery -A 项目名 worker -l info -P eventlet
启动beat:celery -A 项目名 beat -l info
启动celery后台(需要查看才启动):celery flower
启动mq:自行百度
django后台也可以查看定时任务

上一篇下一篇

猜你喜欢

热点阅读