用Python将OKex的k线数据保存到Mysql数据库中

2020-03-26  本文已影响0人  兰_泽

背景

因为数字货币交易所的k线数据接口都有返回数量限制,对于想要拿到大量历史数据进行自己策略回测而言,数据样本量太小,回测是没有意义的。所以,这里介绍一种方法,将交易所里的k线数据逐渐地保存到自己的数据库中,以便日后需要回测的时候,可以有足够多的历史数据供自己使用。
示例中主要获取的是OKex的BTC-USD-SWAP永续合约5min的k线数据。

Step1: 创建数据库

-- Create the database that will hold the k-line table.
-- The name below is a placeholder: replace it with your own database name.
CREATE DATABASE 你自己的数据库名;

Step2: 创建表

-- K-line (candlestick) storage: one row per asset, k-line type and candle start time.
-- All price/volume fields are kept as strings exactly as the loader writes them.
CREATE TABLE `okex_swap` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `asset` varchar(18) NOT NULL COMMENT '资产类型',
  `kline_type` varchar(4) NOT NULL COMMENT 'k线类型',
  `candle_begin_time_GMT8` varchar(20) NOT NULL COMMENT '东8区时间',
  `timestamp` varchar(15) NOT NULL COMMENT '东8区时间戳(ms)',
  `open` varchar(64) NOT NULL COMMENT '开盘价',
  `high` varchar(64) NOT NULL COMMENT '最高价',
  `low` varchar(64) NOT NULL COMMENT '最低价',
  `close` varchar(64) NOT NULL COMMENT '收盘价',
  `volume` varchar(64) NOT NULL COMMENT '交易量(按张折算)',
  `currency_volume` varchar(64) NOT NULL COMMENT '交易量(按币折算)',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  -- Uniqueness must cover asset and k-line type as well as the candle time:
  -- a key on `timestamp` alone makes INSERT IGNORE / REPLACE treat candles of
  -- different instruments or granularities at the same time as duplicates.
  UNIQUE KEY `indx_time` (`asset`,`kline_type`,`timestamp`) USING BTREE,
  KEY `indx_asset_type` (`asset`,`kline_type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step3: 程序

import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback

okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300  # candle granularity in seconds (5-minute k-lines)
kline_type = '5min'  # label stored in the `kline_type` column; keep in sync with `interval`

# Positional layout of one candle as returned by the candles endpoint.
KLINE_COLUMNS = ['MTS', 'open', 'high', 'low', 'close', 'volume', 'currency_volume']
# Column order handed to the INSERT statement (after asset and k-line type).
OUTPUT_COLUMNS = ['candle_begin_time_GMT8'] + KLINE_COLUMNS

# Fully parameterized statement: values are escaped by the driver instead of
# being spliced into the SQL text. The table is resolved against the database
# selected in mysql.connect(), so no schema name is hard-coded here.
INSERT_SQL = '''
    INSERT IGNORE INTO `okex_swap`
        (`asset`, `kline_type`, `candle_begin_time_GMT8`, `timestamp`,
         `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''


def fetch_klines():
    """Fetch the latest candles and return them as a DataFrame.

    Returns:
        A DataFrame with the columns listed in OUTPUT_COLUMNS. It is empty
        when the exchange returns no candles.
    """
    klines = okex.swap_get_instruments_instrument_id_candles(
        {
            'instrument_id': instrument_id,
            'granularity': interval,
        }
    )
    if not klines:
        return pd.DataFrame(columns=OUTPUT_COLUMNS)

    df = pd.DataFrame(klines).rename(columns=dict(enumerate(KLINE_COLUMNS)))

    # Cast only the numeric columns. Passing dtype=float to the constructor
    # fails on current pandas because the first column is an ISO-8601 string.
    numeric_columns = KLINE_COLUMNS[1:]
    df[numeric_columns] = df[numeric_columns].astype(float)

    # ISO-8601 string -> millisecond timestamp -> naive datetime shifted by 8 hours.
    df['MTS'] = df['MTS'].map(Exchange.parse8601)
    candle_begin_time = pd.to_datetime(df['MTS'], unit='ms')
    df['candle_begin_time_GMT8'] = candle_begin_time + timedelta(hours=8)
    return df[OUTPUT_COLUMNS]


def save_klines(df):
    """Insert every row of *df* into `okex_swap`, one statement per row.

    A connection is opened for this call only and is always closed again,
    so repeated cycles do not accumulate open connections.
    """
    connection = mysql.connect(
        host='填写你自己的数据库链接地址',
        user='填写你自己的数据库用户名',
        password='填写你自己的数据库密码',
        db='填写你自己的数据库名',
        charset='utf8',
        cursorclass=mysql.cursors.DictCursor
    )
    try:
        with connection.cursor() as cursor:
            for index, row in enumerate(df.itertuples(index=False)):
                params = (instrument_id, kline_type) + tuple(str(value) for value in row)
                cursor.execute(INSERT_SQL, params)
                connection.commit()
                print("第 %d 条数据保存成功" % index)
    finally:
        connection.close()


try:
    while True:
        df = fetch_klines()
        if not df.empty:
            save_klines(df)
            print("数据保存成功^_^")

        # Poll once per candle period.
        sleep(interval)
except Exception:
    # Any failure ends the collector; the traceback is printed for diagnosis.
    print("保存数据时发生异常: %s" % traceback.format_exc())

方法二

上面是逐条插入数据库,与数据库之间的IO太多,推荐以下方法进行批量导入数据。

import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback

okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300  # candle granularity in seconds (5-minute k-lines)
kline_type = '5min'  # label stored in the `kline_type` column; keep in sync with `interval`

# Positional layout of one candle as returned by the candles endpoint.
KLINE_COLUMNS = ['MTS', 'open', 'high', 'low', 'close', 'volume', 'currency_volume']
# Column order handed to the upsert statement (after asset and k-line type).
OUTPUT_COLUMNS = ['candle_begin_time_GMT8'] + KLINE_COLUMNS

# Upsert instead of REPLACE: REPLACE deletes and re-inserts the row, which
# changes the auto-increment id and resets create_time on every refresh.
# Every value is a %s placeholder; literals inside VALUES (...) would stop
# PyMySQL's executemany() from sending the batch as one multi-row INSERT.
UPSERT_SQL = '''
    INSERT INTO `okex_swap`
        (`asset`, `kline_type`, `candle_begin_time_GMT8`, `timestamp`,
         `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        `asset` = VALUES(`asset`),
        `kline_type` = VALUES(`kline_type`),
        `candle_begin_time_GMT8` = VALUES(`candle_begin_time_GMT8`),
        `open` = VALUES(`open`),
        `high` = VALUES(`high`),
        `low` = VALUES(`low`),
        `close` = VALUES(`close`),
        `volume` = VALUES(`volume`),
        `currency_volume` = VALUES(`currency_volume`)
'''


def fetch_klines():
    """Fetch the latest candles and return them as a DataFrame.

    Returns:
        A DataFrame with the columns listed in OUTPUT_COLUMNS. It is empty
        when the exchange returns no candles.
    """
    klines = okex.swap_get_instruments_instrument_id_candles(
        {
            'instrument_id': instrument_id,
            'granularity': interval,
        }
    )
    if not klines:
        return pd.DataFrame(columns=OUTPUT_COLUMNS)

    df = pd.DataFrame(klines).rename(columns=dict(enumerate(KLINE_COLUMNS)))

    # Cast only the numeric columns. Passing dtype=float to the constructor
    # fails on current pandas because the first column is an ISO-8601 string.
    numeric_columns = KLINE_COLUMNS[1:]
    df[numeric_columns] = df[numeric_columns].astype(float)

    # ISO-8601 string -> millisecond timestamp -> naive datetime shifted by 8 hours.
    df['MTS'] = df['MTS'].map(Exchange.parse8601)
    candle_begin_time = pd.to_datetime(df['MTS'], unit='ms')
    df['candle_begin_time_GMT8'] = candle_begin_time + timedelta(hours=8)
    return df[OUTPUT_COLUMNS]


def save_klines(df):
    """Write all rows of *df* to `okex_swap` in a single batch.

    A connection is opened for this call only and is always closed again,
    so repeated cycles do not accumulate open connections.
    """
    rows = [
        (instrument_id, kline_type) + tuple(str(value) for value in row)
        for row in df.itertuples(index=False)
    ]

    # NOTE: never publish real connection details in source code. Fill in your
    # own values here, or load them from the environment or a config file.
    connection = mysql.connect(
        host='填写你自己的数据库链接地址',
        user='填写你自己的数据库用户名',
        password='填写你自己的数据库密码',
        db='填写你自己的数据库名',
        charset='utf8',
        cursorclass=mysql.cursors.DictCursor
    )
    try:
        with connection.cursor() as cursor:
            cursor.executemany(UPSERT_SQL, rows)
        connection.commit()
    finally:
        connection.close()


try:
    while True:
        df = fetch_klines()
        if not df.empty:
            save_klines(df)
            print("数据保存成功")

        # Poll once per candle period.
        sleep(interval)
except Exception:
    # Any failure ends the collector; the traceback is printed for diagnosis.
    print("保存数据时发生异常: %s" % traceback.format_exc())
上一篇 下一篇

猜你喜欢

热点阅读