Python读取通达信本地数据

2022-11-19  本文已影响0人  Curious1990

获取内容

股票名称、拼音缩写、所属行业及K线(日、周、月、季、年)

import os
import struct
import pandas as pd


# 负责读取本地通达信数据
class TDXLoader:
    """Read stock metadata and daily K-line files from a local TDX (通达信) install.

    On construction the loader reads the industry table, the industry-name
    table and the two stock-list files, and merges them into one "base"
    DataFrame that is kept for the lifetime of the instance.  Daily K-lines
    are read from disk on every call to ``get_kline_daily``.

    Args:
        tdx_dir: Root directory of the TDX installation.  When omitted, the
            ``Tdx`` environment variable is used.

    Raises:
        ValueError: If neither ``tdx_dir`` nor the ``Tdx`` environment
            variable provides a root directory.
    """

    # Layout of the ``*.tnf`` stock-list files as consumed by ``_read_tnf``:
    # a fixed-size header followed by fixed-size records.
    _TNF_HEADER_SIZE = 50
    _TNF_RECORD_SIZE = 314
    # Code prefixes kept from each list file, keyed by the file's base name.
    _TNF_PREFIXES = {"szm": ("00", "30"), "shm": ("60", "68")}
    # One daily bar is 32 bytes: five 4-byte ints, one 4-byte float, two
    # 4-byte ints.  The layout is spelled out as little-endian standard sizes
    # because the native "l" code is 8 bytes (and padded) on LP64 platforms,
    # which would make the same format string expect 64 bytes per record.
    # NOTE(review): fields are read as signed ints, matching what "l" yields
    # on Windows — confirm whether volume should be unsigned.
    _KLINE_RECORD = struct.Struct("<iiiiifii")

    def __init__(self, tdx_dir=None):
        # TDX_DIR is the root directory of the TDX installation.
        self.TDX_DIR = tdx_dir or os.environ.get("Tdx")
        if not self.TDX_DIR:
            raise ValueError(
                "TDX root directory is not configured: pass tdx_dir or set "
                "the 'Tdx' environment variable"
            )
        self.__base_all = self._get_base_all()

    def resolve(self, *args, **kwargs):
        """Join ``args`` onto the TDX root directory and return the path."""
        return os.path.join(self.TDX_DIR, *args, **kwargs)

    @staticmethod
    def _decode_field(raw):
        """Decode a fixed-width GBK field and strip its NUL padding."""
        return raw.decode("gbk").strip("\x00")

    def _get_tdxhy(self):
        """Load the per-stock industry table from ``tdxhy.cfg``.

        Returns:
            DataFrame with columns ``market``, ``symbol`` (kept as ``str`` so
            leading zeros survive), ``industry_code`` and
            ``industry_second_code``.
        """
        # Path components instead of a backslash literal keep this portable.
        tdxhy_path = self.resolve("T0002", "hq_cache", "tdxhy.cfg")
        names = "market symbol industry_code idontcare1 idontcare2 industry_second_code".split()
        usecols = "market symbol industry_code industry_second_code".split()
        return pd.read_csv(
            tdxhy_path,
            sep="|",
            names=names,
            usecols=usecols,
            dtype={"symbol": str},
        )

    def _get_incon(self):
        """Load the industry code -> industry name table from ``incon.dat``.

        Only lines of the form ``code|name[|...]`` are kept; every other line
        is skipped.

        Returns:
            DataFrame with string columns ``industry_code`` and
            ``industry_name``.
        """
        incon_path = self.resolve("incon.dat")
        rows = []
        # Read line by line instead of through the CSV parser: values stay
        # plain strings (no NaN coercion) and commas inside a line cannot be
        # mistaken for column separators.  "gbk" is a superset of "gb2312"
        # and matches the encoding used for the stock-list files.
        with open(incon_path, encoding="gbk") as f:
            for line in f:
                fields = line.strip().split("|")
                if len(fields) >= 2:
                    rows.append(fields[:2])
        return pd.DataFrame(rows, columns=["industry_code", "industry_name"])

    def _read_tnf(self, path):
        """Parse one stock-list file (``szm.tnf`` or ``shm.tnf``).

        Args:
            path: Path of the list file; the last three characters of its
                base name (``szm`` / ``shm``) select which code prefixes
                are kept.

        Returns:
            List of ``[code, name, shortcode]`` lists, one per kept record.

        Raises:
            KeyError: If the file name does not map to a known market.
        """
        # Use the file's base name so dots in directory names cannot confuse
        # the market lookup.
        market = os.path.splitext(os.path.basename(path))[0][-3:]
        prefixes = self._TNF_PREFIXES[market]

        with open(path, "rb") as f:
            data = f.read()[self._TNF_HEADER_SIZE:]

        size = self._TNF_RECORD_SIZE
        usable = len(data) - len(data) % size  # ignore a trailing partial record

        stocks = []
        for offset in range(0, usable, size):
            record = data[offset:offset + size]
            code = self._decode_field(record[:6])
            if code.startswith(prefixes):
                name = self._decode_field(record[23:41])
                shortcode = self._decode_field(record[285:293])
                stocks.append([code, name, shortcode])
        return stocks

    def _get_szshm(self):
        """Combine both stock-list files into one DataFrame.

        Returns:
            DataFrame with columns ``symbol``, ``name`` and ``shortcode``.
        """
        szm = self._read_tnf(self.resolve("T0002", "hq_cache", "szm.tnf"))
        shm = self._read_tnf(self.resolve("T0002", "hq_cache", "shm.tnf"))
        return pd.DataFrame(szm + shm, columns=["symbol", "name", "shortcode"])

    def _get_base_all(self):
        """Merge stock list, industry codes and industry names.

        Returns:
            DataFrame with columns ``ts_code``, ``symbol``, ``name``,
            ``shortcode``, ``industry_name`` and ``industry_detail``.
        """
        tdxhy = self._get_tdxhy()
        incon = self._get_incon()
        szshm = self._get_szshm()

        base = pd.merge(szshm, tdxhy, how="left", on="symbol")
        base = pd.merge(base, incon, how="left", on="industry_code")
        # Second lookup resolves the detailed industry; the overlapping
        # columns receive the default _x / _y suffixes.
        base = pd.merge(
            base,
            incon,
            how="left",
            left_on="industry_second_code",
            right_on="industry_code",
        )

        # Derive the exchange suffix from the code prefix rather than from the
        # merged ``market`` column: that column is NaN (truthy) for stocks
        # that have no row in tdxhy.cfg, which would label them ".sh".
        sh_prefixes = self._TNF_PREFIXES["shm"]
        suffix = base["symbol"].map(
            lambda symbol: ".sh" if symbol.startswith(sh_prefixes) else ".sz"
        )
        base["ts_code"] = base["symbol"] + suffix

        base = base.rename(
            columns={
                "industry_name_x": "industry_name",
                "industry_name_y": "industry_detail",
            }
        )
        usecols = "ts_code symbol name shortcode industry_name industry_detail".split()
        return base[usecols]

    def _read_kline(self, filepath):
        """Read a daily K-line file into a DataFrame.

        Args:
            filepath: Path of the binary file made of 32-byte records.

        Returns:
            DataFrame with columns ``trade_date`` (as ``str``), ``open``,
            ``high``, ``low``, ``close`` (raw integers divided by 100),
            ``amount``, ``vol`` and ``openinterest``.  A trailing partial
            record, if any, is ignored.
        """
        usecols = "trade_date open high low close amount vol openinterest".split()
        with open(filepath, "rb") as f:
            raw = f.read()

        record = self._KLINE_RECORD
        usable = len(raw) - len(raw) % record.size
        rows = list(record.iter_unpack(raw[:usable]))
        kline = pd.DataFrame(rows, columns=usecols)

        kline["trade_date"] = kline["trade_date"].astype(str)

        # Prices are stored as integers scaled by 100.
        price_columns = ["open", "high", "low", "close"]
        kline[price_columns] = kline[price_columns] / 100
        return kline

    def get_base_all(self):
        """Return the merged base DataFrame built at construction time."""
        return self.__base_all

    def get_kline_daily(self, ts_code):
        """Return the daily K-line of one stock.

        Args:
            ts_code: Code in ``<symbol>.<market>`` form, e.g. ``"600645.sh"``.

        Returns:
            DataFrame indexed by the parsed trade date (index name
            ``"index"``) with columns ``ts_code``, ``trade_date``, ``open``,
            ``high``, ``low``, ``close``, ``amount``, ``volume`` and
            ``openinterest``.
        """
        parts = ts_code.split(".")
        symbol, market = parts[0], parts[1]
        filepath = self.resolve("vipdoc", market, "lday", market + symbol + ".day")

        kline = self._read_kline(filepath)
        kline["ts_code"] = ts_code
        kline.index = pd.to_datetime(kline["trade_date"])
        kline.index.name = "index"
        kline = kline.rename(columns={"vol": "volume"})
        usecols = (
            "ts_code trade_date open high low close amount volume openinterest".split()
        )
        return kline[usecols]


if __name__ == "__main__":
    # Demo: build the loader, fetch the basic-info table for every stock,
    # then read and print the daily K-line of a single stock.
    tdx = TDXLoader()
    all_stocks = tdx.get_base_all()
    daily_bars = tdx.get_kline_daily("600645.sh")
    print(daily_bars)

走缓存,并加工出K线的各个周期

单独运行脚本时缓存起不到作用(进程结束后缓存即失效),需要放在常驻的 Web 服务或桌面应用中才有意义

from store.loader import TDXLoader as Loader
from cacheout import LFUCache

from loguru import logger as log


# 负责缓存
class Cache(object):
    """Cache daily K-lines in memory and derive longer periods from them.

    Daily data is fetched through the loader on first use and stored in an
    ``LFUCache`` keyed by ``ts_code``; every other period is aggregated from
    the cached daily frame on demand.
    """

    # How each column is combined when several daily bars are merged into
    # one bar.  ``amount`` is summed like ``volume`` so that turnover is not
    # dropped from the aggregated result.
    DEFAULT_RULE = {
        "ts_code": "last",
        "trade_date": "last",
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "amount": "sum",
        "volume": "sum",
        "openinterest": "last",
    }

    def __init__(self):
        self.__loader = Loader()
        self.cache = LFUCache()
        # Per-instance copy so tweaking one instance's rule stays local.
        self.rule = dict(self.DEFAULT_RULE)

    def _get_kline_daily(self, ts_code):
        """Return the daily K-line, loading and caching it on a cache miss."""
        if self.cache.has(ts_code):
            return self.cache.get(ts_code)

        kline = self.__loader.get_kline_daily(ts_code)
        self.cache.set(ts_code, kline)
        log.info(f"{ts_code} K线数据已缓存")
        return kline

    def _get_kline_by_period(self, ts_code, period="D"):
        """Aggregate the daily K-line by calendar period.

        Args:
            ts_code: Stock code used as the cache key.
            period: pandas resample rule; the callers in this file use
                ``D``, ``W``, ``M``, ``Q`` and ``A``.
                NOTE(review): recent pandas releases deprecate ``M``/``Q``/
                ``A`` in favour of ``ME``/``QE``/``YE`` — confirm against the
                installed version.

        Returns:
            DataFrame sorted by ``trade_date`` descending with a fresh
            0-based index; periods without any data are dropped.
        """
        daily = self._get_kline_daily(ts_code)

        kline = daily.resample(period).agg(self.rule).dropna()
        kline = kline.sort_values(by="trade_date", ascending=False)
        return kline.reset_index(drop=True)

    def _get_kline_by_days(self, ts_code, days=1):
        """Aggregate the daily K-line into buckets of ``days`` trading rows.

        Buckets are counted from the most recent row backwards, so the newest
        bucket is always full and only the oldest one may be short.

        Args:
            ts_code: Stock code used as the cache key.
            days: Number of daily rows per bucket; must be at least 1.

        Returns:
            DataFrame indexed by bucket number (0 = most recent), sorted by
            ``trade_date`` descending.

        Raises:
            ValueError: If ``days`` is smaller than 1.
        """
        if days < 1:
            raise ValueError(f"days must be >= 1, got {days!r}")

        daily = self._get_kline_daily(ts_code)

        # Number the rows newest-first (0, 1, 2, ...), then restore
        # chronological order so "first"/"last" in the rule pick the oldest
        # and newest row of each bucket respectively.
        newest_first = daily.sort_values(by="trade_date", ascending=False)
        numbered = newest_first.reset_index(drop=True).sort_values(by="trade_date")
        kline = numbered.groupby(lambda position: position // days).agg(self.rule)
        return kline.sort_values(by="trade_date", ascending=False)

    def get_base(self, ts_code=None):
        """Return basic stock info.

        Args:
            ts_code: When given, only the rows of that stock are returned;
                otherwise the table for all stocks is returned.
        """
        data = self.__loader.get_base_all()
        if ts_code:
            return data[data["ts_code"] == ts_code]
        return data

    def get_kline(self, ts_code, param):
        """Return an aggregated K-line.

        Args:
            ts_code: Stock code.
            param: An ``int`` selects fixed-size buckets of that many daily
                rows; anything else is passed on as a resample period.
        """
        if isinstance(param, int):
            return self._get_kline_by_days(ts_code, days=param)
        return self._get_kline_by_period(ts_code, period=param)


# Module-level shared Cache instance.
datas = Cache()


if __name__ == "__main__":
    stock = "600571.sh"
    # Basic info: every stock first, then just the one stock.
    base_all = datas.get_base()
    base_all = datas.get_base(stock)
    # K-lines in order: daily, weekly, monthly, quarterly, yearly, followed
    # by fixed buckets of 5 and 30 daily rows.
    for period in ("D", "W", "M", "Q", "A", 5, 30):
        kline = datas.get_kline(stock, period)
上一篇下一篇

猜你喜欢

热点阅读