用Python将OKex的k线数据保存到Mysql数据库中
2020-03-26 本文已影响0人
兰_泽
背景
因为数字货币交易所的k线数据接口都有返回数量限制,对于想要拿到大量历史数据进行自己策略回测而言,数据样本量太小,回测是没有意义的。所以,这里介绍一种方法,将交易所里的k线数据逐渐地保存到自己的数据库中,以便日后需要回测的时候,可以有足够多的历史数据供自己使用。
Step1: 创建数据库
CREATE DATABASE 你自己的数据库名;
Step2: 创建表
CREATE TABLE `okex_swap` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`asset` varchar(18) NOT NULL COMMENT '资产类型',
`kline_type` varchar(4) NOT NULL COMMENT 'k线类型',
`candle_begin_time_GMT8` varchar(20) NOT NULL COMMENT '东8区时间',
`timestamp` varchar(15) NOT NULL COMMENT '东8区时间戳(ms)',
`open` varchar(64) NOT NULL COMMENT '开盘价',
`high` varchar(64) NOT NULL COMMENT '最高价',
`low` varchar(64) NOT NULL COMMENT '最低价',
`close` varchar(64) NOT NULL COMMENT '收盘价',
`volume` varchar(64) NOT NULL COMMENT '交易量(按张折算)',
`currency_volume` varchar(64) NOT NULL COMMENT '交易量(按币折算)',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `indx_time` (`timestamp`) USING BTREE,
KEY `indx_asset_type` (`asset`,`kline_type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Step3: 程序
import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback
okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300 # 5分钟k线数据
try:
while True:
# 获取kline
klines = okex.swap_get_instruments_instrument_id_candles(
{
'instrument_id': instrument_id,
'granularity': interval
}
)
df = pd.DataFrame(klines, dtype=float)
df.rename(columns={0: 'MTS', 1: 'open', 2: 'high', 3: 'low', 4: 'close', 5: 'volume', 6: 'currency_volume'},inplace=True)
df['MTS'] = df['MTS'].map(lambda x: Exchange.parse8601(x))
df['candle_begin_time'] = pd.to_datetime(df['MTS'], unit='ms')
df['candle_begin_time_GMT8'] = df['candle_begin_time'] + timedelta(hours=8)
df = df[['candle_begin_time_GMT8', 'MTS', 'open', 'high', 'low', 'close', 'volume', 'currency_volume']]
# 创建数据库链接
connection = mysql.connect(
host='填写你自己的数据库链接地址',
user='填写你自己的数据库用户名',
password='填写你自己的数据库密码',
db='填写你自己的数据库名',
charset='utf8',
cursorclass=mysql.cursors.DictCursor
)
# 遍历df
for index, row in df.iterrows():
# 将df中的每一行数据逐条插入数据库
with connection.cursor() as cursor:
sql = ''' INSERT IGNORE INTO `coin`.`okex_swap`( `asset`,`kline_type`,`candle_begin_time_GMT8`, `timestamp`, `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
VALUES ( 'BTC-USD-SWAP', '5min','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')
''' % (row['candle_begin_time_GMT8'], row['MTS'], row['open'], row['high'], row['low'], row['close'],row['volume'], row['currency_volume'])
cursor.execute(sql)
connection.commit()
print("第 %d 条数据保存成功" % index)
print("数据保存成功^_^")
sleep(5 * 60)
except Exception as err:
print("保存数据时发生异常: %s" % traceback.format_exc())
finally:
# 关闭数据库连接资源
connection.close()
import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback
okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300 # 5分钟k线数据
try:
while True:
# 获取kline
klines = okex.swap_get_instruments_instrument_id_candles(
{
'instrument_id': instrument_id,
'granularity': interval
}
)
df = pd.DataFrame(klines, dtype=float)
df.rename(columns={0: 'MTS', 1: 'open', 2: 'high', 3: 'low', 4: 'close', 5: 'volume', 6: 'currency_volume'},inplace=True)
df['MTS'] = df['MTS'].map(lambda x: Exchange.parse8601(x))
df['candle_begin_time'] = pd.to_datetime(df['MTS'], unit='ms')
df['candle_begin_time_GMT8'] = df['candle_begin_time'] + timedelta(hours=8)
df = df[['candle_begin_time_GMT8', 'MTS', 'open', 'high', 'low', 'close', 'volume', 'currency_volume']]
# df.sort_index(ascending=False, inplace=True)
# Connect to the database
connection = mysql.connect(
host='dev-mysql.mysql.rds.aliyuncs.com',
user='root',
password='4KkkZ7qja3OWju78rrkH',
db='coin',
charset='utf8',
cursorclass=mysql.cursors.DictCursor
)
list = []
for index, row in df.iterrows():
lst = []
lst.append(str(row['candle_begin_time_GMT8']))
lst.append(str(row['MTS']))
lst.append(str(row['open']))
lst.append(str(row['high']))
lst.append(str(row['low']))
lst.append(str(row['close']))
lst.append(str(row['volume']))
lst.append(str(row['currency_volume']))
list.append(lst)
print("第 %d 条数据添加成功" % index)
sql = '''
REPLACE INTO `coin`.`okex_swap`( `asset`,`kline_type`,`candle_begin_time_GMT8`, `timestamp`, `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
VALUES ( 'BTC-USD-SWAP', '5min',%s, %s, %s, %s, %s, %s, %s, %s)
'''
with connection.cursor() as cursor:
cursor.executemany(sql,list)
connection.commit()
print("数据保存成功")
sleep(5*60)
except Exception as err:
print("保存数据时发生异常: %s" % traceback.format_exc())
finally:
connection.close()