julia GPU计算之CUDA编程——用日k线来计算周k线

2021-06-20  本文已影响0人  昵称违法

前言

本文用周k线的计算来举例,如何通过CUDA来实现GPU计算。

1、用日线来计算周线的方法:

(1)大盘(沪深300指数)如何计算周k线?

大盘不涉及停牌,所以直接按照week来划分数据,然后分别计算open、 high、 low、 close、 vol、 money几个值

(2)个股如何计算周线?

先调用大盘的周线信息,用大盘周线的起止日期去查询个股的日线数据,然后分别计算open、 high、 low、 close、 vol、 money以及停牌信息。

2、GPU计算相关

(1)julia中GPU计算的实现

a、直接用矢量化的计算

 a_gpu .+ b_gpu

b、用map reduce等类似的函数

c_gpu = CUDA.map(+,a_gpu,b_gpu)

c、自己编写cuda核函数
CPU和GPU之间数据的传输
用Array和CuArray

CPU在GPU上启动核函数后,如何等待GPU计算结束,并取回计算结果
用【sync】来实现

在GPU上创建的数组,操作完毕后,如何释放变量
用CUDA.unsafe_free!()来释放

blocks中线程数如何设置
每个block中thread最大数量不超过1024,具体查询CUDA官网,针对GPU计算能力值来设定

blocks块数如何设置
任务量 / 每块线程数,然后取ceil整数,就是blocks的数量

日线计算周线的CUDA核函数实现

"""
GPU核函数:生成周k线
#大盘信息
dapan_idx_d,
dapan_date_start_d,
dapan_date_end_d,
dapan_cnt_d,

#个股信息
arg_day_d,
arg_open_d,
arg_high_d,
arg_low_d,
arg_close_d,
arg_vol_d,
arg_money_d,

#返回值
rtn_idx_d,
rtn_day_d,
rtn_open_d,
rtn_high_d,
rtn_low_d,
rtn_close_d,
rtn_volume_d,
rtn_money_d,
rtn_cnt_dapan_d,
rtn_cnt_d,
"""
function gen_weekly_kline_kernel!(
    dapan_idx_d,
    dapan_date_start_d,
    dapan_date_end_d,
    dapan_cnt_d,
    arg_day_d,
    arg_open_d,
    arg_high_d,
    arg_low_d,
    arg_close_d,
    arg_vol_d,
    arg_money_d,
    rtn_idx_d,
    rtn_day_d,
    rtn_open_d,
    rtn_high_d,
    rtn_low_d,
    rtn_close_d,
    rtn_volume_d,
    rtn_money_d,
    rtn_cnt_dapan_d,
    rtn_cnt_d,
)
    index = (blockIdx().x - 1) * blockDim().x + threadIdx().x

    if index <= length(dapan_idx_d) #线程的数量多余任务的数量,防止越界

        #初始化
        rtn_idx_d[index] = dapan_idx_d[index]        #序号
        rtn_day_d[index] = dapan_date_end_d[index]   #周k线的日期
        rtn_cnt_dapan_d[index] = dapan_cnt_d[index]  #大盘在该周有几天交易日,用于跟rtn_cnt_d比对,看看个股在该周是否有停牌
        rtn_open_d[index] = -Inf      #开盘价    open = day_num == 1 的 open
        rtn_high_d[index] = -Inf      #最高价    取较大的值
        rtn_low_d[index] = Inf        #最低价    取较小的值
        rtn_close_d[index] = NaN      #收盘价    用当前close覆盖
        rtn_volume_d[index] = 0.0     #成交量    累加
        rtn_money_d[index] = 0.0      #成交资金  累加
        rtn_cnt_d[index] = 0          #交易日数  个股在每个周的交易日数量统计

        #遍历个股的数据,找到符合条件的,则进行处理
        for i = 1:length(arg_day_d)
            if dapan_date_start_d[index] <= arg_day_d[i] <= dapan_date_end_d[index]
                #交易日数量
                rtn_cnt_d[index] += 1

                #开盘价
                if rtn_cnt_d[index] == 1
                    rtn_open_d[index] = arg_open_d[i]
                end

                #最高价
                if arg_high_d[i] > rtn_high_d[index]
                    rtn_high_d[index] = arg_high_d[i]
                end

                #最低价
                if arg_low_d[i] < rtn_low_d[index]
                    rtn_low_d[index] = arg_low_d[i]
                end

                #收盘价,每天更新覆盖
                rtn_close_d[index] = arg_close_d[i]

                #成交量
                rtn_volume_d[index] += arg_vol_d[i]

                #成交资金
                rtn_money_d[index] += arg_money_d[i]
            end
        end
    else
        #
    end
    return nothing
end

3、计算时间对比

(1)用CPU计算周线的时间

一共4000多只个股,用单线程计算,耗时101秒

(2)用CUDA计算周线的时间

一共4000多只个股,用单线程计算,耗时11秒;用CPU多线程去调用核函数,耗时3秒。

(3)小技巧:如何在主存里面管理显存的变量?

下面代码功能:把GPU中的变量映射到内存中,用命名元组来管理

"""
在GPU中生成CuArray,用于缓存计算结果
"""
function gen_rtn_cu(N::Int64)
    return(
    idx = CUDA.ones(Int64,N),       #k线的序号
    day = CUDA.ones(Int64,N),       #日期
    open = CUDA.ones(Float64,N),    #开盘价
    high = CUDA.ones(Float64,N),    #最高价
    low = CUDA.ones(Float64,N),     #最低价
    close = CUDA.ones(Float64,N),   #收盘价
    volume = CUDA.ones(Float64,N),  #成交量
    money = CUDA.ones(Float64,N),   #成交额
    cnt_dapan = CUDA.ones(Int64,N), #本周大盘有多少个交易日
    cnt = CUDA.ones(Int64,N)        #本周个股有多少个交易日
    )
end

通过命名元组来获取GPU中的数组对象,并释放GPU空间

"""
释放gpu中的rtn_cu内存
"""
function free_rtn_cu(rtn)
    """
    如何编程自己循环捕捉对象,然后释放对象
    """
    rtn.idx |> CUDA.unsafe_free!       #k线的序号
    rtn.day |> CUDA.unsafe_free!       #日期
    rtn.open |> CUDA.unsafe_free!      #开盘价
    rtn.high |> CUDA.unsafe_free!      #最高价
    rtn.low |> CUDA.unsafe_free!       #最低价
    rtn.close |> CUDA.unsafe_free!     #收盘价
    rtn.volume |> CUDA.unsafe_free!    #成交量
    rtn.money |> CUDA.unsafe_free!     #成交额
    rtn.cnt_dapan |> CUDA.unsafe_free! #本周大盘有多少个交易日
    rtn.cnt |> CUDA.unsafe_free!       #本周个股有多少个交易日
    rtn = nothing

end

4、附录:代码

本文用到的k线数据是存于本地的,附录代码只供参考,不能直接运行。你可以用量化接口来读取数据,保存到本地,然后再进行实验。

(1)CPU计算的代码

#用CPU来生成k线,日线->周线,与GPU计算结果互相验证
using DataFrames
using DataFramesMeta
using CSV
using Dates
using BenchmarkTools


"""
用大盘的日k线,生成每周交易概要信息

输入:
df300 大盘的日k线

输出:
        k线序号         起始日期           结束日期           本周大盘交易日数量
DataFrame(idx = [...], date_start = [...], date_end = [...], cnt = [...])
"""
function gen_dapan_week_line_info(df300)
    #df300 |> display
    df300.week = df300.day .|> week

    #把week逐个计数,相同的week,它的idx相同
    df300.weekidx = let
        last_week = 0
        idx = 1
        idx_ary = []
        for (i, r) in zip(1:size(df300, 1), eachrow(df300))
            if i == 1
                idx = 1
                last_week = r.week
            elseif i > 1
                if last_week != r.week
                    idx += 1
                    last_week = r.week
                else
                    last_week = r.week
                end
            end
            push!(idx_ary, idx)
        end
        idx_ary
    end


    #生成周k线中每根k线的概要信息,用于计算详细的k线
    info_df = DataFrame(idx = [], date_start = [], date_end = [], cnt = [])
    gd = groupby(df300, "weekidx")

    for weekidx in df300.weekidx |> unique |> sort
        #println(weekidx)
        sub_df = gd[weekidx] |> df -> sort(df, "day")
        idx = weekidx
        sub_df = gd[weekidx] |> df -> sort(df, "day")
        date_start = first(sub_df, 1)[1, "day"]
        date_end = last(sub_df, 1)[1, "day"]
        cnt = size(sub_df, 1)

        push!(info_df, [idx, date_start, date_end, cnt])
    end

    return info_df
end

function  gen_dapan_week_line_info_test()
    df300 = CSV.read("data/399300.csv",DataFrame)
    df300 |> display
    info_df = gen_dapan_week_line_info(df300)
    info_df |> display
end

gen_dapan_week_line_info_test()

"""
帮个股生成周k线

输入:
info_df  大盘每周交易日信息
df       个股日k线

输出:
DataFrame
          序号        日期        开盘价       最高价        最低价      收盘价         成交量        成交金额        本周大盘交易日数量 本周个股交易日数量
--------------------------------------------------------------------------------------------------------------------------------------------------------
DataFrame(idx = [...],day = [...],open = [...],high = [...],low = [...],close = [...],volume = [...],money = [...],dapan_cnt=[...],cnt = [...])
"""
function gen_weekly_kline(info_df,df)
    rtn_df = DataFrame(idx = [],day = [],open = [],high = [],low = [],close = [],volume = [],money = [],dapan_cnt=[],cnt = [])

    for r in eachrow(info_df)
        idx = r.idx
        date_start = r.date_start
        date_end = r.date_end

        q_df = @linq df |> where(date_start .<= :day .<= date_end)
        if size(q_df,1) > 0
            sort!(q_df,"day")

            day = date_end
            open = q_df.open[1]
            high = q_df.high |> maximum
            low = q_df.low |> minimum
            close = q_df.close[end]
            volume = q_df.vol |> sum
            money = q_df.money |> sum
            dapan_cnt = r.cnt
            cnt = size(q_df,1)

            push!(rtn_df,[idx,day,open,high,low,close,volume,money,dapan_cnt,cnt])
        else  #本周个股停牌
            day = date_end
            open = 0.0
            high = 0.0
            low = 0.0
            close = 0.0
            volume = 0.0
            money = 0.0
            dapan_cnt = r.cnt
            cnt = 0
            push!(rtn_df,[idx,day,open,high,low,close,volume,money,dapan_cnt,cnt])
        end
    end
    rtn_df
end

function gen_weekly_kline_test()
    #读取大盘数据
    df300 = CSV.read("data/399300.csv",DataFrame)
    df300 |> display
    info_df = gen_dapan_week_line_info(df300)
    info_df |> display

    #读取个股信息
    df = CSV.read("data/股票/000001.csv",DataFrame)
    gen_weekly_kline(info_df,df)
end

gen_weekly_kline_test()

#读取所有的股票
stocks = readdir("data/股票") .|> item->split(item,".")[1]

"""
读取所有的股票k线,存到一个字典中备用
"""
function get_stocks_dict(stocks)
    mydict = Dict()
    for s in stocks
        mydict[s] = CSV.read("data/股票/$(s).csv",DataFrame)
    end
    mydict
end

stocks_dict = get_stocks_dict(stocks)

"""
计算多只个股的周线表
"""
function main_multi_stocks(stocks_dict)
    #读取大盘数据
    df300 = CSV.read("data/399300.csv",DataFrame)
    #df300 |> display
    info_df = gen_dapan_week_line_info(df300)
    #info_df |> display

    t1 = time()
    #逐只个股计算
    for (stock,df) in stocks_dict
        week_df = gen_weekly_kline(info_df,df)
        CSV.write("data/cpu周线/$(stock).csv",week_df)
    end
    t2 = time()

    println(t2-t1)
end

main_multi_stocks(stocks_dict)

#单线程 101s

(2)GPU计算的代码

#用GPU来生成k线,日线->周线
using DataFrames
using CSV
using XLSX
using Dates

using BenchmarkTools
using CUDA
using Test

if capability(device()) < v"7.0"
    println("你的显卡计算能力没达到7.0,退出程序!!")
    exit()
end


"""
【月份】和【日期】字符串格式化为两位
01 =  01
1  =  01

输入:"0"-"31" 或者 "00"-"31"
"""
function format_str_00(str::String)
    rtn = ""
    if length(str) == 1
        rtn = string(0,str)
    else
        rtn = str
    end
    return rtn
end

format_str_00("9")
format_str_00("12")


"""
把一个Date日期转成同形式字面量的Int,注意,月份和日期会补足成两位
例如:
__________________________
      Date   => Int64
==========================
"2021-01-09" => 20210109
==========================
"""
function get_int_day(mydate::Date)
    day_str = ""
    y = string(mydate |> year)
    m = string(mydate |> month) |> format_str_00
    d = string(mydate |> day) |> format_str_00
    parse(Int64, string(y, m, d))
    return parse(Int64, string(y, m, d))
end

get_int_day(Date("2021-01-09"))
get_int_day(Date("2021-2-1"))
get_int_day(Date("2021-12-14"))


"""
从日k线中,生成每根周线k线的【起始日期】和【终止日期】信息。作用:【个股日线生成周线的时候,用这两个日期,进行检索】

==输出==
     |序号  开始日期    结束日期   交易天数
Row  │ idx  date_start  date_end  cnt
     │ Any  Any         Any       Any
─────┼────────────────────────────────
   1 │ 1    20091203    20091204  2
   2 │ 2    20091207    20091211  5
   3 │ 3    20091214    20091218  5
   4 │ 4    20091221    20091225  5
   ⋮  │  ⋮       ⋮          ⋮       ⋮
 562 │ 562  20201116    20201120  5
 563 │ 563  20201123    20201127  5
 564 │ 564  20201130    20201202  3

"""
function gen_week_kline_info(df300)
    df = DataFrame(idx = [],date_start = [],date_end = [],cnt = [])
    idx = 0                 #每根k线的序号
    date_start = 19700101   #起始日期【用int来存储日期】
    date_end = 19700101     #结束日期
    cnt = 0                 #本周交易日的天数

    last_week = -1          #上一根k线的week值,计算周线的时候,week跳变是标志事件
    for i in 1:size(df300,1)
        current_week = df300[i,"week"]
        if current_week != last_week  #新的一周开始了
            #把上一周的信息假如到df中
            if idx > 0
                date_end = df300[i-1,"day"] |> get_int_day
                push!(df,[idx,date_start,date_end,cnt])
            end

            #新的一周的计数
            date_start = df300[i,"day"] |> get_int_day
            idx += 1
            cnt = 1
        else                        #同一个周
            cnt += 1
        end

        last_week = current_week

        if i == size(df300,1) #最后在生成一根k线
            date_end = df300[i,"day"] |> get_int_day
            push!(df,[idx,date_start,date_end,cnt])
        end
    end
    df
end

function gen_week_kline_info_test()
    #读取hs300大盘数据
    df300 = CSV.read("data/399300.csv",DataFrame)
    df300 |> display

    #计算week
    df300[!,"week"] = df300.day .|> week
    kline_info = gen_week_kline_info(df300)
    kline_info |> display
end

gen_week_kline_info_test()

"""
在GPU中生成大盘周线的概要信息

输入:
kline_info   周k线的概要信息df

输出:
命名元组:
(
idx = CuArray(kline_info.idx .|> Int64),               #序号
start_date = CuArray(kline_info.date_start .|> Int64), #开始日期
end_date =  CuArray(kline_info.date_end .|> Int64),    #结束日期
cnt = CuArray(kline_info.cnt .|> Int64)                #本周交易日天数
)

"""
function gen_cu_dapan_kline_info(kline_info)
    return(
    idx = CuArray(kline_info.idx .|> Int64),               #序号
    start_date = CuArray(kline_info.date_start .|> Int64), #开始日期
    end_date =  CuArray(kline_info.date_end .|> Int64),    #结束日期
    cnt = CuArray(kline_info.cnt .|> Int64)                #本周交易日天数
    )
end

"""
释放cu_dapan_kline_info在GPU上的内存
"""
function free_cu_dapan_kline_info(cu_info)
    cu_info.idx |> CUDA.unsafe_free!         #序号
    cu_info.start_date |> CUDA.unsafe_free!  #开始日期
    cu_info.end_date |> CUDA.unsafe_free!    #结束日期
    cu_info.cnt |> CUDA.unsafe_free!         #本周交易日天数

    cu_info = nothing
end

function gen_cu_dapan_kline_info_test()
    #读取日k线数据
    df300 = CSV.read("data/399300.csv",DataFrame)
    df300 |> display

    #计算week
    df300[!,"week"] = df300.day .|> week

    #生成k线概要信息
    kline_info = gen_week_kline_info(df300)
    kline_info |> display

    #在gpu中生成k线概要信息
    dapan_cu = gen_cu_dapan_kline_info(kline_info)
    dapan_cu |> display

    dapan_cu.idx |> typeof |> display

    dapan_cu
    free_cu_dapan_kline_info(dapan_cu)

    try
        dapan_cu.idx |> display
    catch e

        println("执行代码[dapan_cu.idx |> display] 出错,info = ",e)
    end

end

gen_cu_dapan_kline_info_test()

"""
在GPU中生成CuArray,用于缓存计算结果
"""
function gen_rtn_cu(N::Int64)
    return(
    idx = CUDA.ones(Int64,N),       #k线的序号
    day = CUDA.ones(Int64,N),       #日期
    open = CUDA.ones(Float64,N),    #开盘价
    high = CUDA.ones(Float64,N),    #最高价
    low = CUDA.ones(Float64,N),     #最低价
    close = CUDA.ones(Float64,N),   #收盘价
    volume = CUDA.ones(Float64,N),  #成交量
    money = CUDA.ones(Float64,N),   #成交额
    cnt_dapan = CUDA.ones(Int64,N), #本周大盘有多少个交易日
    cnt = CUDA.ones(Int64,N)        #本周个股有多少个交易日
    )
end

"""
释放gpu中的rtn_cu内存
"""
function free_rtn_cu(rtn)
    """
    如何编程自己循环捕捉对象,然后释放对象
    """
    rtn.idx |> CUDA.unsafe_free!       #k线的序号
    rtn.day |> CUDA.unsafe_free!       #日期
    rtn.open |> CUDA.unsafe_free!      #开盘价
    rtn.high |> CUDA.unsafe_free!      #最高价
    rtn.low |> CUDA.unsafe_free!       #最低价
    rtn.close |> CUDA.unsafe_free!     #收盘价
    rtn.volume |> CUDA.unsafe_free!    #成交量
    rtn.money |> CUDA.unsafe_free!     #成交额
    rtn.cnt_dapan |> CUDA.unsafe_free! #本周大盘有多少个交易日
    rtn.cnt |> CUDA.unsafe_free!       #本周个股有多少个交易日
    rtn = nothing

end

function gen_rtn_cu_test()
    #生成第一个rtn
    res1 = gen_rtn_cu(1024)
    res1.idx .+= 1
    res1.idx |> display

    #生成第二个rtn
    res2 = gen_rtn_cu(1024)
    res2.idx |> display

    #释放对象并测试
    free_rtn_cu(res1)
    try
        res1.idx |> println
    catch
        println("执行代码:res1.idx |> println ,因为res1已经销毁了!")
    end
end

gen_rtn_cu_test()


"""
把k线的数据载入到GPU中
====输入====
df k线数据df
====输出===
命名元组
(
day = CuArray(df.day .|> get_int_day)  #k线的日期
open = CuArray(df.open)    #开盘价
high = CuArray(df.high)    #最高价
low = CuArray(df.low)      #最低价
close = CuArray(df.close)  #收盘价
vol = CuArray(df.vol)      #成交量
money = CuArray(df.money)  #成交额
)

"""
function gen_cu_stock_kline(df)
    return(
    day = CuArray(df.day .|> get_int_day),  #k线的日期
    open = CuArray(df.open),    #开盘价
    high = CuArray(df.high),    #最高价
    low = CuArray(df.low),      #最低价
    close = CuArray(df.close),  #收盘价
    vol = CuArray(df.vol),      #成交量
    money = CuArray(df.money)  #成交额
    )
end

"""
释放cu_stock_kline对象在GPU上的内存
"""
function free_cu_stock_kline(stock_cu)
    stock_cu.day |> CUDA.unsafe_free!    #k线的日期
    stock_cu.open |> CUDA.unsafe_free!   #开盘价
    stock_cu.high |> CUDA.unsafe_free!   #最高价
    stock_cu.low |> CUDA.unsafe_free!    #最低价
    stock_cu.close |> CUDA.unsafe_free!  #收盘价
    stock_cu.vol |> CUDA.unsafe_free!    #成交量
    stock_cu.money |> CUDA.unsafe_free!  #成交额
    stock_cu = nothing
end

function gen_cu_stock_kline_test()
    df = CSV.read("data/股票/000001.csv",DataFrame)
    stock_cu = gen_cu_stock_kline(df)

    stock_cu.open |> display

    free_cu_stock_kline(stock_cu)
    try
        stock_cu.day |> display
    catch e
        println("执行代码【stock_cu.day |> display】出错",e)
    end

end

gen_cu_stock_kline_test()



"""
GPU核函数:生成周k线

dapan_idx_d,
dapan_date_start_d,
dapan_date_end_d,
dapan_cnt_d,

arg_day_d,
arg_open_d,
arg_high_d,
arg_low_d,
arg_close_d,
arg_vol_d,
arg_money_d,

rtn_idx_d,
rtn_day_d,
rtn_open_d,
rtn_high_d,
rtn_low_d,
rtn_close_d,
rtn_volume_d,
rtn_money_d,
rtn_cnt_dapan_d,
rtn_cnt_d,
"""
function gen_weekly_kline_kernel!(
    dapan_idx_d,
    dapan_date_start_d,
    dapan_date_end_d,
    dapan_cnt_d,
    arg_day_d,
    arg_open_d,
    arg_high_d,
    arg_low_d,
    arg_close_d,
    arg_vol_d,
    arg_money_d,
    rtn_idx_d,
    rtn_day_d,
    rtn_open_d,
    rtn_high_d,
    rtn_low_d,
    rtn_close_d,
    rtn_volume_d,
    rtn_money_d,
    rtn_cnt_dapan_d,
    rtn_cnt_d,
)
    index = (blockIdx().x - 1) * blockDim().x + threadIdx().x

    if index <= length(dapan_idx_d) #线程的数量多余任务的数量,防止越界

        #初始化
        rtn_idx_d[index] = dapan_idx_d[index]        #序号
        rtn_day_d[index] = dapan_date_end_d[index]   #周k线的日期
        rtn_cnt_dapan_d[index] = dapan_cnt_d[index]  #大盘在该周有几天交易日,用于跟rtn_cnt_d比对,看看个股在该周是否有停牌
        rtn_open_d[index] = -Inf      #开盘价    open = day_num == 1 的 open
        rtn_high_d[index] = -Inf      #最高价    取较大的值
        rtn_low_d[index] = Inf        #最低价    取较小的值
        rtn_close_d[index] = NaN      #收盘价    用当前close覆盖
        rtn_volume_d[index] = 0.0     #成交量    累加
        rtn_money_d[index] = 0.0      #成交资金  累加
        rtn_cnt_d[index] = 0          #交易日数  个股在每个周的交易日数量统计

        #遍历个股的数据,找到符合条件的,则进行处理
        for i = 1:length(arg_day_d)
            if dapan_date_start_d[index] <= arg_day_d[i] <= dapan_date_end_d[index]
                #交易日数量
                rtn_cnt_d[index] += 1

                #开盘价
                if rtn_cnt_d[index] == 1
                    rtn_open_d[index] = arg_open_d[i]
                end

                #最高价
                if arg_high_d[i] > rtn_high_d[index]
                    rtn_high_d[index] = arg_high_d[i]
                end

                #最低价
                if arg_low_d[i] < rtn_low_d[index]
                    rtn_low_d[index] = arg_low_d[i]
                end

                #收盘价,每天更新覆盖
                rtn_close_d[index] = arg_close_d[i]

                #成交量
                rtn_volume_d[index] += arg_vol_d[i]

                #成交资金
                rtn_money_d[index] += arg_money_d[i]
            end
        end
    else
        #
    end
    return nothing
end


"""
把GPU中的rtn复制到host中,组成一个df
"""
function gen_df_from_gpu(rtn_cu)
    #从GPU复制数据
    idx = rtn_cu.idx |> Array
    day = rtn_cu.day |> Array
    open = rtn_cu.open |> Array
    high = rtn_cu.high |> Array
    low = rtn_cu.low |> Array
    close = rtn_cu.close |> Array
    volume = rtn_cu.volume |> Array
    money = rtn_cu.money |> Array
    dapan_cnt = rtn_cu.cnt_dapan |> Array
    cnt = rtn_cu.cnt |> Array

    df = DataFrame(idx = idx,day = day,open = open,high = high,low = low,close = close,volume = volume,money = money,dapan_cnt=dapan_cnt,cnt = cnt)
end


function main2(stocks_dict)
    #加载大盘数据
    df300 = CSV.read("data/399300.csv", DataFrame)
    df300[!, "week"] = df300.day .|> week

    #生成k线概要信息
    dapan_kline_info = gen_week_kline_info(df300)

    #在gpu中生成k线概要信息
    dapan_cu = gen_cu_dapan_kline_info(dapan_kline_info)

    #free_cu_dapan_kline_info(dapan_cu)

    t1 = time()

    #Threads.@threads for stock in stocks_dict |> keys |> collect
    for stock in stocks_dict |> keys |> collect
        df = stocks_dict[stock]
        #println(stock)
        stock_cu = gen_cu_stock_kline(df)

        N = size(dapan_kline_info, 1)
        rtn_cu = gen_rtn_cu(N)

        #启动CUDA内核进行计算
        numblocks = ceil(Int, N / 256)
        CUDA.@sync begin
            @cuda threads = 256 blocks = numblocks gen_weekly_kline_kernel!(
                dapan_cu.idx,
                dapan_cu.start_date,
                dapan_cu.end_date,
                dapan_cu.cnt,
                stock_cu.day,
                stock_cu.open,
                stock_cu.high,
                stock_cu.low,
                stock_cu.close,
                stock_cu.vol,
                stock_cu.money,
                rtn_cu.idx,
                rtn_cu.day,
                rtn_cu.open,
                rtn_cu.high,
                rtn_cu.low,
                rtn_cu.close,
                rtn_cu.volume,
                rtn_cu.money,
                rtn_cu.cnt_dapan,
                rtn_cu.cnt,
            )

            #从GPU上获取数据
            df = gen_df_from_gpu(rtn_cu)
            #CSV.write("data/gpu周线/$(stock).csv", df)

            #清空gpu变量
            #free_cu_dapan_kline_info(dapan_cu)
            free_cu_stock_kline(stock_cu)
            free_rtn_cu(rtn_cu)
        end
    end

    t2 = time()

    println(t2 - t1)

end

#读取所有的股票
stocks = readdir("data/股票") .|> item->split(item,".")[1]

"""
读取所有的股票k线,存到一个字典中备用
"""
function get_stocks_dict(stocks)
    mydict = Dict()
    for s in stocks
        mydict[s] = CSV.read("data/股票/$(s).csv",DataFrame)
    end
    mydict
end

stocks_dict = get_stocks_dict(stocks)

main2(stocks_dict)
上一篇下一篇

猜你喜欢

热点阅读