定时任务、Celery、消息队列

2022-06-06  本文已影响0人  李霖弢

python定时任务有以下常见方案

import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误,错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

Celery

Celery是一个任务队列管理工具,可用于实现异步接口、定期删除/缓存Redis数据、定期发送消息等。Celery本身不提供消息存储

消息队列

使用场景
  1. 解耦
    如在订单与库存系统中加入消息队列,使两个系统解耦
  2. 异步任务
    如发送短信、邮件、刷新缓存等
  3. 流量削峰
    如秒杀活动等高并发场景
broker选择

相比于redis的优势:

  1. 发布确认:生产者发布消息给Broker后,会收到Broker的反馈,保证发布成功
  2. 消费确认:消息提交给消费者后,如未成功消费确认,会返回到消息队列
  3. 高可用性:自带集群和负载均衡
  4. 持久化:redis只能将整个数据库持久化,而RabbitMQ可以对每条队列或消息分别设置持久化
基本使用DEMO
  1. 安装redis和celery
    如果celery>=4.0,需要确保redis>=2.10.4
apt-get install redis-server
pip install redis
pip install celery
  1. 建立task
#tasks.py
from celery import Celery
 
app = Celery('tasks',  backend='redis://:yourpassword@localhost:6379/0', broker='redis://:yourpassword@localhost:6379/0') #配置好celery的backend和broker
 
@app.task  #普通函数装饰为 celery task
def add(x, y):
    return x + y

也可以通过app.config_from_object() 加载配置模块:

app.config_from_object('celeryconfig')

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
  1. 启动worker(开始从队列中读取任务并执行)
celery -A tasks worker --loglevel=info

可通过celery worker --help查看命令列表,常用包括以下内容:

  1. 触发任务
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
    time.sleep(1)
print 'task done: {0}'.format(result.get())
#settings.py
from celery.schedules import timedelta, crontab
# 默认使用UTC时区,建议改为`'Asia/Shanghai'`
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    "task_every_30_minutes": {# 该key随便起,不需要调用。
        "task": "task_every_30_minutes",# 实际调用的是这个task
        "schedule": timedelta(minutes=30)
    },
    "task_every_day_start": {
        "task": "task_every_day_start",
        "schedule": crontab(minute=0, hour=0)
    },
    "task_every_even_hour": {
        "task": "task_every_even_hour",
        'schedule': crontab(minute=0, hour='0,2,4,6,8,10,12,14,16,18,20,22'),
    },
}

在django中使用celery
上一篇 下一篇

猜你喜欢

热点阅读