watchdog源码分析

2019-09-21  本文已影响0人  落羽归尘

简介

python中有一个监控文件变化的库,watchdog。包括添加删除文件或目录、修改文件内容、重命名文件或目录等,每种都是一种事件,可自定义方法,用于当事件来临时的动作。

简单用法

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# 自定义处理类
class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print("文件被修改了 %s"%event.src_path)


if __name__ == "__main__":
    path = "."
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

如上示例,当当前文件下有文件改变时,会触发on_modified方法。
下面我们对背后的原理进行分析

watchdog 流程分析

对于以下分析,是基于Windows操作系统的!
对于上述例子,我们自定义处理类是继承FileSystemEventHandler的,下面是FileSystemEventHandler的源码:

class FileSystemEventHandler(object):
    """
    Base file system event handler that you can override methods from.
    """

    def dispatch(self, event):
        """Dispatches events to the appropriate methods.

        :param event:
            The event object representing the file system event.
        :type event:
            :class:`FileSystemEvent`
        """
        self.on_any_event(event)
        _method_map = {
            EVENT_TYPE_MODIFIED: self.on_modified,
            EVENT_TYPE_MOVED: self.on_moved,
            EVENT_TYPE_CREATED: self.on_created,
            EVENT_TYPE_DELETED: self.on_deleted,
        }
        event_type = event.event_type
        _method_map[event_type](event)

    def on_any_event(self, event):
        """Catch-all event handler.

        :param event:
            The event object representing the file system event.
        :type event:
            :class:`FileSystemEvent`
        """

    def on_moved(self, event):
        """Called when a file or a directory is moved or renamed.

        :param event:
            Event representing file/directory movement.
        :type event:
            :class:`DirMovedEvent` or :class:`FileMovedEvent`
        """

    def on_created(self, event):
        """Called when a file or directory is created.

        :param event:
            Event representing file/directory creation.
        :type event:
            :class:`DirCreatedEvent` or :class:`FileCreatedEvent`
        """

    def on_deleted(self, event):
        """Called when a file or directory is deleted.

        :param event:
            Event representing file/directory deletion.
        :type event:
            :class:`DirDeletedEvent` or :class:`FileDeletedEvent`
        """

    def on_modified(self, event):
        """Called when a file or directory is modified.

        :param event:
            Event representing file/directory modification.
        :type event:
            :class:`DirModifiedEvent` or :class:`FileModifiedEvent`
        """

在例子中实例化定义MyHandler类后,再实例化observer = Observer(),在Windows下 Observer()即是WindowsApiObserver,
WindowsApiObserver类源码如下:

class WindowsApiObserver(BaseObserver):
    """
    Observer thread that schedules watching directories and dispatches
    calls to event handlers.
    """

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseObserver.__init__(self, emitter_class=WindowsApiEmitter,
                              timeout=timeout)

关于BaseObserver,由于篇幅限制,这里只贴出关键代码:

class BaseThread(threading.Thread):
    """ Convenience class for creating stoppable threads. """

    def __init__(self):
        threading.Thread.__init__(self)
        if has_attribute(self, 'daemon'):
            self.daemon = True
        else:
            self.setDaemon(True)
        self._stopped_event = Event()

        if not has_attribute(self._stopped_event, 'is_set'):
            self._stopped_event.is_set = self._stopped_event.isSet

    def on_thread_start(self):
        pass

    def start(self):
        self.on_thread_start()
        threading.Thread.start(self)

class EventDispatcher(BaseThread):
    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseThread.__init__(self)
        self._event_queue = EventQueue()
        self._timeout = timeout

    @property
    def timeout(self):
        """Event queue block timeout."""
        return self._timeout

    @property
    def event_queue(self):
        """The event queue which is populated with file system events
        by emitters and from which events are dispatched by a dispatcher
        thread."""
        return self._event_queue

    def dispatch_events(self, event_queue, timeout):
        pass

    def run(self):
        while self.should_keep_running():
            try:
                self.dispatch_events(self.event_queue, self.timeout)
            except queue.Empty:
                continue

class BaseObserver(EventDispatcher):
    """Base observer."""

    def __init__(self, emitter_class, timeout=DEFAULT_OBSERVER_TIMEOUT):
        EventDispatcher.__init__(self, timeout)
        self._emitter_class = emitter_class
        self._lock = threading.RLock()
        self._watches = set()
        self._handlers = dict()
        self._emitters = set()
        self._emitter_for_watch = dict()

    def _add_emitter(self, emitter):
        self._emitter_for_watch[emitter.watch] = emitter
        self._emitters.add(emitter)

    def _add_handler_for_watch(self, event_handler, watch):
        if watch not in self._handlers:
            self._handlers[watch] = set()
        self._handlers[watch].add(event_handler)

    @property
    def emitters(self):
        """Returns event emitter created by this observer."""
        return self._emitters

    def start(self):
        for emitter in self._emitters.copy():
            try:
                emitter.start()
            except Exception:
                self._remove_emitter(emitter)
                raise
        super(BaseObserver, self).start()

    def schedule(self, event_handler, path, recursive=False):
        with self._lock:
            watch = ObservedWatch(path, recursive)
            self._add_handler_for_watch(event_handler, watch)

            # If we don't have an emitter for this watch already, create it.
            if self._emitter_for_watch.get(watch) is None:
                emitter = self._emitter_class(event_queue=self.event_queue,
                                              watch=watch,
                                              timeout=self.timeout)
                self._add_emitter(emitter)
                if self.is_alive():
                    emitter.start()
            self._watches.add(watch)
        return watch

    def add_handler_for_watch(self, event_handler, watch):
        with self._lock:
            self._add_handler_for_watch(event_handler, watch)

    def dispatch_events(self, event_queue, timeout):
        event, watch = event_queue.get(block=True, timeout=timeout)

        with self._lock:
            # To allow unschedule/stop and safe removal of event handlers
            # within event handlers itself, check if the handler is still
            # registered after every dispatch.
            for handler in list(self._handlers.get(watch, [])):
                if handler in self._handlers.get(watch, []):
                    handler.dispatch(event)
        event_queue.task_done()
  1. 这里继承关系WindowsApiObserver-BaseObserver-EventDispatcher-BaseThread最终是一个线程类
  2. 实例化WindowsApiObserver时,执行各个对象的init方法,包括BaseThread的init方法,启动线程,调用EventDispatcherrun方法,循环执行self.dispatch_events(self.event_queue, self.timeout),而这个方法如源码所示,从队列中取出事件,handler.dispatch(event),调用我们自定义的handler对象的分发任务的方法。这个过程在程序运行中都是循环进行的,也就是当有事件时,会进行事件的分发处理。
  3. 然后例子中observer.schedule(event_handler, path, recursive=True),调用schedule方法,会emitter.start(),而这个emitter就是上面我们说的emitter_class=WindowsApiEmitter,会另起一个线程,循环执行queue_events方法,而这个方法就是传入一个队列,也就是上面说的事件队列,监控指定目录文件,当有事件发生时,就把事件放到队列。
  4. 放到队列后,也就可以在第二步中进行事件分发了。

总结

上一篇下一篇

猜你喜欢

热点阅读