django 任务管理-apscheduler

2022-08-03  本文已影响0人  菩提老鹰
python-apscheduler.png

主要介绍通过 apscheduler的三种

Cron

在特定时间定期运行,相比较Linux crontab 多了 second/year/week(第多少周)/start_date/end_date

主要的参数:

特殊说明
1、linux crontab 中的 week 对应到 apscheduler中是 day_of_week (取值0到6后者mon,tue,wed,thu,fri,sat,sun)
2、配置job的时候,并不是所有时间字段都是必须
3、但是需要知道的是,隐含 大于最小有效值的字段默认为*,而较小的字段默认为其最小值,除了weekday_of_week 默认为*
举例说明:

# month=6, hour=1 最小有效值字段为 hour; 其等价于 
year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0
# 意思是在每年 6 月每天 0 点 0 分 0 秒运行;

4、字段支持表达式,其中特殊的是 day, 支持 lastlast xxth y

Interval

以固定的时间间隔运行Job

主要的参数:

Date

某个特定时间仅运行一次的任务,类似于Linux的 at

主要的参数:

Demo演示

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
@Author:    Colin
@Github:    https://github.com/opscolin
@DateTime:  2022/8/3 11:14 AM
@File:      demo_aps.py
@Software:  PyCharm
"""

import datetime
import time
import logging

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
# from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 日志 logger 配置
logging.basicConfig(filemode='a', filename='python-apscheduler.log')

# apscheduler store/executor 配置
job_stores = {
    # 'mongo': MongoDBJobStore(),
    # 'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    'db-ops': SQLAlchemyJobStore(
        url='mysql+pymysql://xxxx:yyyy@192.168.a.b:3306/dbname?charset=utf8mb4')
}

executors = {
    'default': ThreadPoolExecutor(20),
    # 'default': {'type': 'threadpool', 'max_workers': 20},
    'process_pool': ProcessPoolExecutor(5)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

# 方式一
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)


# 方式二
# scheduler = BackgroundScheduler()
# scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc)


# 定义任务Job
# 无参数Job
def job1():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"No params of job1  at {now}")


# 带参数 Job
def job2(name):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"Job with param {name} at {now}")


# Demo - Cron
scheduler.add_job(job2, trigger='cron', hour=11, minute=45, second=10, args=['Betty'])
# 每 30 秒执行一次
scheduler.add_job(job1, trigger='cron', second='*/20', jobstore='db-kfzops')
# 2022-12-31 之前, 第一季度和第三季度的每个月第三个周一的凌晨1点半执行
scheduler.add_job(job2, trigger='cron', args=['Betty'],
                  end_date='2022-12-31',
                  month='1-3,7-9',
                  day='3rd mon',
                  hour=1,
                  minute=30)

# Demo - Interval -> 每 30 秒执行一次
scheduler.add_job(job1, trigger='interval', seconds=30)

# Demo - Date
scheduler.add_job(job2, trigger='date', run_date='2022-08-03 14:20:00', args=['Betty'])


def main():
    scheduler._logger = logging
    scheduler.start()
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

脚本地址:https://gitee.com/colin5063/pylearn2022/blob/master/examples/scripts/demo_apscheduler.py

总结:

遇到的问题

目前把任务的结果通过 jobstore 写入 sqlite 或者 MySQL中,job_state字段都是乱码,即使配置了字符集,也是乱码,目录该问题暂未找到原因。

如果大家有答案,欢迎留言分享


如果觉得文章对你有所帮忙,欢迎点赞收藏,或者可以关注个人公众号 全栈运维 哦,一起学习,一起健身~

上一篇 下一篇

猜你喜欢

热点阅读