Celery进阶

2019-03-15  本文已影响0人  ZplD

Celery中Task类方法重写

应用场景:下面四中方法在各自场景下执行,但源码却没有做出任何操作,所有如果有需要可重写一个类并继承Task类,重写下面的方法

from celery import Celery
import celery

class MyTask(celery.Task):
    def on_success(self, retval, task_id, args, kwargs):
        print('task done: {0}'.format(retval))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('task fail, reason: {0}'.format(exc))
        # return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

app = Celery('tasks', backend='redis://localhost:6379/0',
                 broker='redis://localhost:6379/0')  # 配置好celery的backend和broker
@app.task(base=MyTask,name='Demo.tasks.add')
def add(x, y):
    raise KeyError
    return x + y

将任务绑定为实例方法

from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', backend='redis://localhost:6379/0',
                 broker='redis://localhost:6379/0')  # 配置好celery的backend和broker
@app.task(name='Demo.tasks.add',bind=True)
def add(self,x, y):
    logger.info(self.request.__dict__)
    return x + y

注意add方法里的self和类方法里的self相似都代表是自己本身,都是可不传的参数,可获取到与自身有关的所有参数

具体参数参考这里

任务状态回调

任务状态为以下几种:


image.png

如果有一个耗时较长的任务进行的时候,如果我们想知道他的进度的话,我们可以定义个任务状态,用来说明任务的进度

#tasks.py
@app.task(name='Demo.tasks.test_mes',bind=True)
def test_mes(self):
    for i in range(1,11):
        time.sleep(0.1)
        self.update_state(state='PROGRESS',meta={"p": i*10})
    return 'finish'

------------------------------------------------------------------------
#trigger.py
from Demo.tasks import test_mes
import sys
def pm(body):
    res = body.get('result')
    if body.get('status') == 'PROGRESS':
        print('res:'+str(res))
        sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))
        sys.stdout.flush()
    else:
        print('\r')
        print('over:'+str(res))

r = test_mes.delay()

r.get(on_message=pm, propagate=False)

使用self.update_state可以状态发送给Woker,其中state为我们自己定义的状态,而meta为传给worker的参数。在woker中可以用r.get(on_message=pm, propagate=False)将这些参数与状态传给某个函数,从而可以知道该任务的进度,其中on_message为将传来的参数放于哪个函数中,progagate为是否要传递错误信息,如果为False即不传递错误信息

定时任务处理

有些任务我们需要定时间的去启动,那么这时候我们就需要配置celery,创建celery的配置文件,如celery_config,里面放入

from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'tasks': {
        'task': 'tasks.period_task',
        'schedule': crontab(minute="*/1"),
    },

}

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

CELERYBEAT_SCHEDULE为定时启动的任务,其中task对应的value就是你要定期启动的函数,schedule就是定时时间,分别可以使用contab,timedelta都可以配置时间,如果函数需要传入参数,使用"args":参数
"schedule": crontab(minute="/10", # 每十分钟执行
"schedule": crontab(minute="
/1"), # 每分钟执行
"schedule": crontab(minute=0, hour="*/1"), # 每小时执行
"schedule": timedelta(seconds=5) # 没五秒执行一次
另外的一些参数:
BROKER_URL: Broker配置,使用redis作为消息中间件
CELERY_RESULT_BACKEND : BACKEND,使用redis存放结果
CELERY_RESULT_SERIALIZER : 结果序列化方案
CELERY_TASK_RESULT_EXPIRES : 任务过期时间

在任务文件中tasks.py输入一下代码

from celery import Celery

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')
@app.task(bind=True,name='tasks.period_task')
def period_task(self):
    print('period task done: {0}'.format(self.request.id))

开启一个终端执行celery -A tasks beat(需注意路劲问题,在与任务文件的同级目录下执行,tasks为文件名)
开启另外一个终端执行celery -A tasks worker --pool=solo -l info


image.png

需要注意的是如何时间方面有涉及到中国地区的话,需在配置中加入时区信息CELERY_TIMEZONE = 'Asia/Shanghai',默认为以utc为标准。

任务编排

链式任务chain

有时候我们一个任务需要等待上个任务执行完才能执行的话,我们就需要用到链式任务

@app.task(name="tasks.fetch_page")
def fetch_page(url):
    print('进入第一层'+url)
    return '我是第一层'

@app.task(name="tasks.parse_page")

def parse_page(page):
    print('进入第二层'+page)
    return '我是第二层'

@app.task(name="tasks.store_page_info")

def store_page_info(info, url):

    print('进入第三层'+info+url)
    return '执行结束'

假定函数有三层每层都需要上一层的返回才能继续往下执行

Woker文件中写入以下代码

from celery import group, chain

from Demo.tasks import *
def update_page_info(url):

    # fetch_page -> parse_page -> store_page
    # 第一种写法
    # chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    #
    # chain()
    # 第二种写法,res.get()为最后返回的结果
    res = chain(fetch_page.s(url),parse_page.s(),store_page_info.s(url))()
    while True:
        if res.ready():
            print("res"+str(res.get()))
            break
update_page_info('www.baidu.com')

组任务group

并行执行组内的每个任务

group(fetch_page.s(url),parse_page.s('test1'),store_page_info.s('test2',url))()

任务分割chord

分为header和body两个部分,会先执行header在将header的结果传给body执行

chord(header=[fetch_page.s(url)],body=parse_page.s())()

任务分组chunks

按照任务个数分组,并不是并发执行

fetch_page.chunks(['1','2','3'],2)() #2代表每组的任务个数,需要注意的是如果第一个参数传入的是字符串的话,那么字符串会被分割成每个字符当作参数传入
上一篇下一篇

猜你喜欢

热点阅读