Python读取通达信本地数据
2022-11-19 本文已影响0人
Curious1990
获取内容
股票名称、缩写、领域及K线(日 周 月 季 年)
import os
import struct
import pandas as pd
# 负责读取本地通达信数据
class TDXLoader:
def __init__(self):
# TDX_DIR 为通达信根目录
self.TDX_DIR = os.environ.get("Tdx")
self.__base_all = self._get_base_all()
def resolve(self, *args, **kwargs):
return os.path.join(self.TDX_DIR, *args, **kwargs)
# 获取行业数据
def _get_tdxhy(self):
tdxhy_path = self.resolve(r"T0002\hq_cache\tdxhy.cfg")
names = "market symbol industry_code idontcare1 idontcare2 industry_second_code".split()
usecols = "market symbol industry_code industry_second_code".split()
tdxhy = pd.read_csv(
tdxhy_path,
sep="|",
names=names,
usecols=usecols,
dtype={"symbol": str},
)
return tdxhy
# 行业代码码值
def _get_incon(self):
incon_path = self.resolve(r"incon.dat")
incon = pd.read_csv(incon_path, encoding="gb2312", names=["index"])
fx0 = lambda x: x.split("|")[0] if "|" in x else ""
incon["industry_code"] = incon["index"].apply(fx0)
fx1 = lambda x: x.split("|")[1] if "|" in x else ""
incon["industry_name"] = incon["index"].apply(fx1)
usecols = ["industry_code", "industry_name"]
return incon[usecols]
# 股票代码对应拼音缩写
def _read_tnf(self, path):
market = path.split(".")[0][-3:]
with open(path, "rb") as f:
buff = f.read()
data = buff[50:]
l = len(data) // 314
fx = lambda x: str(x, encoding="gbk").strip("\x00")
sm = {"szm": ("00", "30"), "shm": ("60", "68")}
stocks = []
for x in [data[i * 314 : (i + 1) * 314] for i in range(l)]:
code = fx(x[:6])
if code.startswith(sm[market]):
name = fx(x[23:41])
shortcode = fx(x[285:293])
stocks += [[code, name, shortcode]]
return stocks
# 股票K线数据源文件
def _get_szshm(self):
szm_path = self.resolve(r"T0002\hq_cache\szm.tnf")
shm_path = self.resolve(r"T0002\hq_cache\shm.tnf")
szm = self._read_tnf(szm_path)
shm = self._read_tnf(shm_path)
stocks = pd.DataFrame(szm + shm, columns=["symbol", "name", "shortcode"])
return stocks
# 整合基本数据
def _get_base_all(self):
tdxhy = self._get_tdxhy()
incon = self._get_incon()
szshm = self._get_szshm()
base = pd.merge(szshm, tdxhy, how="left", on="symbol")
base = pd.merge(base, incon, how="left", on="industry_code")
base = pd.merge(
base,
incon,
how="left",
left_on="industry_second_code",
right_on="industry_code",
)
fx = lambda x: ".sh" if x else ".sz"
base["ts_code"] = base["symbol"] + base["market"].apply(fx)
base.rename(
columns={
"industry_name_x": "industry_name",
"industry_name_y": "industry_detail",
},
inplace=True,
)
usecols = "ts_code symbol name shortcode industry_name industry_detail".split()
return base[usecols]
# 读取K线源文件
def _read_kline(self, filepath):
with open(filepath, "rb") as f:
usecols = "trade_date open high low close amount vol openinterest".split()
buffers = []
while True:
buffer = f.read(32)
if not buffer:
break
buffer = struct.unpack("lllllfll", buffer)
buffers.append(buffer)
kline = pd.DataFrame(buffers, columns=usecols)
kline["trade_date"] = kline["trade_date"].astype(str)
price_columns = ["open", "high", "low", "close"]
kline[price_columns] = kline[price_columns].apply(lambda x: x / 100)
return kline
# 获取基本数据
def get_base_all(self):
return self.__base_all
# 获取日K线数据
def get_kline_daily(self, ts_code):
filename = ts_code.split(".")[1] + ts_code.split(".")[0] + ".day"
filepath = self.resolve("vipdoc", ts_code.split(".")[1], "lday", filename)
kline = self._read_kline(filepath)
kline["ts_code"] = ts_code
kline.index = pd.to_datetime(kline["trade_date"])
kline.index.name = "index"
kline = kline.rename(columns={"vol": "volume"})
usecols = (
"ts_code trade_date open high low close amount volume openinterest".split()
)
return kline[usecols]
if __name__ == "__main__":
loader = TDXLoader()
# 获取所有股票基本数据
base_all = loader.get_base_all()
# 获取单股日K线数据
kline = loader.get_kline_daily("600645.sh")
print(kline)
走缓存,并加工出K线的各个周期
单单运行脚本缓存是无效的,需要放在Web服务或桌面应用中
from store.loader import TDXLoader as Loader
from cacheout import LFUCache
from loguru import logger as log
# 负责缓存
class Cache(object):
def __init__(self):
self.__loader = Loader()
self.cache = LFUCache()
self.rule = {
"ts_code": "last",
"trade_date": "last",
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
"openinterest": "last",
}
# 从缓存获取日K线,没有缓存就从Loader读取
def _get_kline_daily(self, ts_code):
if self.cache.has(ts_code):
kline = self.cache.get(ts_code)
else:
kline = self.__loader.get_kline_daily(ts_code)
self.cache.set(ts_code, kline)
log.info(f"{ts_code} K线数据已缓存")
return kline
# 按 日 周 月 季 年 获取K线数据,对应周期为 D W M Q A。day week month quarter annual
def _get_kline_by_period(self, ts_code, period="D"):
kline = self._get_kline_daily(ts_code)
kline = kline.resample(period).agg(self.rule).dropna()
kline = kline.sort_values(by="trade_date", ascending=False)
kline = kline.reset_index(drop=True)
return kline
# 按天数分割获取K线数据,days等于30则代表30天为一个周期
def _get_kline_by_days(self, ts_code, days=1):
kline = self._get_kline_daily(ts_code)
kline = kline.sort_values(by="trade_date", ascending=False)
kline = kline.reset_index(drop=True).sort_values(by="trade_date")
kline = kline.groupby(lambda x: x // days).agg(self.rule)
kline = kline.sort_values(by="trade_date", ascending=False)
return kline
# 获取基本数据的函数,传股票代码进来就获取一只股票的,不传参就获取所有股票的
def get_base(self, ts_code=None):
data = self.__loader.get_base_all()
if ts_code:
base = data[data["ts_code"] == ts_code]
else:
base = data
return base
# 整合 _get_kline_by_period 和 _get_kline_by_days
def get_kline(self, ts_code, param):
if isinstance(param, int):
return self._get_kline_by_days(ts_code, days=param)
else:
return self._get_kline_by_period(ts_code, period=param)
datas = Cache()
if __name__ == "__main__":
# 获取所有股票基本数据
base_all = datas.get_base()
# 获取单股基本数据
base_all = datas.get_base("600571.sh")
# 获取单股日线数据
kline = datas.get_kline("600571.sh", "D")
# 获取单股周线数据
kline = datas.get_kline("600571.sh", "W")
# 获取单股月线数据
kline = datas.get_kline("600571.sh", "M")
# 获取单股季线数据
kline = datas.get_kline("600571.sh", "Q")
# 获取单股年线数据
kline = datas.get_kline("600571.sh", "A")
# 获取单股以5天为周期的数据
kline = datas.get_kline("600571.sh", 5)
# 获取单股以30天为周期的数据
kline = datas.get_kline("600571.sh", 30)