第十四章 基于事件驱动的交易引擎实现
“基于事件驱动的交易引擎实现”是基于一本量化交易书: “Successful Algorithmic Trading”第十四章的翻译。互联网上可以找到这本书的英文版本。
这一章基本上把一个交易系统的结构讲清楚了,基础类层次结构,例子代码和解析都有。看完本章,基本就可以搭建自己的回测系统。想要赚钱当然还有很多工作要做,这只是一个框架而已。
初次翻译英文书籍,见笑了。
分割线
本章提供了一个完整实现的基于事件驱动的回测系统,该系统是用Python开发。本章写的很深入很详细,特别是那些其他算法交易文章省掉的地方。下面的代码将让你能在股票,外汇和期货市场模拟使用不同的高频策略,比如预测,动量和均值回归等。
但是深入详细的介绍也带来复杂性。本章提供的回测系统需要许多组件,每个组件都是综合性的实体。我们将首先介绍事件驱动软件是什么,接下来描述回测组件和整个系统如何搭建。
14.1 事件驱动软件
在我们进行回测系统开发之前,我们需要了解事件驱动系统的概念。视频游戏是一个事件驱动软件很自然的例子。视频游戏有多个组件,在高帧率的实时设置中,这些组件会交互。这是被一个无限的“事件循环”或者“游戏循环”的计算集合处理。
游戏循环的每个Tick,函数被调用用于处理最近接收到的事件,这些事件一般是由游戏中之前的动作产生的。随着这些事件特性不同,事件可能带有按键或者鼠标点击,接下来一些动作会被触发,可能终止循环或者生成一些其他的事件。这个过程将一直持续。
下面是一些伪代码:
while True: # Run the loop forever
new_event = get_new_event() # Get the latest event
# Based on the event type, perform an action
if new_event.type == "LEFT_MOUSE_CLICK":
open_menu()
elif new_event.type == "ESCAPE_KEY_PRESS":
quit_game()
elif new_event.type == "UP_KEY_PRESS":
move_player_north()
# ... and many more events
redraw_screen() # Update the screen to provide animation
tick(50) # Wait 50 milliseconds
代码一直持续的检查新事件,并且基于这些事件完成一些动作。特别地,这将允许实时响应处理,因为代码一直连续不断在循环检查事件。这也是我们要在高频交易模拟中完成的。
14.1.1 为什么是事件驱动的回测?
相比向量化方法,事件驱动系统带来许多优势:
- 代码重用 - 基于事件驱动的回测系统通过设计可以同时用于回测和实盘,两者只需要很小的切换代价。而向量化的回测系统在执行统计分析时需要所有数据可用。
- 前向偏差 - 基于事件驱动的回测系统没有前向偏差,因为市场数据被当作事件来处理。但把市场数据“喂给”事件驱动的回测系统也是可能的,像订单管理和投资组合系统那样。
- 现实主义 - 基于事件驱动的回测系统允许很大的自由度,比如订单执行和交易成本定制。处理基本的市价单和限价单,开市单, 收市单等都是很简单的,因为可以自定义交易所处理。
虽然基于事件的系统有许多优点,但它与简单的向量化系统相比,也有两个缺点。第一个缺点是基于事件的系统显然更复杂,难于实现和测试。有很多“移动部分”容易导致bug。为了减轻测试的压力,测试驱动开发应该被引入。第二个问题是比向量化系统执行更慢。执行数学计算时,理想的向量化操作无法利用起来。
14.2 组件对象
为了在回测系统中应用事件处理方法,我们需要订下如下一些组件用于处理特定的任务:
- Event 事件 - 事件驱动系统中最基本的单元,它包含一个类型比如 “MARKET”, “SIGNAL”, “ORDER”或者 “Fill”等,这个类型决定了在事件循环中如何被处理。
- Event Queue 事件队列 - 事件队列存储所有事件对象,这些事件对象由软件产生,事件队列是内存型的Python队列对象。
- Data Handler 数据处理器 - 数据处理器是一个抽象基类,抽象基类定义的接口可以同时处理历史数据和实盘数据。这将提供显而易见的便利性,策略和投资组合模块可以被重用。数据处理器在系统每次心跳时生成新的“MarketEvent”。
- Strategy 策略 - 策略也是一个抽象基类,它接收市场数据并生成相应的信号事件“SignalEvent”,这个事件最终被Portfolio对象处理。“SignalEvent”包含一个交易目标,一个方向(Long or Short)以及时间戳。
- Portfolio 投资组合 - 这个类负责策略当前和接下来的仓位相关的订单管理。它也会处理跨组合的风险管理,包括分散投资和仓位管理。在更复杂的实现中,这些将会委托给风险管理类。Portfolio类处理来自队列的SignalEvent事件,并生成订单事件OrderEvent,并增加到队列中。
- Execution Handler 执行处理器 - 执行处理器模拟连接到交易所。主要负责处理来自队列的订单事件OrderEvent,通过模拟方法或者真实连接到交易所去执行。一旦订单被执行,执行处理器将生成“FillEvent”,这个事件描述交易执行,比如手续费,佣金以及滑点等。
- Backtest 回测 - 所有这些组件被包装在一个事件循环中,它将正确地处理各种事件,转发各种事件到适当的组件中去处理。
不管组件的数量,这是一个交易引擎的基本模型。这里存在很大的扩展空间,特别是Portfolio如何处理。另外不同的交易费模型也可以在它们的类层级进行抽象。
14.2.1 Event 事件
第一个组件是Event,在这个框架中总共有4种类型的Event通过Event队列进行通信。4种类型的Event分别是MarketEvent, SignalEvent, OrderEvent和FillEvent。
Event
Event是基类,不提供任何功能或特定接口。因为Event对象实现可能会很复杂,因此创建一个面向未来的类层级。
#!/usr/bin/python
# -*- coding: utf-8 -*-
# event.py
from __future__ import print_function
class Event(object):
pass
MarketEvent
回测系统外部的循环开始一次心跳时会触发一个MarketEvent事件。当DataHandler对象收到某个被跟踪的品种市场数据更新时,事件将会产生。它将触发策略对象,并产生交易信号。这个事件对象简单的包含一个事件标识,没有其他的结构。
# event.py
class MarketEvent(event):
def __init__(self):
self.type = 'MARKET'
SignalEvent
策略对象利用市场数据创建一个新的SignalEvent。这个SignalEvent包含一个策略ID,一个品种标识,一个时间戳,一个方向(做多或者做空)和一个“强度”指示器(用于均值回归策略)。Portfolio对象使用这个SignalEvent作为辅助决定如何进行交易。
# event.py
class SignalEvent(Event):
def __init__(self, strategy_id, symbol, datetime, signal_type, strength):
self.type = 'SIGNAL'
self.strategy_id = strategy_id
self.symbol = symbol
self.datetime = datetime
self.signal_type = signal_type
self.strength = strength
OrderEvent
当Portfolio对象收到SignalEvent事件时,会根据风险和仓位控制进行评估。最终会生成OrderEvent事件,该事件会被ExecutionHandler对象处理。
OrderEvent稍微比SignalEvent复杂,除了SignalEvent事件的一些字段以外,它还包含了数量字段。Portofolio对象规定了这个数量字段。另外OrderEvent还包括一个print_order( )方法,用于必要时输出订单信息。
# event.py
class OrderEvent(Event):
def __init__(self, symbol, order_type, quantity, direction):
self.type = 'ORDER'
self.symbol = symbol
self.order_type = order_type
self.quantity = quantity
self.direction = direction
def print_order(self):
print("Order: Symbol=%s, Type=%s, Quantity=%s, Direction=%s" % (self.symbol, self.type, self.quantity, self.direction))
FillEvent
当ExecutionHandler收到OrderEvent事件时,它必须交易这个订单。一旦经单被交易了,它将生成一个FillEvent事件,这个事件描述了买卖的成本,比如手续费或者滑点。
FillEvent是所有事件中最复杂的,它包含了订单被执行时的时间戳,交易品种和交易所,交易数量,真正的交易价格以及交易佣金。
佣金根据交易所代理佣金确定。比如US API规定最小的佣金是每订单1.30USD,以及每份额0.013USD当交易数量小于500份额或者每份额0.08USD当交易数量大于500份额。
# event.py
class FillEvent(Event):
def __init__(self, timeindex, symbol, exchange, quantity, direction, fill_cost, commission=None):
self.type = 'FILL'
self.timeindex = timeindex
self.symbol = symbol
self.exchange = exchange
self.quantity = quantity
self.direction = direction
self.fill_cost = fill_cost
if commission is None:
self.commission = self.calculate_ib_commission()
else:
self.commission = commssion
def calculate_ib_commission(self):
full_cost = 1.3
if self.quantity <= 500:
full_cost = max(1.3, 0.013 * self.quantity)
else:
full_cost = max(1.3, 0.008 * self.quantity)
return full_cost
14.2.2 DataHandler
事件驱动交易系统的其中一个目标就是在回测和实盘之间减少重复代码。理想化一点,不管是历史数据测试还是实盘交易,它将利用相同的信号生成方法和投资组合管理组合组件。为了达到这个目标,必须跟Strategy对象和Portfolio对象协作,前者产生SignalEvent事件,后者基于事件产生订单Order。针对历史数据和实盘交易需要利用统一的接口来完成。
基于DataHandler创建一个类层次结构,所有子类都有一个接口用于提供市场数据给其他系统中其他组件来处理,这种方式下DataHandler每个字类都可以被替换但是并不影响策略或者投资组合计算。
子类例子包括HistoricCSVDataHandler, QuantdlDataHandler, SecuritiesMasterDataHandler, InteractiveBrokersMarketFeedDataHandler等等。本章我们只考虑创建historic CSV data handler,这个类将加载日内CSV数据,数据字段包括:Open, Low, High, Close, Volume, OpenInterest。这种数据能在系统的每个心态时bar-by-bar的方式“喂”给Strategy和Portfolio处理,避免前向偏差。
首先要导入必要的Python库,特别是Pandas和abc抽象基类的方法。因为DataHandler对象会生成MarketEvent事件, 上面提到的event.py也是需要引入的。注:abc是Python原生提供的抽象基类的元类,用于辅助生成抽象基类。
#!/usr/bin/python
# -*- coding: utf-8 -*-
# data.py
from __future__ import print_function
from abc import ABCMeta, abstractmethod
import datetime
import os, os.path
import numpy as np
import pandas as pd
from event import MarketEvent
DataHandler也是一个抽象基类,这个类不可能直接被实例化。这个抽象类定义接口,所有DataHandler子类都需要继承以保持兼容性。我们用 metaclass属性让Python知道这是一个抽象类,另外我们用@abastractmethod装饰方法让Python知道这个方法在子类需要被重写。
这个类有6个方法。前2个方法,get_latest_bar和get_latest_bars用于查找最近的历史交易数据,主要在Strategy和Portfolio中使用,随时都需要知道当前市场价格和成交量。
接下来的1个方法 get_latest_bar_datetime,简单返回一个Python时间对象代表Bar的时间戳。
接下来2个方法,get_latest_bar_value和get_latest_bar_values用于查找特定Bar的值或者一系列Bar的值。通常情况下策略只对收盘价感兴趣,我们能直接这两个方法返回一系列收盘价的浮点值,而不是从Bar对象序列中获取。这增加了策略的效率利用所谓的“回看窗口”。
最后1个方法,update_bars,提供了一个“feed”的机制,把Bar信息置入新的数据结构,可以严格限制“前向偏差”。这是事件驱动的回测系统和基于向量的回测系统的关键不同之处。注意:如果试图实例化这个类会报异常。
# data.py
class DataHandler(object):
__metaclass__ = ABCMeta
@abstractmethod
def get_latest_bar(self, symbol):
raise NotImplementedError("Should implement get_latest_bar()")
@abstractmethod
def get_latest_bars(self, symbol, N=1):
raise NotImplementError("Should implement get_latest_bars()")
@abstractmethod
def get_latest_bar_datetime(self, symbol):
raise NotImplementError("Should implement get_latest_bar_datetime()")
@abstractmethod
def get_latest_bar_value(self, symbol, val_type):
raise NotImplementedError("Should implement get_latest_bar_value()")
@abstractmethod
def get_latest_bar_values(self, symbol, val_type, N=1):
raise NotImplementError("Should implement get_latest_bars_values()")
@abstractmethod
def update_bars(self):
raise NotImplementError("Should implement update_bars()")
为了基于历史数据创建回测系统,我们需要考虑如何通过公共数据源导入数据的机制。我们前面讨论了证券数据库的优势,开发一个DataHandler类与数据库做连接将会是一个很好的辅助。
本章中,我们将使用一个更简单的方案:从CSV文件中导入数据。这将让我们集中在创建DataHandler的机制上,而不是热衷于连接数据和使用SQL查询数据。
我们将定义HistoricCSVDataHandler子类,这个子类设计成可以处理多个CSV文件,每个交易品种一个文件,并转换成Pandas DataFrame类型的词典数据,可以被先前定义的方法访问。
DataHandler也需要一些参数,比如Event Queue,CSV文件的绝对路径和一系列的品种名称,下面是类的初始化:
# data.py
class HistoricCSVDataHandler(DataHandler):
def __init__(self, events, csv_dir, symbol_list):
self.events = events
self.csv_dir = csv_dir
self.symbol_list = symbol_list
self.symbol_data = {}
self.latest_symbol_data = {}
self.continue_backtest = True
self.open_convert_csv_files()
这个handler将在csv_dir目录查找文件并打开,文件名称类似“SYMBOL.csv”,SYMBOL代表交易品种比如GOOG和AAPL。数据文件的格式与Yahoo Finance一致,当然处理其他格式比如Quandl或者DTN IQFeed也很容易修改。方法 _open_convert_csv_files处理文件打开。
HistoricCSVDataHandler内部使用Pandas的一个优点是所有品种的索引都会被跟踪处理。这将允许缺失的数据被补上,对于均值回归策略是必要的。注意混合所有品种所有品种时union和reindex方法的使用。
# data.py
def _open_convert_csv_files(self):
comb_index = None
for s in self.symbol_list:
self.sybmol_data[s] = pd.io.parsers.read_csv(os.path.join(self.csv_dir, '%s.csv' % s), names = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'adj_close']).sort()
if comb_index is None:
comb_index = self.symbol_data[s].index
else:
comb_index.union(self.symbol_data[s].index)
self.latest_symbol_data[s] = []
#reindex
for s in self.symbol_list:
self.symbol_data[s] = self.symbol_data[s].reindex(index=comb_index, method='pad').iterrows()
_get_new_bar方法创建了一个生成器,用于提供新的Bar数据。这意味着后续对这个方法的调用将产生一个新的Bar直到对应的该品种的数据到达结尾。
# data.py
def _get_new_bar(self, symbol):
for b in self.symbol_data[symbol]:
yield b
DataHandler的前2个抽象方法get_latest_bar和get_latest_bars,这两个方法简单提供了来自latest_symbol_data结构的一个Bar数据或者最近N个Bar的数据序列。
# data.py
def get_latest_bar(self, symbol):
try:
bar_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
return bars_list[-1]
def get_latest_bars(self, symbol, N=1):
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
return bars_list[-N:]
接下来一个方法是get_latest_bar_datetime, 查询最近一个数据Bar对应的时间。
def get_latest_bar_datetime(self, symbol):
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
return bars_list[-1][0]
接下来2个方法是get_latest_bar_value和get_latest_bar_values。两个方法利用了Python的getattr函数,这个函数用于查询一个对象是否有一个特定的属性。我们能传递字符串比如“open”或者“close”给getattr函数,可以直接从数据Bar获取值。这使得这个方法非常便利,不需要再写其他方法比如get_latest_bar_close。
def get_latest_bar_value(self, symbol, val_type):
try:
bars_list = self.latest_symbol_data[symbol]
except KeyError:
print("That symbol is not available in the historical data set.")
raise
return getattr(bars_list[-1][1], val_type)
def get_latest_bars_values(self, symbol, val_type, N=1):
try:
bars_list = self.get_latest_bars(symbol, N)
except KeyError:
print("That symbol is not available in the historical data set.")
raise
return np.array([getattr(b[1], val_type) for b in bars_list])
最后一个方法,update_bars,它简单的生成一个MarketEvent事件,并且被加入到队列中,当然也被作为最近的Bar数据被增加到latest_symbol_data词典中。
# data.py
def update_bars(self):
for s in self.symbol_list:
try:
bar = next(self._get_new_bar(s))
except StopIteration:
self.continue_backtest = False
if bar is not None:
self.lastest_symbol_data[s].append(bar)
self.events.put(MarketEvent())
我们有个DataHandler继承类,它将被剩下的组件用于跟踪市场数据。Strategy, Portfolio和ExecutionHandler对象都需要当前市场数据。因此集中处理它避免在不同类中重复存储数据。
14.2.3 Strategy
Strategy对象封装了基于市场数据的所有计算工作,并生成建议信号给Portfolio对象。所有的“策略逻辑”都在这个类中处理。之所以在回测系统中分开Strategy和Portfolio两个对象,主要是因为这样可以处理多个Strategy生成信号给更大的Portofolio对象,这个Portfolio对象能处理它自己的风险,比如多块分配和杠杆。在高频交易中,策略和投资组合将紧紧集合在一起并且极端依赖硬件。这已经超过本章的范围。
这个阶段的事件驱动回测系统开发不关心Indicator或者filter等其他技术交易介绍的,为这些创建相应的类层次也是有意义的,不过超出了本章的范围。像这种机制将会在继承的策略对象中使用。
策略层次结构相对简单,它由一个生成SignalEvent事件的纯虚函数构成的抽象基类组成。为了创建Strategy类,需要引入NumPy,Pandas,Queue,抽象基类工具包和SignalEvent:
#!/usr/bin/python
# -*- coding: utf-8 -*-
#strategy.py
from __future__ import print_function
from abc import ABCMeta, abstractmethod
import datetime
try:
import Queue as queue
except ImportError:
import queue
import numpy as np
import pandas as pd
from event import SignalEvent
Strategy抽象类简单定义了一个纯虚函数calculate_signals方法,在继承类中这个方法主要是基于市场数据生成SignalEvent对象。
# strategy.py
class Strategy(object):
__metaclass__ = ABCMeta
@abstractmethod
def calculate_signals(self):
raise NotImplementedError("Should implement calculate_signals()")
14.2.4 Portfolio
Portfolio对象保持跟踪仓位,并且基于信号生成固定数量股票的订单。更复杂的Portfolio对象包括风险管理和仓位调整工具类似Kelly Criterion。接下来几章节我们将在我们的交易策略中增加像这样的工具,可以与更“原始”的Portfolio方法比较看看有什么不一样。
投资组合订单管理系统可能是事件驱动系统中最复杂的组件。它的角色是保持跟踪所有当前的仓位和当前仓位的价值。这是仓位价值结算的一个简单估计,部分来源于回测系统的数据处理工具。
另外,为了优化发往交易所或者其他访问接口的订单,投资组合的仓位管理必须意识到风险因子和仓位调整技术。
不幸运的是,Portfolio和Order Management Systems (OMS)可能变得相当复杂。因此我决定保持Portfolio对象相对直接,以至于你能理解关键所在,并且知道如何实现。面向对象实现的本质就是允许用更自然的方式实现,后面再进行扩展以适应更复杂的情形。
Portfolio对象必须能够处理SignalEvent事件对象,并产生OrderEvent事件对象和解释FillEvent事件对象从而更新仓位。因此毫不奇怪,Portofolio对象经常是事件驱动系统中,以代码行(LOC)来衡量最大的组件。
我们创建新的文件portfolio.py,并且导入必要的库。除了Portfolio不是一个抽象类,大部分具体实现与其他类基本相同。Portfolio是一个正常的基类,这意味着它能够被实例化,当我们测试策略时,Portfolio是第一个可用的对象。其他Portfolio对象可以从这个基类继承,部分功能被重载以增加复杂性。
为了完整性,先上performance.py文件,主要是计算夏普比率(Sharpe Ratio)和最大回测(Max Drawdown):
#!/usr/bin/python
# -*- coding: utf-8 -*-
#performance.py
from __future__ import print_function
import numpy as np
import pandas as pd
def create_sharpe_ratio(returns, periods=252):
return np.sqrt(periods) * (np.mean(returns)) / np.std(returns)
def create_drawdowns(pnl):
hwm = [0]
idx = pnl.index
drawdown = pd.Series(index = idx)
duration = pd.Series(index = idx)
for t in range(1, len(idx)):
hwm.append(max(hwm[t-1]), pnl[t]))
drawdown[t] = (hwm[t] - pnl[t])
duration[t] = (0 if drawdown[t] == 0 else duration[t-1] + 1)
return drawdown, drawdown.max(), duration.max()
下面是Portfolio.py文件的import列表。为了获得订单大小的整型值我们需要从math库导入floor函数。我们也需要FillEvent和OrderEvent对象。注意我们增加了另外两个函数,create_sharpe_ratio和create_drawdowns都是来自于上面的performance.py文件。
#!/usr/bin/python
# -*- coding: utf-8 -*-
#portfolio.py
from __future__ import print_function
import datetime
from math import floor
try:
import Queue as queue
except Import Error:
import queue
import numpy as np
import pandas as pd
from event import FillEvent, OrderEvent
from performance import create_sharpe_ratio, create_drawdowns
Portfolio对象的初始化需要访问DataHandler,Event Queue,启动时间戳以及初始化资本数值(默认10万美金)。
Portfolio对象设计成能处理仓位调整和当前持仓。交易订单以固定的交易数量简单地发送给交易所,不考虑持有的现金。这些都是不切实际的假设,但有助于我们快速了解事件驱动模式下投资组合订单管理系统功能(OMS)。
portfolio包含all_positions和current_positions两个成员。前者存储之前市场数据事件发生时记录的所有的仓位列表。简单说,仓位就是拥有投资品的数量。负仓位意味着投资品被卖出。
后者current_positions词典,包含各个品种最新的以“当前市场值”计算的持仓数据。
“当前市场值”意思是从当前市场Bar数据获取的收盘价,这个无疑是近似的,但足够合理。all_holdings存储所有品种持仓的历史资产价值,current_holdings存储所有品种最新的持仓资产价值:
# portfolio.py
class Portfolio(object):
def __init__(self, bars, events, start_date, initial_capital=100000):
self.bars = bars
self.events = events
self.symbol_list = bars.symbol_list
self.start_date = start_date
self.initial_capital = initial_capital
self.all_positions = self.construct_all_positions()
self.current_positions = dict((k, v) for k, v in [(s, 0) for s in self.symbol_list])
self.all_holdings = self.construct_all_holdings()
self.current_holdings = self.construct_current_holdings()
下面的方法,contruct_all_positions针对每个品种简单创建一个词典,每个品种的仓位都初始化零值,并且增加时间戳,最后把辞典加入列表中。实际上最后形成的结果是辞典的列表。
# portfolio.py
def construct_all_positions(self):
d = dict((k, v) for k, v in [(s, 0) for s in self.symbol_list])
d['datetime'] = self.start_date
return [d]
contruct_all_holdings方法与上面类似,增加几个额外的key处理现金,佣金和总计。它们各自代表任何购买以后账户剩余的现金,累积的交易佣金和账户总资产包括现金和股票持仓。初始的现金和账户总资产都设置为初始资本。
这种方式下,每个品种有独立的“账户”,“手上的现金”,支付的“佣金”(交易所费用)和“总”的投资组合价值。无疑这没有考虑保证金需求和卖空约束,但这足够给你一个体验:像这样的"OMS"是如何创建的:
# portfolio.py
def construct_all_holdings(self):
d = dict((k, v) for k, v in [(s, 0.0) for s in self.symbol_list])
d['datetime'] = self.start_time
d['cash'] = self.initial_capital
d['commission'] = 0.0
d['total'] = self.initial_capital
return d[d]
下面的方法,construct_current_holdings几乎跟上面一样,除了它不需要把辞典加入列表,因为它只需要创建一个记录:
# portfolio.py
def construct_current_holdings(self):
d = dict((k, v) for k, v in [(s, 0.0) for s in self.symbol_list])
d['cash'] = self.initial_capital
d['commission'] = 0.0
d['total'] = self.initial_capital
return d
在每个心跳处理过程中,每一次有来自于DataHandler对象的新市场数据时,portfolio必须更新更新所有持仓的当前市场价值。在实盘场景中,这个信息可以从交易所下载和解析。但是对于回测系统来说,这是必须实现的,需要手动计算这些数值根据DataHandler提供的Bar数据。
不幸的是,在bid/ask这种模式下没有“当前市场值”一说,因此需要通过资产数量近似的“价格”来估算。我在这里采用来自于最新的Bar数据的收盘价。对于日内策略来说这是相对现实可行的。但对每日策略这是不可行的,因为开盘价可能相当多的情况下不同于收盘价。
update_timeindex方法跟踪处理新的持仓。首先获得来自于DataHandler最新的价格,创建新的品种辞典代表当前仓位,并且设置“新”的仓位数据等于“当前”仓位数据。
仅仅当收到FillEvent事件时,当前仓位数据才需要修改,这将在后面Portfolio代码中处理。这个方法后面会添加当前持仓资产价值到all_positions列表。
holdings以类似方式更新,除了市场价值通过当前持仓数据最新Bar的收盘价重新计算。最后,新的持仓价值也被添加到all_holdings列表。
# portfolio.py
def update_timeindex(self, event):
latest_datetime = self.bars.get_latest_bar_datetime(self.symbol_list[0])
dp = dit((k, v) for k, v in [(s, 0) for s in self.symbol_list])
dp['datetime'] = latest_datetime
for s in self.symbol_list:
dp[s] = self.current_positions[s]
self.all_positions.append(dp)
dh = dict((k, v) for k, v in [(s, 0) for s in self.symbol_list])
dh['datetime'] = latest_datetime
dh['cash'] = self.current_holdings['cash']
dh['commission'] = self.current_holdings['commission']
dh['total'] = self.current_holdings['cash']
for s in self.symbol_list:
market_value = self.current_positions[s] * self.bars.get_latest_bar_value(s, "adj_close")
dh[s] = market_value
dh['total'] += market_value
self.all_holdings.append(market_value)
update_positions_from_fill方法检查FillEvent事件确定买卖方向,然后根据增加或者减少股票数量更新current_positions辞典:
# portfolio.py
def update_positions_from_fill(self, fill):
fill_dirs = 0
if fill.direction == 'BUY':
fill_dir = 1
if fill.direction == 'SELL':
fill_dir = -1
self.current_positions[fill.symbol] += fill_dir * fill_quantity
update_holdings_from_fill方法跟上面类似,不过它更新的是“holdings”资产价值。为了模拟交易的成本,下面的方法没有使用FillEvent事件对象中的成本数据。为什么这样处理呢?
因为在回测环境中交易成本本质上是无法获取的(市场影响和订单簿深度都是未知的),这些都是估算出来的。
fill成本被设定成“当前市场价格”,就是最新的Bar数据的收盘价。特定品种的资产价值=交易成本*交易数量。在流动性充裕的市场里大多数低频的交易策略这样处理是很好的近似,但是高频市场中,产品级回测和实盘交易就需要考虑这些问题。
一旦当前资产价值的交易成本已知,现金和总资产就可以更新了。累积佣金也可以更新了:
# portfolio.py
def update_holdings_from_fill(self, fill):
fill_dir = 0
if fill_direction == 'BUY':
fill_dir = 1
if fill_direction == 'SELL':
fill_dir = -1
fill_cost = self.bars.get_latest_bar_value(fill.symbol, "adj_close")
cost = fill_dir * fill_cost * fill.quantity
self.current_holdings[fill.symbol] += cost
self.current_holdings['commission'] += fill.commission
self.current_holdings['cash'] -= (cost + fill.commission)
self.current_holdings['total'] -= (cost + fill.commission)
update_fill方法实现在下面,这个方法简单执行前面2个函数,update_positions_from_fill和update_holdings_from_fill两个方法处理接收到的FillEvent事件。
# portfolio.py
def update_fill(self, event):
if event.type == 'FILL':
self.update_positions_from_fill(event)
self.update_holdings_from_fill(event)
Portfolio对象需要处理FillEvent事件,在收到一个或者多个SignalEvent事件时,也需要仔细生成OrderEvent事件。
generate_naive_order方法简单地生成一个买卖的信号,发送一个订单比如100股的数量。100是一个任意值,在产品级模拟中将会依赖投资组合总的资产数量。在实际实现中,这个值将由风险管理或者仓位控制模块决定。这里是一个简化版本的“Portfolio”,因此仅仅只是发出订单,并没有经过风险管理模块。
这个方法基于当前的数量和特定的品种同时处理,多,空以及平仓,相应的OrderEvent对象被生成:
# portfolio. py
def generate_naive_order(self, signal):
order = None
symbol = signal.symbol
direction = signal.signal_type
strength = signal.strength
mkt_quantity = 100
cur_quantity = self.current_positions[symbol]
order_type = 'MKT'
if direction == 'LONG' and cur_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'BUY')
if direction == 'SHORT' and cur_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'SELL')
if direction == 'EXIT' and cur_quantity > 0:
order = OrderEvent(symbol, order_type, abs(cur_quantity), 'SELL')
if direction == 'EXIT' and cur_quantity > 0:
order = OrderEvent(symbol, order_type, abs(cur_quantity), 'BUY')
return order
update_signal方法简单调用上面的方法,并增加OrderEvent事件到信事件队列:
# portfolio.py
def update_signal(self, event):
if event.type == 'SIGNAL':
order_event = self.generate_naive_order(event)
self.events.put(order_event)
倒数第二个方法是关于净值曲线的生成。简单地创建一个returns序列用于收益计算,然后基于百分比正规化收益曲线。账户初始值为1.0,相对于全部美元总量:
# portfolio.py
def create_equity_curve_dataframe(self):
curve = pd.DataFrame(self.all_holdings)
curve.set_index('datetime', inplace=True)
curve['returns'] = curve['total'].pct_change()
curve['equity_curve'] = (1.0 + curve['returns']).cumprod()
self.equity_curve = curve
最后一个方法是净值曲线和与策略相关不同收益统计输出。最后一行输出到一个文件: equity.csv,与代码文件在相同目录,这个文件可以被Matplotlib Python script(或者表格工具比如Excel, LibreOffice Calc)用来后续分析。
注意:DrawDown Duration是指回测持续的时间,以数据Bar的绝对数目来计算,与特定的时间周期相关。
def output_summary_stats(self):
total_return = self.equity_curve['equity_curve'][-1]
returns = self.equity_curve['returns']
pnl = self.equity_curve['equity_curve']
sharpe_ratio = create_sharpe_ratio(returns, periods=252*60*6.5)
drawdown, max_dd, dd_duration = create_drawdowns(pnl)
self.equity_curve['drawdown'] = drawdown
stats = [("Total Return", "%0.2f%%" % ((total_return - 1.0) * 100.0)),
("Sharpe Ratio", "%0.2f" % sharpe_ratio),
("Max Drawdown", "%0.2f%%" % (max_dd * 100.0)),
("Drawdown Duration", "%d" % dd_duration)
]
self.equity_curve.to_csv('equity.csv')
return stats
Portfolio对象是整个事件驱动的回测系统中最复杂部分。但是这里的实现中,仓位处理也是相对初级的。
14.2.5 Execution Handler
这个部分我们将研究交易订单执行,我们会创建一个类层次表示模拟订单的处理机制,最终会和交易所或者其他市场进行对接。
这里描述的ExecutionHandler是非常简单的,因为它把所有订单的价格都定为当前市场价格,这实际上是不可行的,但可以作为改进的基础。
我们也需要导入先前定义的抽象基类,另外也需要导入FillEvent事件和OrderEvent事件:
#!/usr/bin/python
# -*- coding: utf-8 -*-
#execution.py
from __future__ import print_function
from abc import ABCMeta, abstractmethod
import datetime
try:
import Queue as queue
except ImportError:
import queue
from event import FillEvent, OrderEvent
ExecutionHandler像之前抽象基类类似由一个纯虚函数,execute_order:
# execution.py
class ExecutionHandler(object):
__metaclass__ == ABCMeta
@abstractmethod
def execute_order(self, event):
raise NotImplementError("Should implement execute_order()")
为了回测策略我们需要模拟交易如何执行。最简单的实现就是假定所有订单所有数量都以当前市场价被交易。这在实际中是完全不可行的,作为回测的一个大的改进,我们将设计更复杂的滑点和市场影响的模型。
注意:FillEvent事件中,fill_cost的值是None, 因为我们在上面描述的Portfolio对象中补充了fill_cost的值。在一个更实际的实现中,我们将利用“当前”市场价值获得实际的fill_cost。
回测中,我们简化地利用ARCA交易所,只是一个字符串占位符,实盘执行环境中,这将是相当重要的:
# execute.py
class SimulatedExecutionHandler(ExecutionHandler):
def __init__(self, events):
self.events = events
def execute_order(self, event):
if event.type == 'ORDER':
fill_event = FillEvent(datetime.datetime.utcnow(), event.symbol, 'ARCA', event.quantity, event.direction, None)
self.events.put(fill_event)
14.2.6 Backtest
我们现在创建回测类。回测类封装了事件驱动逻辑,并且上面讨论的全部类紧紧联系在一起。
为了处理放在Event Queue中的事件,回测类设计成基于嵌套while循环的事件驱动系统,外部的while循环作为心跳循环,决定回测系统的处理精度。在实盘环境中,这个值是一个正数,比如600秒(每10分钟)。市场数据和仓位将在这个时间周期内更新。
在回测系统中,心跳间隔也可能是0,不考虑策略的频率,因为历史数据都已经存在。
我们能够以任何速度来运行回测系统,因为对于回测系统什么时候数据可用是不可知的,只要这些数据带有时间戳。因此我们这里只是演示实盘交易引擎如何工作。外部循环一旦结束,DataHandler通过设置bool变量 _backtest 让Backtest对象知道。
内部循环真正处理事件信号,并根据事件类型,把他们发送给正确的组件。Event Queue持续的增加或者移除事件。这就是所谓的事件驱动系统。
首先应该导入必要的库。我们导入pprint(“pretty-print”),想以更友好的方式显示状态信息:
#!/usr/bin/python
# -*-coding: utf-8 -*-
#backtest.py
from __future__ import print_function
import datetime
import pprint
try:
import Queue as queue
except:
import queue
import time
回测对象的初始化需要CSV目录,完整的交易品种列表,初始化资本,心态间隔(毫秒计),启动时间戳,DataHandler, ExecutionHandler, Portfolio以及Strategy对象等参数。Queue用于存储事件。信号数量,订单数量,交易数量也是很重要的:
# backtest.py
class Backtest(object):
def __init__(self, csv_dir, symbol_list, initial_capital, heartbeat, start_date, data_handler, execution_handler, portfolio, strategy):
self.csv_dir = csv_dir
self.symbol_list = symbol_list
self.initial_capital = initial_capital
self.heartbeat = heartbeat
self.start_date = start_date
self.data_handler_cls = data_handler
self.execution_handler_cls = execution_handler
self.portfolio_cls = portfolio
self.strategy_cls = strategy
self.events = queue.Queue()
self.signals = 0
self.orders = 0
self.fills = 0
self.num_strats = 1
self._generate_trading_instances()
第一个方法 _generate_trading_instances, 生成所有内部交易对象(DataHandler, Strategy, Portfolio和ExecutionHandler):
# backtest.py
def _generate_trading_instances(self):
print("Creating DataHandler, Strategy, Portfolio and ExecutionHandler")
self.data_handler = self.data_handler_cls(self.events, self.csv_dir, self.symbol_list)
self.strategy = self.strategy_cls(self.data_handler, self.events)
self.portfolio = self.portfolio_cls(self.data_handler, self.events, self.start_date, self.initial_capital)
self.execution_handler = self.execution_handler_cls(self.events)
_run_backtest方法主要是事件处理。像上面描述的两个嵌套循环。外部处理心跳循环,内部循环检查队列中的事件,根据事件类型调用合适组件的相应方法。
对于MarketEvent事件,Strategy对象将从新计算生成新的事件,Portfolio对象重建时间索引。Portfolio收到一个SignalEvent,处理之后会生成一系列OrderEvent事件。如果ExecutionHandler收到OrderEvent事件,它将会把订单提交到交易所处理(实盘交易环境),如果收到FillEvent事件,Portfolio对象将更新仓位数据:
# backtest.py
def _run_backtest(self):
i = 0
while True:
i += 1
print i
if self.data_handler.continue_backtest == True:
self.data_handler.update_bars()
else:
break
while True:
try:
event = self.events.get(False)
except queue.Empty:
break
if event is not None:
if event.type == 'MARKET':
self.strategy.calculate_signals(event)
self.portfolio.update_timeindex(event)
elif event.type == 'SIGNAL':
self.signals += 1
self.portfolio.update_signals(event)
elif event.type == 'ORDER':
self.orders += 1
self.execute_handler.execute_order(event)
elif event.type == 'FILL':
self.fills += 1
self.portfolio.update_fill(event)
time.sleep(self.heartbeat)
一旦回测模拟完成,策略的表现情况就可以显示出来了。创建收益曲线并显示摘要统计,信号数量,订单数量,交易数量等。
# backtest.py
def _output_performance(self):
self.portfolio.create_equity_curve_dataframe()
print("Creating summary stats ...")
stats = self.portfolio.output_summary_stats()
print("Creating equity curve ...")
print(self.portfolio.equity_curve.tail(10))
pprint.pprint(stats)
print("Signals: %s" % self.signals)
print("Orders: %s" % self.orders)
print("Fills: %s" % self.fills)
最后一个方法是simulate_trading实现。简单按顺序调用之前两个方法:
# backtest.py
def simulate_trading(self):
self._run_backtest()
self._output_performance()
这些概括了事件驱动回测系统中所有运行对象。
14.3 事件驱动执行器
上面我们描述了一个基本的ExecutionHandler类,为每个OrderEvent事件简单创建了一个相应的FillEvent事件。对于我们第一个回测系统是没有问题的,但是当我们想对接交易所时,我们需要更复杂的处理。在这个部分,我们会定义IBExecutionHandler,这个类允许我们连接流行的交易所接口Interactive Broken API,自动化执行我们的交易。
IBExecutionHandler的核心是收到来自事件队列的OrderEvent事件,然后通过IbPy库直接连接Interactive Brokers order API,并执行这些订单操作。这个类同样也处理来自于API的“服务器返回”消息,这个阶段,唯一的操作就是创建相应的FillEvent事件,并添加到事件队列当中。
这个类本身可能变得相当复杂,因为执行优化逻辑和复杂的错误处理。但是在这里尽量保持相对简单,以至于你能明白主要的意思,你也可以根据你自己的交易习惯去扩展。
首先要做的是创建python文件,导入必要的库。文件叫做ib_execution.py,与其他文件放在同样的目录。我们导入必要的date/time处理库,IbPy库以及IBEexecutionHandler要处理的特定的事件类:
#!/usr/bin/python
# -*- coding: utf-8 -*-
#ib_execution.py
from __future__ import print_function
import datetime
import time
from ib.ext.Contract import Contract
from ib.ext.Order import Order
from ib.opt import ibConnection, message
from event import FillEvent, OrderEvent
from execution import ExecutionHandler
我们现在定义IBExecutionHandler类。init构造函数需要events队列,也需要一个order_routing,默认值是“SMART”,如果你有特别的交易所需求,你能在这里指定。默认的currency设置成“美元”。
在这个方法里,我们创建了一个fill_dict辞典,用于后面生成FillEvent事件。我们也创建了tws_conn的连接对象,用于存储我们与交易所API之间的连接信息。我们同时也创建了一个参数order_id,这个参数有一个默认值,用于跟踪所有后续的订单避免重复。最后,我们注册了消息处理器(更详细的我们将在后面定义):
# ib_execution.py
class IBExecutionHandler(ExecutionHandler):
def __init__(self, events, order_routing="SMART", currency="USD"):
self.events = events
self.order_routing = order_routing
self.currency = currency
self.tws_conn = self.create_tws_connection()
self.order_id = self.create_initial_order_id()
self.register_handlers()
IB API利用基于消息的事件系统允许我们的类处理特定的消息,跟回测系统类似。我们不包括任何实质性的错误处理(因为简洁的原因),仅仅只是通过_error_handler方法向终端输出错误。
_reply_handler方法,用于确定是否需要生成FillEvent事件。方法会检查是否收到"openOrder"消息,检查是否我们的fill_dict辞典中已经有orderId这个参数对应的数据条目,如果没有,那么我们应该创建一个。
如果收到“orderStatus”消息,这个消息代表一个订单成交了,接下来应该调用create_fill创建FillEvent事件,为了调试目的,也会向终端输出日志信息:
# ib_execution.py
def _error_handler(self, msg):
print("Server Error: %s" % msg)
def _reply_handler(self, msg):
if msg.typeName == "openOrder" and msg.orderId == self.order_id and not self.fill_dict.has_key(msg.orderId):
self.create_fill_dict_entry(msg)
if msg.typeName == "orderStatus" and msg.status == "Filled" and self.fill_dict[msg.orderId]["filled"] == False:
self.create_fill(msg)
print("Server Response: %s, %s\n" % (msg.typeName, msg))
下面的方法create_tws_connection,使用IbPy库的ibConnection对象创建了一个与IB API之间的连接,默认端口为7496和默认的clientId为10。一旦对象创建,为了进行连接,connect方法可以被调用。
# ib_execution.py
def create_tws_connection(self):
tws_conn = ibConnection()
tws_conn.connect()
return tws_conn
为了跟踪不同的订单(跟踪成交的目的)使用了下面的方法create_initial_order_id。我们设置的默认值为1,一个更好的方法是通过IB查询最近可用ID后使用。你可以通过Trader Workstation > Global Configuration > API配置面板重置当前的API订单ID:
# ib_execution.py
def create_initial_order_id(self):
return 1
下面的方法register_handlers,通过TWS连接简单地注册了错误处理和回复处理方法:
# ib_execution.py
def register_handlers(self):
self.tws_conn.register(self._error_handler, 'Error')
self.tws_conn.registerAll(self._reply_handler)
为了真正交易一个订单,创建一个IbPy Contract对象是必要的,这个对象跟IbPy Order对象会配对,并且会被提交给IB API。下面的方法create_contract,它需要交易品种参数,证券类型(股票或者期货),交易所/主交易所和货币单位。最终返回Contract对象:
# ib_execution.py
def create_contract(self, symbol, sec_type, exch, prim_exch, curr):
contract = Contract()
contract.m_symbol = symbol
contract.m_secType = sec_type
contract.m_exchange = exch
contract.m_primaryExch = prim_exch
contract.m_currency = curr
return contract
下面方法create_order,它需要交易类型参数order_type(市价单或者限价单),交易资产数量和交易方向(买或者卖)。最终返回Order对象:
# ib_execution.py
def create_order(self, order_type, quantity, action):
order = Order()
order.m_orderType = order_type
order.m_totalQuantity = quantity
order.m_action = action
return order
为了避免一个订单有重复FillEvent事件,我们利用fill_dict词典来存储订单ID相关的数据。当成交发生时,我们在词典中存储该成交订单ID相关的数据字段“filled”为True。如果后续“服务器返回”消息表示该订单ID成交了(重复消息),那我们并不会重新生成新的FillEvent事件。下面的create_fill_dict_entry处理这个情况:
# ib_execution.py
def create_fill_dict_entry(self, msg):
self.fill_dict[msg.orderId] = {
"symbol": msg.contract.m_symbol,
"exchange": msg.contract.m_exchange,
"direction": msg.order.m_action,
"filled": False
}
下面的方法create_fill,真正创建FillEvent事件对象然后放入事件队列:
# ib_execution.py
def create_fill(self, msg):
fd = self.fill_dict[msg.orderId]
symbol = fd["symbol"]
exchange = fd["exchange"]
filled = msg.filled
direction = fd["direction"]
fill_cost = msg.avgFillPrice
fill = FillEvent(
datetime.datetime.utcnow(), symbol, exchange, filled, direction, fill_cost
)
self.fill_dict[msg.orderId]["filled"] = True
self.events.put(fill_events)
现在所有前面的方法已经被实现除了重载execute_order方法。这个方法真正是通过IB API来处理订单。这个方法首先检查受到的事件是否OrderEvent,如果是则根据相应参数准备Contract和Order对象。一旦这两个对象都创建完成,那么就可以调用IbPy库连接对象的placeOrder方法,参数是订单order_id。
为了确保订单真正提交到IB,调用time.sleep(1)方法是非常重要。我的系统删除这行代码会导致这个API有不一致的行为。最后我们增加订单ID,保证我们不重复处理订单:
# ib_execution.py
def execute_order(self, event):
if event.type == 'ORDER':
asset = event.symbol
asset_type = "STK"
order_type = event.order_type
quantity = event.quantity
direction = event.direction
ib_contract = self.create_contract(
asset, asset_type, self.order_routing, self.currency
)
ib_order = self.create_order(order_type, quantity, direction)
self.tws_conn.placeOrder(self.order_id, ib_contract, ib_order)
time.sleep(1)
self.order_id += 1
这个类形成Interactive Brokers执行处理的基础,可以用在模拟执行处理的位置,模拟执行处理只能用于回测。在IB执行处理能够使用之前,仍然需要创建一个实盘数据处理以取代回测系统的历史数据处理。
这种方式下,我们在回测和实盘系统之间重用了尽可能多组件,保证这个代码“替换”是最少的,因为这两者的行为虽然不完全一样,但是类似的。