Python中关于消息队列Celery的任务放重复机制处理
2019-05-24 本文已影响0人
小钟钟同学
为了防止处理多个worker重复的消费的任务的问题,实践了一下关于celery_once的处理。
环境:
image.png
;
win7+redis2.1.8+celery3.1.19+celery_once3.00
项目:(PS如果celery的app实例不是放在子ini下面的哈,会莫名其妙报错!暂时未知)
image.png前提需要期待redis-server
1:编写celery实例:ini.py
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:00
@version: v1.0.0
@Contact: 308711822@qq.com
@File: __init__.py.py
@文件功能描述:
"""
import time
from celery import Celery
#
broker = 'redis://127.0.0.1:6379/2'
backend = 'redis://127.0.0.1:6379/0'
#
# app = Celery('my_task', broker=broker, backend=backend)
# from celerytakls import task1
# task1.init()
#
# @app.task
# def add(x, y):
# task1.getV(1)
# time.sleep(5) # 模拟耗时操作
# return x + y
#
# @app.task
# def jianshao(x, y):
# task1.getV(1)
# time.sleep(5) # 模拟耗时操作
# return x + y
#
# if __name__ == '__main__':
# app.start()
# ===============
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('my_task', broker=broker,backend=backend)
# 一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# 在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task(x,y):
sleep(30)
return "Done!"
2:然后启动worker
PS:celery的app实例在celerytakls的下面 所有 -A后面的实例所在的位置
(lin-cms-flask) D:\python_learn\lin-cms-flask>celery worker -A celerytakls --loglevel=info
image.png
3:编写生产者,消息的发生:xiaofeizhe.py
#!/usr/bin/evn python
# coding=utf-8
import datetime
from datetime import timedelta
# from celerytakls import task1
from celerytakls import slow_task
# task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)
slow_task.apply_async(args=[1336, 8]) # 也可用 task1.add.delay(2, 8)
4:启动多次的生产者是观察发现拨错!!
image.png
5:不同的任务参数,句可以多次提交
image.png补充:
@task(base=QueueOnce, once={'graceful': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
修改 task 参数
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b