Distributed Celery
Preface
Everyone knows Celery is a distributed task queue, yet most tutorials I came across only show how to run tasks asynchronously with Celery and rarely explain the distributed part. So here is a write-up of the pitfalls I hit along the way.
I. Preparation
1. Choose a message broker
The official docs recommend RabbitMQ or Redis; install either one. For installation details, see
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps
I chose Redis as the broker.
2. Prepare a virtual machine
Make sure both the local machine and the VM can reach Redis. Any other machine on the same network works just as well.
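Before going further, it is worth confirming that each machine can actually reach the broker. Below is a minimal sketch you can run on both machines once the redis client from section II is installed; the IP is the broker address used throughout this post, so substitute your own:
import redis

# ping the broker; True means this machine can reach it
r = redis.Redis(host='10.211.55.2', port=6379, db=0)
print(r.ping())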
II. A simple demo
1. Create the project and a Python environment
mkdir myCelery
cd myCelery
virtualenv -p python2 envp2
source envp2/bin/activate
2. Install Celery and Redis
pip install celery==4.0
pip install redis
Note that we pin version 4.0 here. According to the official docs, 4.0 is the release that supports both Python 2 and Python 3. The official statement reads:
Version Requirements
Celery version 4.0 runs on
Python (2.7, 3.4, 3.5)
PyPy (5.4, 5.5)
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
3. Write the code
vim tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://10.211.55.2:6379/0')

@app.task
def add(x, y):
    return x + y
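By default no result backend is configured, so the return value of add only shows up in the worker's log. If you also want to read results back from the caller, here is a minimal sketch assuming the same Redis instance, with db 1 as the backend (the db number is an arbitrary choice):
from celery import Celery

# variant of the app above with a result backend added
app = Celery('tasks',
             broker='redis://10.211.55.2:6379/0',
             backend='redis://10.211.55.2:6379/1')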
4. Run it
Start tasks as a worker:
celery -A tasks worker -l info
5. Dispatch a task
Open another terminal:
cd myCelery
envp2/bin/python
# in the Python shell
>>> from tasks import add
>>> add.delay(4, 4)
Watch the terminal running the worker: if the result 8 appears, everything is working. Now that the basic flow is familiar, let's build on it.
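If you configured the result backend sketched earlier, the caller can also block on the return value instead of watching the worker log:
>>> result = add.delay(4, 4)
>>> result.get(timeout=10)  # waits for the worker; raises without a result backend
8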
III. Distributed Celery
Start the virtual machine, copy tasks.py over, and set up the Python environment there. Then open a terminal and start the worker:
celery -A tasks worker -l info
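Both workers here use the default node name celery@<hostname>. If your two machines happen to share a hostname, give each worker a unique name with the -n option (the %h placeholder expands to the hostname):
celery -A tasks worker -l info -n worker1@%h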
Now the worker terminal on the local machine prints an extra line:
[2018-01-24 21:03:10,114: INFO/MainProcess] sync with celery@ubuntu
while the terminal on the virtual machine prints:
[2018-01-24 21:03:09,007: INFO/MainProcess] Connected to redis://10.211.55.2:6379/0
[2018-01-24 21:03:09,017: INFO/MainProcess] mingle: searching for neighbors
[2018-01-24 21:03:10,036: INFO/MainProcess] mingle: sync with 1 nodes
[2018-01-24 21:03:10,037: INFO/MainProcess] mingle: sync complete
[2018-01-24 21:03:10,052: INFO/MainProcess] celery@ubuntu ready.
At this point the cluster is connected. Let's test it. On the local machine, create test_tasks.py alongside tasks.py with the content below, then run it in a new terminal (the run command follows the snippet):
from tasks import add

# dispatch the add task 10 times
for i in range(10):
    add.delay(i, i)
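Run it with the virtualenv's interpreter:
envp2/bin/python test_tasks.py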
Watch the worker terminal on the local machine:
[2018-01-24 21:37:18,360: INFO/MainProcess] Received task: tasks.add[33586201-082a-4e8f-8153-8b9d757990af]
[2018-01-24 21:37:18,361: INFO/ForkPoolWorker-7] Task tasks.add[33586201-082a-4e8f-8153-8b9d757990af] succeeded in 0.0004948119749315083s: 2
[2018-01-24 21:37:18,362: INFO/MainProcess] Received task: tasks.add[a6e363cf-fd25-4b0d-9da4-04bac1c5476c]
[2018-01-24 21:37:18,363: INFO/MainProcess] Received task: tasks.add[7fd2b545-f87b-49d3-941f-b8723bd1b039]
[2018-01-24 21:37:18,365: INFO/ForkPoolWorker-2] Task tasks.add[7fd2b545-f87b-49d3-941f-b8723bd1b039] succeeded in 0.000525131996255368s: 8
[2018-01-24 21:37:18,365: INFO/ForkPoolWorker-8] Task tasks.add[a6e363cf-fd25-4b0d-9da4-04bac1c5476c] succeeded in 0.000518032000400126s: 6
[2018-01-24 21:37:18,366: INFO/MainProcess] Received task: tasks.add[79d1ead3-1077-4bfd-8300-3a335f533b74]
[2018-01-24 21:37:18,368: INFO/MainProcess] Received task: tasks.add[0d0eefab-c6f0-4fa6-945c-6c7931b74e7b]
[2018-01-24 21:37:18,368: INFO/ForkPoolWorker-4] Task tasks.add[79d1ead3-1077-4bfd-8300-3a335f533b74] succeeded in 0.00042340101208537817s: 12
[2018-01-24 21:37:18,369: INFO/MainProcess] Received task: tasks.add[230eb9d1-7fa5-4f18-8fd4-f535e4c190d2]
[2018-01-24 21:37:18,370: INFO/ForkPoolWorker-6] Task tasks.add[0d0eefab-c6f0-4fa6-945c-6c7931b74e7b] succeeded in 0.00048609700752422214s: 16
[2018-01-24 21:37:18,370: INFO/ForkPoolWorker-7] Task tasks.add[230eb9d1-7fa5-4f18-8fd4-f535e4c190d2] succeeded in 0.00046275201020762324s: 18
And now the terminal on the virtual machine:
[2018-01-24 21:37:17,261: INFO/MainProcess] Received task: tasks.add[f95a4a20-e245-4cdc-b48a-4b79416b14c1]
[2018-01-24 21:37:17,263: INFO/ForkPoolWorker-1] Task tasks.add[f95a4a20-e245-4cdc-b48a-4b79416b14c1] succeeded in 0.00107001400102s: 0
[2018-01-24 21:37:17,264: INFO/MainProcess] Received task: tasks.add[3ddfbcda-7d75-488c-bc69-243f991bb49a]
[2018-01-24 21:37:17,267: INFO/MainProcess] Received task: tasks.add[b0a36bfe-87c4-43ef-9e6e-fb8740dd26d0]
[2018-01-24 21:37:17,267: INFO/ForkPoolWorker-1] Task tasks.add[3ddfbcda-7d75-488c-bc69-243f991bb49a] succeeded in 0.00107501300226s: 4
[2018-01-24 21:37:17,270: INFO/ForkPoolWorker-2] Task tasks.add[b0a36bfe-87c4-43ef-9e6e-fb8740dd26d0] succeeded in 0.00111001500045s: 10
[2018-01-24 21:37:17,272: INFO/MainProcess] Received task: tasks.add[7bcec842-65e5-407d-9e7d-99183956ef3e]
[2018-01-24 21:37:17,277: INFO/ForkPoolWorker-1] Task tasks.add[7bcec842-65e5-407d-9e7d-99183956ef3e] succeeded in 0.000870012001542s: 14
Looking only at the values after succeeded in: the local machine produced 2, 8, 6, 12, 16, 18, and the virtual machine produced 0, 4, 10, 14. Together that accounts for all ten add tasks, which shows they were distributed between the two workers.
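As an aside, dispatching in a plain loop fires independent tasks whose results are only visible in the worker logs. Celery's group primitive does the same fan-out but hands back a single handle, so with a result backend you can collect everything in order. A minimal sketch:
from celery import group
from tasks import add

# fan out 10 add tasks as one group
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()
# with a result backend configured, this gathers all return values in order:
# result.get()  ->  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]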
IV. Closing remarks
Once you understand how Celery schedules work, setting up a distributed system is straightforward. A follow-up post will cover integrating Celery with a Flask app and the pitfalls of running Celery under both Python 2 and Python 3.