apscheduler定时任务
组件介绍
APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:
1.triggers: 任务触发器组件,提供任务触发方式
有三种可选
- cron: 类似于linux下的crontab格式,属于定时调度
- interval: 每隔多久调度一次
- date: 一次性调度
cron: 类linux下的crontab格式,属于定时调度
interval:每隔多久调度一次
date:一次性调度
#1. cron风格
(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型
year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
month (int|str) – month (1-12) -(表示取值范围为1-12月)
day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
second (int|str) – second (0-59) - (表示取值范围为0-59秒)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
#如:在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
#2.interval风格
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
#如:每隔2分钟执行一次
scheduler.add_job(myfunc, 'interval', minutes=2)
#3.date风格
run_date (datetime|str) – the date/time to run the job at -(任务开始的时间)
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
#如:在2009年11月6号16时30分5秒时执行
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
2.job stores: 任务保存组件,提供任务保存方式
- base
- memory
- mongodb
scheduler.add_jobstore('mongodb', collection='example_jobs')
- redis
scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
- rethinkdb
scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
- sqlalchemy
scheduler.add_jobstore('sqlalchemy', url=url)
- zookeeper
scheduler.add_jobstore('zookeeper', path='/example_jobs')
3.executors: 任务调度组件,提供任务调度方式
- base
- debug
- gevent
- pool(max_workers=10)
- twisted
4.schedulers: 任务调度组件,提供任务工作方式
- BlockingScheduler:进程中只运行调度器时的方式
from apscheduler.schedulers.blocking import BlockingScheduler
import time
scheduler = BlockingScheduler()
def job1():
print "%s: 执行任务" % time.asctime()
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
- BackgroundScheduler:不想使用任何框架时的方式
from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():
print "%s: 执行任务" % time.asctime()
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
while True:
pass
- AsynclOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
import asyncio
except ImportError:
import trollius as asyncio
...
...
# while True:pass
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
- GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler
...
...
g = scheduler.start()
# while True:pass
try:
g.join()
except (KeyboardInterrupt, SystemExit):
pass
- TornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler
...
...
# while True:pass
try:
IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
pass
- TwistedScheduler:Twisted方式
from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler
...
...
# while True:pass
try:
reactor.run()
except (KeyboardInterrupt, SystemExit):
pass
- QtScheduler:Qt方式
简单实例
1.安装
pip install apscheduler
2.实例
from apscheduler.schedulers.blocking import BlockingScheduler
import time
# 实例化一个调度器
scheduler = BlockingScheduler()
def job1():
print "%s: 执行任务" % time.asctime()
# 添加任务并设置触发方式为3s一次
scheduler.add_job(job1, 'interval', seconds=3)
# 开始运行调度器
scheduler.start()
#输出
Fri Sep 8 20:41:55 2017: 执行任务
Fri Sep 8 20:41:58 2017: 执行任务
...
任务操作
添加任务add_job
如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启都会创建任务的副本
开启后任务不会马上启动,可修改trigger参数
删除任务remove_job
# 根据任务实例删除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# 根据任务id删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
任务的暂停pause_job和继续resume_job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
# 根据任务实例
job.pause()
job.resume()
# 根据任务id暂停
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
任务的修饰modify和重设reschedule_job
修饰:job.modify(max_instances=6, name='Alternate name')
重设:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
调度器操作
- 开启 scheduler.start()
- 关闭 scheduler.shutdown()
- 暂停 scheduler.pause()
- 继续 scheduler.resume()