Python 笔记 | Celery简单使用

2020-01-14  本文已影响0人  WangLane

想了解更多的同学,推荐去看官方文档,写的很清晰:celery官方文档

这里我们使用redis作为Broker

一、 redis作为broker

1. 安装

使用redis作为broker需要安装额外的依赖, 下面这句就可以安装celery及其依赖。

pip install -U "celery[redis]"

然后你需要启动redis服务。

如果你已经配置好redis,就直接安装celery就好了。

pip install celery

2. 配置

只需要配置你的redis链接地址就可以了.

app.conf.broker_url = 'redis://localhost:6379/0'

redis链接的格式为

redis://:password@hostname:port/db_number

协议后面所有的参数都是可选参数,默认连接到localhost的6379端口, 默认数据库为 db0 .

3. 结果

如果想把结果也存储在redis中, 可以这样配置

app.conf.result_backend = 'redis://localhost:6379/0'

二、应用

首先我们要创建一个Celery应用,这个应用是用来作为你所有Celery操作的接入点。这里我们把所有的东西都塞到一个文件里面, 实际项目中需要分开写, 请参考文档

我们先创建文件 tasks.py

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

第一个参数是当前模块的名字,必选参数, 只有当任务在__main__中定义时,才会自动生成名称。

第二个参数是broker的参数, 指定你要使用的broker消息队列,这里我们使用redis

三、运行worker服务

这样我们就启动了worker服务。

celery -A tasks worker --loglevel=info

启动后会看到屏幕得到这样的输出提示:

image.png

四、调用任务

调用任务我们需要用到delay()方法

>>> from tasks import add
>>> add.delay(4, 4)

add(4,4)这个任务就会发送到队列,再由worker去处理和计算。我们可以再刚刚开启的worker服务中看到处理的提示:

image.png

调用每个任务会返回一个AsyncResult 的实例, 我们可以用它来检查任务状态,获取返回值。

image.png

五、保存结果

由于celery是异步处理, 所以如果想要拿到结果,需要有个后端(叫做backend) 暂时存储worker处理完的结果,celery有几种内置的结果后端: SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP), 当然你可以自己定义。更多关于backend查看Result Backends.

这里我们依然使用redis。

tasks.py中的app改为:

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

这样我们就把backend加入进去了。此时我们重新开启服务

celery -A tasks worker --loglevel=info

将刚刚的python交互shell关掉重新开启(不然使用的是之前的缓存)

image.png

ready() 方法返回任务是否处理完成, 你可以轮询结果等待任务完成, 不过在异步程序中通常不会这样使用,而是使用 get() 方法加入timeout参数。如果任务报错, get方法会再次抛出异常, 你可以指定 propagate 参数来

注意!!!
Backends 存储结果是消耗资源的,为了确保资源及时释放,每次使用完数据调用 forget()来释放资源。

关于结果存储的更多信息查看 celery.result

六、配置

Celery 使用的时候不需要太多的操作和配置,开箱即用,然鹅, 如果你深入使用的时候,会发现它又可以配置很多参数。 默认的参数有时候可能不够用, 所以就需要指定配置参数了。参数配置可以看这里 Configuration and defaults

配置可以直接在app初始化的时候指定。比如我们可以指定序列器为json(当然默认就是json)

app.conf.task_serializer = 'json'

如果有很多参数需要配置, 可以用update函数

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于大一些的项目,我们推荐使用专门的配置。配置集中化也有利于项目管理和维护。你可以通过app.config_from_object() 方法来指定配置模块。配置模块通常命名为celeryconfig,当然你也可以起别的名字。
比如我们有这样的配置文件celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

指定配置的时候:

app.config_from_object('celeryconfig')

为了验证我们的配置是否有语法错误,是否正常,我们可以import一下试试:

$ python -m celeryconfig

更多配置详情请看 Configuration and defaults

进一步学习

http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps

上一篇 下一篇

猜你喜欢

热点阅读