vnpy源码阅读-3.EventEngine

2021-11-29  本文已影响0人  Leernh
from collections import defaultdict  # 字典默认类型
from queue import Empty, Queue  # 队列
from threading import Thread  # 多线程
from time import sleep
from typing import Any, Callable, List  # 类型提示

先看第一个导入的模块,defaultdict类初始化时传入一个类型list, str, int, dict,当字典中找不到对应的key时返回传入的类型,看示例

d = defaultdict(dict)
d[1] = 'one'

print(d[1])
print(d[2])
# return
one
{}

导入的包了解了,接着看代码吧

Evnet

class Event:

    def __init__(self, type: str, data: Any = None):
        self.type: str = type
        self.data: Any = data

定义一个Evnet的类,有type和data参数

init

def __init__(self, interval: int = 1):
        """
        默认情况下,计时器事件每1秒生成一次
        """
        self._interval = interval  # 时间间隔
        self._queue = Queue()  # FIFO队列,先进先出
        self._active = False    # 开关
        self._thread = Thread(target=self._run)  # run方法线程
        self._timer = Thread(target=self._run_timer)  #run_timer方法线程
        self._handlers = defaultdict(list)  # 处理列表 {'type': []}
        self._general_handlers = []  # 通用处理列表

初始化的参数中,interval和timer用不到可以不管,其他的看注释能明白作用

_run

    def _run(self) -> None:
        """
        从队列中获取事件,然后对其进行处理。
        Get event from queue and then process it.
        """
        while self._active:
            try:
                event = self._queue.get(block=True, timeout=1)  # 从队列取出事件进行处理,先进先出
                self._process(event)
            except Empty:
                pass

EventEngine启动后,会不停从队列_queue中get,当_queue不为空时就把队列中等待的任务取出交给_process方法去处理

_process

    def _process(self, event: Event) -> None:
        """
        First ditribute event to those handlers registered listening
        to this type.

        Then distrubute event to those general handlers which listens
        to all types.
        """
        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

        if self._general_handlers:
            [handler(event) for handler in self._general_handlers]

根据event.type分配到不同的方法去处理

_run_timer

    def _run_timer(self) -> None:
        """
        按间隔(秒)睡眠,然后生成计时器事件。
        Sleep by interval second(s) and then generate a timer event.
        """
        while self._active:
            sleep(self._interval)
            event = Event(EVENT_TIMER)
            self.put(event)

可以暂时不用管,用不上

register

    def register(self, type: str, handler: HandlerType) -> None:  # handler是一个方法
        """
        为特定事件类型注册新的处理程序函数。每一个事件类型只能注册一次函数。 
        Register a new handler function for a specific event type. Every
        function can only be registered once for each event type.
        """
        handler_list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

参数里HandlerType的定义,callable是一个回调函数,可以看成func([event], None)

HandlerType = Callable[[Event], None]

当我们知道handler是一个方法,再看下订阅tick示例,就清晰多了。当我们注册一个关键字'tick',self._handlers中没有’tick'时,则返回一个[],接着就会把on_tick方法注册进去,此时self._handlers = {'tick': [on_tick]}。
下面代码中event_engine.put(event)后,event被添加到_queue,并在_run方法被get到_process方法,最后找到self._handlers['tick']对应的回调函数,到此事件结束。

def on_tick(event):
    # 处理tick数据
     pass

if __name__ == '__main__':
    event_engine = EventEngine()
    event_engine.start()
    event_engine.register('tick', on_tick)

    tick_data = get_tick(symbol)  # {'open': 1, 'close':0.8}
    event = Event(type='tick', data=tick_data)
    event_engine.put(event) 

register_general

    def register_general(self, handler: HandlerType) -> None:
        """
        Register a new handler function for all event types. Every
        function can only be registered once for each event type.
        """
        if handler not in self._general_handlers:
            self._general_handlers.append(handler)

用法和register差不多,只是少了type,暂时没找到用处,一般都是用的register

代码中还有start、put、unregister等方法,因为阅读难度不大就不拿出来了。

总结

EvnetEngine的用法大致就是:
接收到事件(如订阅、点击、接收数据)-->推送到事件引擎-->引擎从队列中取出事件交给_process分配-->_process根据register注册的对应类型分配处理方法

遗留问题

1.callable 回调函数的用法
2.register什么时候进行注册?
3.register_general要怎么用?
4.queue源码阅读
5.thread源码阅读

上一篇下一篇

猜你喜欢

热点阅读