CELERY 集群管理实现
2018-10-21 本文已影响60人
MoonMonsterss
这篇需要结合[CELERY 测试多服务器]来看。
主要功能实现: 当创建新任务时,可以指定虚拟环境(服务器)执行任务。
1.代码
1.1 __init__.py
from celery import Celery
app = Celery('celery_app')
# 加载配置
app.config_from_object('celery_app.celeryconfig')
1.2 celeryconfig.py
from datetime import timedelta
from kombu import Queue, Exchange
result_serializer = 'json'
# 中间件
broker_url = 'redis://127.0.0.1:6379/7'
# 结果处理
result_backend = 'redis://127.0.0.1:6379/8'
# 时区
timezone = 'Asia/Shanghai'
# 导入任务模块
imports = (
'celery_app.task1',
'celery_app.task2'
)
# 定时任务
beat_schedule = {
'add-every-20-seconds': {
'task': 'celery_app.task1.multiply',
'schedule': timedelta(seconds=20),
'args': (5, 7)
},
'add-every-10-seconds': {
'task': 'celery_app.task2.add',
'schedule': timedelta(seconds=10),
'args': (100, 200)
}
}
# 任务队列
# 保持三个数据一致
# exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker
task_queues = (
Queue('default', exchange=Exchange('default'), routing_key='default'),
Queue('priority_high', exchange=Exchange('priority_high'), routing_key='priority_high'),
Queue('priority_low', exchange=Exchange('priority_low'), routing_key='priority_low')
)
task_routes = {
'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'}
}
1.3 task1.py
import time
from . import app
@app.task
def multiply(x, y):
print('multiply')
time.sleep(4)
return x * y
1.4 task2.py
import time
from . import app
@app.task
def add(x, y):
print('add')
time.sleep(2)
return x + y
2. 执行
2.1 创建虚拟环境
创建使用pipenv( pipenv install
)创建两个虚拟环境,并将上面的代码文件分别复制到相应环境下。
2.2 启动celery
使用pipenv shell
打开虚拟环境
回到celery_app的上一层,使用一下命令
在虚拟环境(env1)中,
celery -A celery_app worker -l info -Q priority_high -P eventlet
在虚拟环境(env2)中
celery -A celery_app worker -l info -Q priority_low -P eventlet
作用是,queue=priority_high, routing_key=priority_high的任务都将在env1中执行,
queue=priority_low, routing_key=priority_low的任务都将在env2中执行。
2.3 创建任务
使用apply_async函数创建任务,args表示传参,queue结合routing_key确定使用的虚拟环境(服务器)
>>> for _ in range(30):
... re = task1.multiply.apply_async(args=[20,20],queue='priority_high',routing_key='priority_high')
... re2 = task2.add.apply_async(args=[20,20],queue='priority_low',routing_key='priority_low')
... print(re.get())
... print(re2.get())
在虚拟环境1中,只执行了queue=priority_high, routing_key=priority_high的任务
[2018-09-06 16:48:01,572: INFO/MainProcess] Received task: celery_app.task1.mult
iply[3c058487-eee8-4505-9238-f549726680fb]
[2018-09-06 16:48:01,574: WARNING/MainProcess] multiply
[2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task1.mult
iply[ead3df3d-a23c-4a57-825a-8e85d5033fd1]
[2018-09-06 16:48:05,581: INFO/MainProcess] Task celery_app.task1.multiply[ead3d
f3d-a23c-4a57-825a-8e85d5033fd1] succeeded in 4.0090000000018335s: 400
[2018-09-06 16:48:05,583: INFO/MainProcess] Received task: celery_app.task1.mult
iply[954575ef-fe0a-4752-907b-ded4e4a4173b]
[2018-09-06 16:48:05,584: WARNING/MainProcess] multiply
[2018-09-06 16:48:05,586: INFO/MainProcess] Received task: celery_app.task1.mult
iply[864f666f-8a5c-490b-950f-5ac5b37a83b5]
[2018-09-06 16:48:09,591: INFO/MainProcess] Task celery_app.task1.multiply[864f6
66f-8a5c-490b-950f-5ac5b37a83b5] succeeded in 4.008999999998196s: 400
在虚拟环境2中,执行了queue=priority_low, routing_key=priority_low的任务
[2018-09-06 16:47:49,519: WARNING/MainProcess] add
[2018-09-06 16:47:51,520: INFO/MainProcess] Task celery_app.task2.add[9ab04f96-d
1c5-4d3c-87c4-6e5707ef90c1] succeeded in 2.0119999999988067s: 40
[2018-09-06 16:47:53,542: INFO/MainProcess] Received task: celery_app.task2.add[
dcf76311-e251-4f8e-bf48-b3c422f63eec]
[2018-09-06 16:47:53,543: WARNING/MainProcess] add
[2018-09-06 16:47:55,545: INFO/MainProcess] Task celery_app.task2.add[dcf76311-e
251-4f8e-bf48-b3c422f63eec] succeeded in 2.0120000000024447s: 40
[2018-09-06 16:47:57,553: INFO/MainProcess] Received task: celery_app.task2.add[
60c201fa-bad4-4af4-9d02-bb68e718fb51]
[2018-09-06 16:47:57,554: WARNING/MainProcess] add
[2018-09-06 16:47:59,556: INFO/MainProcess] Task celery_app.task2.add[60c201fa-b
ad4-4af4-9d02-bb68e718fb51] succeeded in 2.0119999999988067s: 40
[2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task2.add[
1640912d-536e-43e2-9960-ebd58af5cb3f]
[2018-09-06 16:48:01,576: WARNING/MainProcess] add
[2018-09-06 16:48:03,580: INFO/MainProcess] Task celery_app.task2.add[1640912d-5
36e-43e2-9960-ebd58af5cb3f] succeeded in 2.0120000000024447s: 40
3.参考
https://my.oschina.net/hochikong/blog/518587
https://www.213.name/archives/1105