Celery消息队列的配置简例(模拟优先级)

2024-01-02  本文已影响0人  Mr韩_xianfeng

本例将使用django+celery+redis演示如何配置celery的任务队列以及如何使用他们
django==3.0.4
celery==4.4.0
redis==3.3.10
大家都知道redis不像rabbitmq支持任务的优先级,但可以使用不同的队列来区分不同的任务类型,比如优先级较高且耗时较短的任务都放到queue1,而耗时较长的可以放到queue2。这样不同类型的任务就不再互相干扰了,这不是个完美的方案,但可以满足大部分场景了。

配置

方法1

CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = {
    "default": {
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "queue0": {
        "routing_key": "P0",
        "exchange": "P0",
        "exchange_type": "P0",
    }
}

方法2

from kombu import Queue,Exchange
CELERY_QUEUES = ( 
  Queue('default', Exchange('default'), routing_key='default'), 
  Queue('P0', Exchange('P0'), routing_key='P0'),
  ) 

其中路由键支持正则匹配,比如routing_key="WEB.” 则以WEB开头的路有都归到default队列.
其它参数自行查阅吧

调用

@app.task(name='DEMO_P0', queue='P0')
def task_p0():
    print('this is p0')
    return 0

注:也可以在调用时指定队列
r = task1.apply_async(args=(1, 2), queue='queue1', routing_key='queue1')
亦或使用CELERY_ROUTES配置路由

启动

celery -A core worker -E -l info
celery -A core worker -E -l info -Q P0
这里第一行是消费默认的default队列,第二行是单独消费P0队列
注:这里启动时我没有指定-P eventlet,因为测试时发现会提示django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread.
网上有几个解法如版本问题、关闭线程数据库连接等都不太好使,也没再深入研究。如果哪位知晓也烦请告知,感谢。

上一篇下一篇

猜你喜欢

热点阅读