vnpy源码阅读-3.EventEngine
from collections import defaultdict # 字典默认类型
from queue import Empty, Queue # 队列
from threading import Thread # 多线程
from time import sleep
from typing import Any, Callable, List # 类型提示
先看第一个导入的模块,defaultdict类初始化时传入一个类型list, str, int, dict,当字典中找不到对应的key时返回传入的类型,看示例
d = defaultdict(dict)
d[1] = 'one'
print(d[1])
print(d[2])
# return
one
{}
导入的包了解了,接着看代码吧
Evnet
class Event:
def __init__(self, type: str, data: Any = None):
self.type: str = type
self.data: Any = data
定义一个Evnet的类,有type和data参数
init
def __init__(self, interval: int = 1):
"""
默认情况下,计时器事件每1秒生成一次
"""
self._interval = interval # 时间间隔
self._queue = Queue() # FIFO队列,先进先出
self._active = False # 开关
self._thread = Thread(target=self._run) # run方法线程
self._timer = Thread(target=self._run_timer) #run_timer方法线程
self._handlers = defaultdict(list) # 处理列表 {'type': []}
self._general_handlers = [] # 通用处理列表
初始化的参数中,interval和timer用不到可以不管,其他的看注释能明白作用
_run
def _run(self) -> None:
"""
从队列中获取事件,然后对其进行处理。
Get event from queue and then process it.
"""
while self._active:
try:
event = self._queue.get(block=True, timeout=1) # 从队列取出事件进行处理,先进先出
self._process(event)
except Empty:
pass
EventEngine启动后,会不停从队列_queue中get,当_queue不为空时就把队列中等待的任务取出交给_process方法去处理
_process
def _process(self, event: Event) -> None:
"""
First ditribute event to those handlers registered listening
to this type.
Then distrubute event to those general handlers which listens
to all types.
"""
if event.type in self._handlers:
[handler(event) for handler in self._handlers[event.type]]
if self._general_handlers:
[handler(event) for handler in self._general_handlers]
根据event.type分配到不同的方法去处理
_run_timer
def _run_timer(self) -> None:
"""
按间隔(秒)睡眠,然后生成计时器事件。
Sleep by interval second(s) and then generate a timer event.
"""
while self._active:
sleep(self._interval)
event = Event(EVENT_TIMER)
self.put(event)
可以暂时不用管,用不上
register
def register(self, type: str, handler: HandlerType) -> None: # handler是一个方法
"""
为特定事件类型注册新的处理程序函数。每一个事件类型只能注册一次函数。
Register a new handler function for a specific event type. Every
function can only be registered once for each event type.
"""
handler_list = self._handlers[type]
if handler not in handler_list:
handler_list.append(handler)
参数里HandlerType的定义,callable是一个回调函数,可以看成func([event], None)
HandlerType = Callable[[Event], None]
当我们知道handler是一个方法,再看下订阅tick示例,就清晰多了。当我们注册一个关键字'tick',self._handlers中没有’tick'时,则返回一个[],接着就会把on_tick方法注册进去,此时self._handlers = {'tick': [on_tick]}。
下面代码中event_engine.put(event)后,event被添加到_queue,并在_run方法被get到_process方法,最后找到self._handlers['tick']对应的回调函数,到此事件结束。
def on_tick(event):
# 处理tick数据
pass
if __name__ == '__main__':
event_engine = EventEngine()
event_engine.start()
event_engine.register('tick', on_tick)
tick_data = get_tick(symbol) # {'open': 1, 'close':0.8}
event = Event(type='tick', data=tick_data)
event_engine.put(event)
register_general
def register_general(self, handler: HandlerType) -> None:
"""
Register a new handler function for all event types. Every
function can only be registered once for each event type.
"""
if handler not in self._general_handlers:
self._general_handlers.append(handler)
用法和register差不多,只是少了type,暂时没找到用处,一般都是用的register
代码中还有start、put、unregister等方法,因为阅读难度不大就不拿出来了。
总结
EvnetEngine的用法大致就是:
接收到事件(如订阅、点击、接收数据)-->推送到事件引擎-->引擎从队列中取出事件交给_process分配-->_process根据register注册的对应类型分配处理方法
遗留问题
1.callable 回调函数的用法
2.register什么时候进行注册?
3.register_general要怎么用?
4.queue源码阅读
5.thread源码阅读