python

【Celery】python分布式框架基础

2017-10-31  本文已影响420人  lndyzwdxhs

0x01 基本概念

架构图

如上图所示,由三部分组成:消息中间件(message broker)、任务执行单元(worker)、任务执行结果存储(task result store)

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

0x02 实例

创建实例app:

# celery.py
from celery import Celery

app = Celery('task_name', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

# 加载celery配置
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,

## 可以把配置写到py文件内加载
app.config_from_object('django.conf:settings')

## 自动发现任务(需要在app下创建tasks.py模块)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

创建tasks.py:

# tasks.py
from celery import app

@app.task
def add(x, y):
    return x+y

启动worker(启动完,当然此时broker中还没有任务,worker此时相当于处于待命的状态)

celery -A tasks task_name --loglevel-info

触发任务:

#trigger.py
from tasks import add

result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用

while not result.ready():
    time.sleep(1)

print 'task done: {0}'.format(result.get())

delay返回的是一个AsyncResult对象,里面存的就是一个异步的结果,当任务完成时result.ready()为True,然后用result.get()取结果即可。

至此,一个简单的任务队列就完成了。


欢迎关注微信公众号(coder0x00)或扫描下方二维码关注,我们将持续搜寻程序员必备基础技能包提供给大家。


上一篇下一篇

猜你喜欢

热点阅读