coding

Celery 进阶学习

2017-11-29  本文已影响32人  esrever

Celery 进阶学习

参考链接: Celery 4.1.0 documentation

初始文件

安装部署celery相关的pip包,参考文档或Celery 部署小记

另外,本文使用ipython作为控制台的交互式解释器,pip install ipython

tasks.py例1

from celery import Celery


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task
def add(a, b):
    return a + b

以上文件可以正常通过以下命令启动

celery -A tasks worker --loglevel=info

例1中使用类的方式来加载配置,其他方式有:

Configuration

抽象任务(类)

所有task都必须使用@app.task装饰器来装饰,经过装饰器之后,这些任务会继承Task类。可以通过继承Task类,来创建一个抽象类,供task装饰

tasks.py例2

from celery import Celery


# 抽象tasks
from celery import Task
class DebugTask(Task):
    # 在调用之前打印一行字
    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task(base=DebugTask)
def add(a, b):
    return a + b

ipython调试

In [1]: from tasks import add

In [2]: add.delay(2, 3)
Out[2]: <AsyncResult: d9e63190-0591-403d-a5be-8b59893fcb2d>

celery输出

[2017-11-20 15:52:05,660: INFO/MainProcess] Received task: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]  
[2017-11-20 15:52:05,662: WARNING/ForkPoolWorker-4] TASK STARTING: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]
[2017-11-20 15:52:05,666: INFO/ForkPoolWorker-4] Task tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d] succeeded in 0.00480927500029793s: 5

acks_late选项

task在经过worker确认(acknowledge)之后,才会从worker的任务队列中移除。并且worker维护的任务队列可以保留相当大量的队列信息,即使这个worker被杀掉,任务信息仍然可以转移到其他的worker

无限期阻塞的任务

由于网络传输等问题,导致任务无限期阻塞,会阻止此worker实例执行其他工作,解决方案是:

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))

Prefork池预取设置

prefork池默认将异步发送尽可能多的任务到进程中(进程预取任务)。对于延时短的任务,这样会加快速度,但是如果是高延时的任务,该进程后面的任务会长期处于等待。

默认设置: worker会发送任务给缓冲区可写的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3

使用-Ofair选项可以关闭预取设置,此时,worker会发送任务给真正可用于工作的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A

task options

@app.task(options...)

link

logging

worker会自动建立log,当然你也可以自定义log,例子如下

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

参数检测(typing)

>>> @app.task
... def add(x, y):
...     return x + y

# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

# typing 属性
>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

# Works locally, but the worker reciving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

隐藏敏感信息(避免进入log)

v4.0之后,且task_protocol为2或以上才有效(该值在4.0之后默认为2)

可以使用argsreprkwargsrepr调用参数来覆盖敏感信息,例子如下

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

但实际上,只要可以从broker中读取数据,仍然可以获得这些“敏感信息”,所以如果需要高度保密的数据,要使用其他方法存储(加密等)

重试(Retrying)

当任务执行出现错误情况,可以通过设置retry来解决可恢复的错误。celeryretry机制会确保由相同的队列去执行此task-id的原始任务。简单的例子如下

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

特别注意,retryraise出来的,所以,即使后面有代码,也不会执行。另外,这个异常会被worker视为需要重试,以便在启用result backend时,可以存储正确的状态(RETRY)

重试的各种选项和应用:

以下是@app.task的参数

以下三个选项是v4.1版本支持的

task方法选项列表

类似@app.task(option1=xx, option2=yy),括号内的参数即选项
http://docs.celeryproject.org/en/master/userguide/tasks.html#general

一些选项(部分参考文档即可):

状态

link

Handlers

在任务返回、失败、重试、成功、超时等事件发生的时候,触发特定的方法:after_return, on_failure, on_retry, on_success, on_timeout ...

可用于状态转移的监控,如发邮件提醒等

一个自定义请求的例子如下

import logging
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super(Request, self).on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination

最佳实践

Tips and Best Practices

Optimizing

  1. 忽略不需要的结果,ignore_result=True
  2. 尽量避免使用同步子任务(task调用需要依赖其他task执行的结果,这样会造成互相等待,陷入死锁) Avoid launching synchronous subtasks
  3. 设置broker_pool_limit,默认为10,可以根据使用连接的线程的数目调整 link
  4. worker_prefetch_multiplier表示一次prefetch多少条消息乘以并发进程数,默认值为4(每个进程4个消息)。对于长时间的任务,可以把这个值设置为1,其实就相当于关闭预取;对于短时任务,可以设置大一些,比如64,128等;对于长短不一的任务,可以通过Routing Tasks,即分队列的方式执行
  5. 针对长任务,许多人希望的是让当前执行的任务数与保留待确认的任务数目相同,且都等于当前并发数(如-c 10,此时在执行的任务是10个,等待的任务数也是10个)。满足这样的要求选项:task_acks_late = Trueworker_prefetch_multiplier = 1
  6. 在默认的prefork模式下,进程池中的进程可能处于空闲或忙碌的状态。-O是优化选项,如果是default,进程是预取来自worker中的任务的,可能造成长时间的等待;如果是fair,进程只在有空闲的时候,才会去取任务执行。设置fair对于耗时长的任务来说比较有利
上一篇 下一篇

猜你喜欢

热点阅读