专家经验

Python定时任务之APScheduler源码分析(一)

2020-10-27  本文已影响0人  guoweikuang

前言

前面有一篇文章简单介绍了Python的一些任务调度库,并描述了 APScheduler 的工作原理及架构,这里再回顾一下 APScheduler 的架构, 这次主要以 v2.1.0 版本为分析目标,因为 2.x 版本与 3.x 版本之间加了很多功能,如异步任务,代码也重构了不少,但是基本功能及概念没有变化

APScheduler 的架构

1、APScheduler 基本概念

APScheduler 由四个组件构成(注:该部分翻译至官方文档):

2、APScheduler 架构图

apscheduler架构图

APScheduler 代码结构

主要代码逻辑都是由下面的文件组成,可以看出代码结构也根据功能不同而归档, 例如作业存储模块及触发器模块都单独作为一个包管理,后面扩展其它作业存储或触发器也很容易管理,不会出现代码结构混乱,简单而言,代码层次分明

├── apscheduler
│   ├── __init__.py
│   ├── events.py
│   ├── job.py
│   ├── jobstores
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── mongodb_store.py
│   │   ├── ram_store.py
│   │   ├── redis_store.py
│   │   ├── shelve_store.py
│   │   └── sqlalchemy_store.py
│   ├── scheduler.py
│   ├── threadpool.py
│   ├── triggers
│   │   ├── __init__.py
│   │   ├── cron
│   │   │   ├── __init__.py
│   │   │   ├── expressions.py
│   │   │   └── fields.py
│   │   ├── interval.py
│   │   └── simple.py
│   └── util.py

官方示例

阅读源码,我一般第一步先阅读该库的官方文档,先了解这个功能是什么用途,怎么使用,然后查看相关的单元测试代码,这样更有助于理解里面某个模块或者某个类的用途及效果,最后以官方示例入手,一步一步深入到源码内部了解。那现在假设你已经阅读过官方文档,知道这个库的用途了,开始探索之旅吧!

"""
设置一个每隔 3 秒就运行一次的任务
"""
from datetime import datetime

from apscheduler.scheduler import Scheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = Scheduler(standalone=True)
    scheduler.add_interval_job(tick, seconds=3)
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

从示例代码看,第一步要实例化一下 Scheduler 类,这个类根据名称就可以猜出它的功能了,它对应着架构图里的调度器模块,后续添加任务调度、启动运行任务、触发任务执行,都要通过它来操作。仔细查看 Scheduler 实例化时还传了个 standalone 参数, 这个参数做什么用呢?接下来进入 Scheduler 类里面,查看它实例化时做了什么操作,看看 standalone 是什么作用

Scheduler 类

可以看出 __init__ 初始化时, 实例化了事件类(Event),并获取了一些锁,主要是 configure 函数进行任务开始前的配置,

可以看出实例化传的 standalone 参数是装载在 options 传递给 configure 进行配置使用

class Scheduler(object):
    """
    This class is responsible for scheduling jobs and triggering
    their execution.
    """

    _stopped = False
    _thread = None

    def __init__(self, gconfig={}, **options):
        self._wakeup = Event()
        self._jobstores = {}
        self._jobstores_lock = Lock()
        self._listeners = []
        self._listeners_lock = Lock()
        self._pending_jobs = []
        self.configure(gconfig, **options)

接下来看看 configure 做了什么工作, 可以看出,它的作用就是根据配置文件给调度器配置各种参数,值得注意的是,下面几个配置参数及其默认值, 参数配置的作用描述如下:

参数名称 默认值 解释
misfire_grace_time 1(s) 在允许作业执行被延迟之前的最长时间(以秒为单位),比如这个值设置了 30s, 一个任务设置了 10:00:00运行,但10:00:00 由于一些原因没有执行,错过了运行时间,但在 10:00:20 时调度器检查这个任务还在这个设置误差时间内,就可以继续执行,具体见解释
coalesce True 将几个待执行的作业合并为一个,比如一个任务由于某个原因堆积了10次没有执行,该值为 True,只执行最后一次任务, 为 False 时,则测试执行 10次相同的任务,具体见解释
daemonic True 控制调度器线程是否为守护程序。当standalone为True时,此选项无效。如果设置为 False, 当程序即将完成时必须显式关闭调度器, 否则调度器会阻止程序终止,设置为 True, 调度器自动随主程序的结束一起结束,但可能会在退出时引发异常
standalone False 设置为 True, start 函数会运行主循环,它将直接在调用线程中运行,并且将阻塞直到没有其他pending 状态的作业。 设置为False, 当调用start() 时, 将会生成自己的线程, 具体见解释
    def configure(self, gconfig={}, **options):
        """
        Reconfigures the scheduler with the given options. Can only be done
        when the scheduler isn't running.
        """
        # 判断当前调度器是否运行状态,运行状态直接报错,因为这是配置调度器的全局配置,不允许运行期间动态修改配置
        if self.running:
            raise SchedulerAlreadyRunningError

        # Set general options(设置通用选项)
        config = combine_opts(gconfig, 'apscheduler.', options)
        # 这几个参数都是存在默认值的,下面表格会详细解释参数作用
        self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
        self.coalesce = asbool(config.pop('coalesce', True))
        self.daemonic = asbool(config.pop('daemonic', True))
        self.standalone = asbool(config.pop('standalone', False))

        # Configure the thread pool(配置线程池, 这个线程池也就是queue 加锁实现的)
        if 'threadpool' in config:
            self._threadpool = maybe_ref(config['threadpool'])
        else:
            threadpool_opts = combine_opts(config, 'threadpool.')
            self._threadpool = ThreadPool(**threadpool_opts)

        # Configure job stores(配置作业存储)
        jobstore_opts = combine_opts(config, 'jobstore.')
        jobstores = {}
        for key, value in jobstore_opts.items():
            store_name, option = key.split('.', 1)
            opts_dict = jobstores.setdefault(store_name, {})
            opts_dict[option] = value

        for alias, opts in jobstores.items():
            classname = opts.pop('class')
            cls = maybe_ref(classname)
            jobstore = cls(**opts)
            self.add_jobstore(jobstore, alias, True)

configure 函数的作用已经很明显了,

添加作业到调度器

从刚才的官方示例中,实例化完成调度器后,紧跟着就是使用 add_interval_job 为调度器添加一个任务作业,不同的任务使用不同的函数添加,比如这里是为了定时执行作业,使用了 add_interval_job 并设置了 3秒执行一次的策略。如果是指定某个日期执行的作业,那就使用 add_date_job 添加到调度器中。现在进入 add_interval_job ,看看这个函数做了什么

    def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
                         seconds=0, start_date=None, args=None, kwargs=None,
                         **options):
        """
                相关注释
        """
        # 使用Python datetime.timedelta 函数包装相关的参数
        interval = timedelta(weeks=weeks, days=days, hours=hours,
                             minutes=minutes, seconds=seconds)
        # 实例化一个触发器类,根据任务的不同,实例化不同触发器。然后通过 add_job 添加作业
        trigger = IntervalTrigger(interval, start_date)
        return self.add_job(trigger, func, args, kwargs, **options)

终于看到架构图中触发器模块的相关逻辑,在没看代码之前,大致可以猜到这个触发器只要负责根据用户设置的时间规则来计算出下次触发作业执行的时间。比如官方示例中 add_interval_job(tick, seconds=3), 用户设置了 seconds 为 3秒,这个触发器就负责根据当前时间计算出下次作业执行的时间,接下来看看触发器的初始化函数做了什么操作

IntervalTrigger
class IntervalTrigger(object):
    def __init__(self, interval, start_date=None):
        # interval 为上面 datetime.timedelta 对象,start_date 根据上面默认为 None
        if not isinstance(interval, timedelta):
            raise TypeError('interval must be a timedelta')
        # 如果有 start_date,则先转换成 datetime 类型
        if start_date:
            # convert_to_datetime 可以接收 datetime、date、字符串格式三种类型并转换
            # 为 datetime类型,这个通用方法使用正则表达式对字符串格式进行匹配,有点用
            start_date = convert_to_datetime(start_date)

        self.interval = interval
        # 将 timedelta 转换成秒数, 这里按照官方示例就是 3s 了
        self.interval_length = timedelta_seconds(self.interval)
        # 如果没有设置任何的时间,那么默认就是 1秒执行一次作业
        if self.interval_length == 0:
            self.interval = timedelta(seconds=1)
            self.interval_length = 1
                
        # 这里设置了作业的起始执行时间,根据当前时间 + 用户设置的时间点(如上面设置了3秒)后执行
        if start_date is None:
            self.start_date = datetime.now() + self.interval
        else:
            self.start_date = convert_to_datetime(start_date)

接下来回到添加 Job 作业的地方(self.add_job),这个将触发器实例化的对象及相关要执行的任务的函数及参数传入,在里面

实例化了 Job 类,并根据 Scheduler 的运行情况 将 job 实例添加到 Scheduler 类的 _pending_jobs 等待队列中或者直接添加到 jobstore 中

    def add_job(self, trigger, func, args, kwargs, jobstore='default',
                **options):
        # 这时候 misfire_grace_time、coalesce 两次参数才会有用途
        job = Job(trigger, func, args or [], kwargs or {},
                  options.pop('misfire_grace_time', self.misfire_grace_time),
                  options.pop('coalesce', self.coalesce), **options)
        # 如果 Scheduler 还没有运行,直接添加到 self._pending_jobs 队列,因为这时候 jobstore 还没实例化
        # 也就无法直接添加到 jobstore 中
        if not self.running:
            self._pending_jobs.append((job, jobstore))
            logger.info('Adding job tentatively -- it will be properly '
                        'scheduled when the scheduler starts')
        # 添加 job 到 jobstore 中
        else:
            self._real_add_job(job, jobstore, True)
        return job

这里做个小总结,可以看出 Scheduler 联系起所有的模块,包括触发器模块的实例化、作业存储的相关配置、Job类的实例化,所有子模块的初始化都委托给 Scheduler 执行(这才对得起这个命名吧),并且都是通过 add_interval_job、add_job 这种简而易懂的方式来将任务的所有环节串联起来,值得思考并应用。

接着进入 _real_add_job 看看做了什么操作

    def _real_add_job(self, job, jobstore, wakeup):
        # 计算下一次运行时间,实际上调用了 Trigger(触发器提供的计算下一次时间)的get_next_fire_time 计算
        job.compute_next_run_time(datetime.now())
        if not job.next_run_time:
            raise ValueError('Not adding job since it would never be run')

        self._jobstores_lock.acquire()
        try:
            try:
                store = self._jobstores[jobstore]
            except KeyError:
                raise KeyError('No such job store: %s' % jobstore)
            # 把执行任务添加到存储中
            store.add_job(job)
        finally:
            self._jobstores_lock.release()

        # Notify listeners that a new job has been added,(新增任务事件发送)
        event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
        self._notify_listeners(event)

        logger.info('Added job "%s" to job store "%s"', job, jobstore)

        # Notify the scheduler about the new job, 这个很关键,用Python的事件机制来唤醒scheduler(下面会详细解释)
        if wakeup:
            self._wakeup.set()
            
class IntervalTrigger(object):
    def __init__(self, interval, start_date=None):
                """省略,具体见上面"""

    def get_next_fire_time(self, start_date):
        # start_date 当前时间, self.start_date 任务启动的时间点(初始化IntervalTrigger类的时间)
        if start_date < self.start_date:
            return self.start_date

        timediff_seconds = timedelta_seconds(start_date - self.start_date)
        next_interval_num = int(ceil(timediff_seconds / self.interval_length))
        return self.start_date + self.interval * next_interval_num        

start 函数

前面分析了这么多,其实我们只跑了下面这两行代码的相关逻辑,也就是通过 Scheduler 来构建作业,并设置全局配置包括作业存储的配置、触发器相关信息、最小的执行单元 Job 等操作, 这时候任务还没有真正执行起来,想要执行作业任务,还得运行 Scheduler 的 start 来启用调度器

    scheduler = Scheduler(standalone=True)
    scheduler.add_interval_job(tick, seconds=3)

下面我们就深入 start 函数,了解这个函数是如何开始任务的调度的

    def start(self):
        """
        (在一个新的线程中开启一个调度器)
        Starts the scheduler in a new thread.
                线程模式, 在 scheduler 线程启动后立即返回
        In threaded mode (the default), this method will return immediately
        after starting the scheduler thread.
        标准模式, 这个函数会阻塞直到没有需要调度的作业
        In standalone mode, this method will block until there are no more
        scheduled jobs.
        """
        if self.running:
            raise SchedulerAlreadyRunningError

        # Create a RAMJobStore as the default if there is no default job store
        # 这个地方在没有配置任何的作用存储情况下,默认使用的内存存储
        if not 'default' in self._jobstores:
            self.add_jobstore(RAMJobStore(), 'default', True)

        # Schedule all pending jobs
        # 将所有的作业添加到作业存储中
        for job, jobstore in self._pending_jobs:
            # 上面已经解释过这个函数
            self._real_add_job(job, jobstore, False)
        del self._pending_jobs[:]

        self._stopped = False
        if self.standalone:
            self._main_loop()
        else:
            self._thread = Thread(target=self._main_loop, name='APScheduler')
            self._thread.setDaemon(self.daemonic)
            self._thread.start()

从上面代码来来,start 函数根据 standalone 的配置不同启用不同的模式来运行。

if __name__ == '__main__':
    # 设置为线程模式
    scheduler = Scheduler(standalone=False)
    scheduler.add_interval_job(tick, seconds=3)
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    
    # 加这一步让主程序(也就是所谓的非守护线程一直运行中)
    import time
    while True:
      time.sleep(1)
# 运行输出结果:主程序运行完成后直接退出
Press Ctrl+C to exit
主循环 main_loop
    def _main_loop(self):
        """Executes jobs on schedule."""

        logger.info('Scheduler started')
        # 事件通知 Scheduler 启用
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
                # 清理 threading.Event 的设置
        self._wakeup.clear()
        while not self._stopped:
            logger.debug('Looking for jobs to run')
            now = datetime.now()
            # 获取下一次醒来的时间
            next_wakeup_time = self._process_jobs(now)

            # Sleep until the next job is scheduled to be run,
            # a new job is added or the scheduler is stopped
            if next_wakeup_time is not None:
                # 计算等待时间,时间不到就一直阻塞着
                wait_seconds = time_difference(next_wakeup_time, now)
                logger.debug('Next wakeup is due at %s (in %f seconds)',
                             next_wakeup_time, wait_seconds)
                # 通过 threading.Event 的 wait 设置线程等待 wait_seconds 长时间
                self._wakeup.wait(wait_seconds)
                # 将标志设置为 False
                self._wakeup.clear()
            elif self.standalone:
                logger.debug('No jobs left; shutting down scheduler')
                self.shutdown()
                break
            else:
                logger.debug('No jobs; waiting until a job is added')
                self._wakeup.wait()
                self._wakeup.clear()

        logger.info('Scheduler has been shut down')
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

主线程本质上是一个死循环,不断获取作业任务,并获取作业的下一次执行时间,然后使用 Python threading.Event 模块让线程阻塞一段时间(一次循环结束之前会计算任务下次执行事件与当前时间之差),这样就不用在死循环中不断从 jobstore 存储中取出任务,然后计算执行时间,这样会浪费 Scheduler 的资源,也加重了 jobstore 取作业的负担。

现在来回顾一下Python threading Event 模块的官方描述:

0OX1IA.png

未完待续

下一篇接着分析 Event 的具体使用及后续代码

上一篇下一篇

猜你喜欢

热点阅读