hosseinmoein DataFrame自定义列类似spar

2022-02-10  本文已影响0人  FredricZhu

CMakeLists.txt


# CMAKE_CXX_STANDARD 17 needs CMake >= 3.8 and the imported targets linked by
# this project (ZLIB::ZLIB, glog::glog, ...) need a 3.x release. Recent CMake
# releases also refuse compatibility with versions older than 3.5.
cmake_minimum_required(VERSION 3.10)
project(df_test)

# Build everything as C++17 and fail at configure time if the compiler cannot
# do it. The previous extra "-std=c++14" flag competed with this setting, so
# the effective standard depended on the order of flags on the command line.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Always emit debug information. -g is a compiler option, not a preprocessor
# definition, so it is added once through add_compile_options().
add_compile_options(-g)

if(APPLE)
    message(STATUS "This is Apple, do nothing.")
elseif(UNIX)
    message(STATUS "This is linux, set CMAKE_PREFIX_PATH.")
    # Append instead of overwrite so a CMAKE_PREFIX_PATH given by the user
    # (e.g. with -D on the command line) is still searched first.
    list(APPEND CMAKE_PREFIX_PATH /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/share)
endif()

# Third-party dependencies. All of them are linked unconditionally further
# down, so every lookup is REQUIRED: a missing package then stops the
# configure step with a clear message instead of failing later on an
# unknown ZLIB::ZLIB target.
find_package(ZLIB REQUIRED)

find_package(glog REQUIRED)
find_package(OpenCV REQUIRED)

find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )

find_package(DataFrame REQUIRED)

# Header search paths. ${CMAKE_CURRENT_SOURCE_DIR}/../../ is added so that
# project headers can be included relative to it (e.g. "df/df.h").
if(APPLE)
    message(STATUS "This is APPLE, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../)
elseif(UNIX)
    message(STATUS "This is linux, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include ${CMAKE_CURRENT_SOURCE_DIR}/../../)
endif()

# Library search paths. On Linux the Boost *library* directories are used
# here; the header directories previously listed do not contain libraries.
if(APPLE)
    message(STATUS "This is APPLE, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
elseif(UNIX)
    message(STATUS "This is linux, set LINK_DIRS")
    set(LINK_DIRS ${Boost_LIBRARY_DIRS} /usr/local/lib /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/lib)
endif()

# Platform-specific ODBC library names. ODBC_LIBS is not referenced by any
# target in the visible part of this file.
if(APPLE)
    message(STATUS "This is APPLE, set ODBC_LIBS")
    set(ODBC_LIBS iodbc iodbcinst)
elseif(UNIX)
    # The status text used to say LINK_DIRS here (copy-paste slip).
    message(STATUS "This is linux, set ODBC_LIBS")
    set(ODBC_LIBS odbc odbcinst ltdl)
endif()

include_directories(${INCLUDE_DIRS})
link_directories(${LINK_DIRS})

# Every .cpp file next to this CMakeLists.txt becomes its own executable.
file(GLOB test_file_list "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp")

# Sources of the shared helper library: local headers plus the
# implementation files from the sibling impl/ directory.
file(GLOB APP_SOURCES
    "${CMAKE_CURRENT_SOURCE_DIR}/*.h"
    "${CMAKE_CURRENT_SOURCE_DIR}/../impl/*.cpp"
)

add_library(${PROJECT_NAME}_lib SHARED ${APP_SOURCES})
target_link_libraries(${PROJECT_NAME}_lib
    ${Boost_LIBRARIES}
    ZLIB::ZLIB
    libgtest.a
    glog::glog
    DataFrame::DataFrame
    ${OpenCV_LIBS}
)

# One executable per source file, named after the file without ".cpp" and
# linked against the helper library.
foreach(test_src ${test_file_list})
    file(RELATIVE_PATH rel_path ${CMAKE_CURRENT_SOURCE_DIR} ${test_src})
    string(REPLACE ".cpp" "" exe_name ${rel_path})
    add_executable(${exe_name} ${test_src})
    target_link_libraries(${exe_name} ${PROJECT_NAME}_lib)
endforeach()

df/df.h

#ifndef _FREDRIC_DF_H_
#define _FREDRIC_DF_H_

#include "json/json.hpp"

#include <DataFrame/DataFrame.h>
#include <DataFrame/DataFrameFinancialVisitors.h>
#include <DataFrame/DataFrameMLVisitors.h>
#include <DataFrame/DataFrameOperators.h>
#include <DataFrame/DataFrameStatsVisitors.h>

#include <vector>


// Shorthand for the nlohmann JSON value type used throughout this header.
using json = nlohmann::json;

// DataFrame whose index (primary key) type is unsigned int
using CDataFrame = hmdf::StdDataFrame<unsigned int>;
// DataFrame whose index (primary key) type is unsigned long
using CLDataFrame = hmdf::StdDataFrame<unsigned long>;

// Iterator type over the index vector of a StdDataFrame with index type T
template <typename T>
using CDFIdxIteratorT = typename hmdf::StdDataFrame<T>::IndexVecType::iterator;

// Index iterator types for the two concrete DataFrame aliases above.
using CDataFrameIdxItType = CDFIdxIteratorT<unsigned int>;
using CLDataFrameIdxItType = CDFIdxIteratorT<unsigned long>;

// Re-exported hmdf policy enums so users do not need the hmdf:: prefix.
using concat_policy = hmdf::concat_policy;
using join_policy = hmdf::join_policy;

// Name of the extra column that df_op::convert_json_to_df adds to hold each
// row's concatenated key string.
// NOTE(review): a namespace-scope const std::string in a header gives every
// translation unit its own copy.
const std::string Keys = "keys_";

// Stateless collection of helpers for moving JSON data into a CDataFrame and
// post-processing the result. All members are static.
struct df_op {
    // Builds a CDataFrame from a JSON list: one json-typed column per key of
    // the first element, a 1-based index, and an extra Keys column holding the
    // concatenated dump() of the columns named in pri_keys_ (of all columns
    // when pri_keys_ is empty). Returns an empty frame when js yields no keys.
    static CDataFrame convert_json_to_df(const json& js, const std::vector<std::string>& pri_keys_);
    // Returns the keys of the first element of js; empty (and an error is
    // logged) when js has no elements.
    static std::vector<std::string> get_df_keys(const json& js);
    // Returns a copy of df with duplicates removed, comparing the json columns
    // named in keys_. Throws std::runtime_error for unsupported key counts
    // (the implementation handles 1 to 6 columns).
    static CDataFrame remove_duplicate(const CDataFrame& df, const std::vector<std::string>& keys_);
    // Writes the json columns of df to csv_file_name in hmdf's csv2 format.
    // Returns false when the file cannot be opened.
    static bool write_to_csv(const CDataFrame& df, const std::string& csv_file_name);
}; 

#endif

df/impl/df.cpp

#include "df/df.h"

#include <glog/logging.h>

/**
 * Converts a JSON list of objects into a CDataFrame.
 *
 * The column names are the keys of the first element (see get_df_keys). Every
 * cell is stored as a json value, rows are indexed 1..n, and an additional
 * column named by Keys holds, for each row, the concatenation of dump() of
 * the key columns (in column order, not in pri_keys_ order).
 *
 * @param js         JSON list whose elements are the rows.
 * @param pri_keys_  Names of the columns that form the row key. When empty,
 *                   every column contributes to the row key.
 * @return The populated frame, or an empty frame when no column names could
 *         be derived from js.
 *
 * An element that lacks one of the columns contributes a null json cell for
 * it. Calling const operator[] with a missing key, as the code did before, is
 * undefined behaviour in nlohmann json.
 *
 * NOTE(review): the row key is built without a separator, so different value
 * combinations can produce the same key (e.g. 1,23 and 12,3). Left unchanged
 * because existing data may depend on the current key format.
 */
CDataFrame df_op::convert_json_to_df(const json& js, const std::vector<std::string>& pri_keys_) {
    // Static hmdf setting; kept exactly as before.
    CDataFrame::set_thread_level(10);
    CDataFrame df;

    const std::vector<std::string> keys_ = get_df_keys(js);
    if(keys_.empty()) {
        return df;
    }

    // Decide once per column whether it is part of the row key, instead of
    // searching pri_keys_ again for every cell.
    std::vector<bool> is_pri_key(keys_.size(), pri_keys_.empty());
    if(!pri_keys_.empty()) {
        for(std::size_t i = 0; i < keys_.size(); ++i) {
            is_pri_key[i] = std::find(pri_keys_.begin(), pri_keys_.end(), keys_[i]) != pri_keys_.end();
        }
    }

    // The index element type matches CDataFrame's index type (unsigned int).
    std::vector<unsigned int> idxs {};
    unsigned int idx = 1u;
    std::map<std::string, std::vector<json>> columns {};

    for (const auto& ele_js : js) {
        std::string key {};
        for(std::size_t i = 0; i < keys_.size(); ++i) {
            const std::string& column_key = keys_[i];

            // find() is safe on const json: a missing key (or a non-object
            // element) yields end(), which is mapped to a null cell.
            const auto cell_it = ele_js.find(column_key);
            json cell = (cell_it != ele_js.end()) ? *cell_it : json();

            if(is_pri_key[i]) {
                key += cell.dump();
            }
            // map::operator[] creates the column vector on first use.
            columns[column_key].emplace_back(std::move(cell));
        }

        columns[Keys].emplace_back(key);
        idxs.emplace_back(idx++);
    }

    df.load_index(idxs.begin(), idxs.end());
    for(auto&& key: keys_) {
        df.load_column<json>(key.c_str(), {columns[key].begin(), columns[key].end()}, hmdf::nan_policy::pad_with_nans);
    }

    df.load_column<json>(Keys.c_str(), {columns[Keys].begin(), columns[Keys].end()}, hmdf::nan_policy::pad_with_nans);
    return df;
}

std::vector<std::string> df_op::get_df_keys(const json& js) {
    std::vector<std::string> keys_{};
    if(js.size() == 0) {
        LOG(ERROR) << "Json list size is zero, empty list!!" << "\n";
        return keys_;
    }

    auto ele_0 = js[0];
    for (auto &&begin = ele_0.begin(), end = ele_0.end(); begin != end; ++begin) {
        auto key = begin.key();
        keys_.emplace_back(key);
    }
    return keys_;
}

/**
 * Returns a copy of df with duplicate rows removed, where rows are compared
 * on the json columns named in keys_.
 *
 * hmdf's remove_duplicates takes the column types as template arguments, so
 * each supported column count needs its own call. Every call passes false as
 * the boolean argument (the include-index flag in hmdf's documented
 * signature — confirm against the installed version) and
 * remove_dup_spec::keep_none.
 *
 * @param df     Frame to de-duplicate; it is not modified.
 * @param keys_  Names of 1 to 6 json columns to compare on.
 * @return The de-duplicated frame.
 * @throws std::runtime_error when keys_ is empty or has more than 6 entries.
 */
CDataFrame df_op::remove_duplicate(const CDataFrame& df, const std::vector<std::string>& keys_) {
    switch(keys_.size()) {
        case 0:
            // Previously reported as "greater than 6", which was misleading.
            throw std::runtime_error("Not supported argument length, no key column given!");
        case 1:
            return df.remove_duplicates<json>(keys_[0].c_str(), false, hmdf::remove_dup_spec::keep_none);
        case 2:
            return df.remove_duplicates<json, json>(keys_[0].c_str(), keys_[1].c_str(), false, hmdf::remove_dup_spec::keep_none);
        case 3:
            return df.remove_duplicates<json, json, json>(keys_[0].c_str(), keys_[1].c_str(), keys_[2].c_str(), false, hmdf::remove_dup_spec::keep_none);
        case 4:
            return df.remove_duplicates<json, json, json, json>(keys_[0].c_str(), keys_[1].c_str(), keys_[2].c_str(), keys_[3].c_str(), false, hmdf::remove_dup_spec::keep_none);
        case 5:
            return df.remove_duplicates<json, json, json, json, json>(keys_[0].c_str(), keys_[1].c_str(), keys_[2].c_str(), keys_[3].c_str(), keys_[4].c_str(), false, hmdf::remove_dup_spec::keep_none);
        case 6:
            return df.remove_duplicates<json, json, json, json, json, json>(keys_[0].c_str(), keys_[1].c_str(), keys_[2].c_str(), keys_[3].c_str(), keys_[4].c_str(), keys_[5].c_str(), false, hmdf::remove_dup_spec::keep_none);
        default:
            throw std::runtime_error("Not supported argument length, greater than 6!");
    }
}

bool df_op::write_to_csv(const CDataFrame& df, const std::string& csv_file_name) {
    std::fstream fs {csv_file_name, std::ios::out | std::ios::trunc};
    if(!fs.is_open()) {
        LOG(ERROR) << "Open file failed" << "\n";
        return false;
    }
    
    df.write<std::ostream, json>(fs, hmdf::io_format::csv2, true);
    fs.close();
    return true;
}

df_combine_col_test.cpp

#include "df/df.h"

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "json/json.hpp"

#include <fstream>
#include <sstream>

using json = nlohmann::json;

// Test entry point: configures glog, then hands control to GoogleTest.
int main(int argc, char** argv) {
    // glog flags are set before glog is initialised.
    FLAGS_alsologtostderr = true;
    FLAGS_log_dir = "./";
    google::InitGoogleLogging("./logs.log");

    testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}

// Returns the largest of three values. The left-to-right fold uses the same
// operator< comparisons, in the same order, as std::max over a list.
static double my_max_3(double const& d1, double const& d2, double const& d3) {
    double const& best_of_first_two = std::max(d1, d2);
    return std::max(best_of_first_two, d3);
}

// Returns the largest of four values. The left-to-right fold uses the same
// operator< comparisons, in the same order, as std::max over a list.
static double my_max_4(double const& d1, double const& d2, double const& d3, double const& d4) {
    double const& best_of_two = std::max(d1, d2);
    double const& best_of_three = std::max(best_of_two, d3);
    return std::max(best_of_three, d4);
}

// Combines the "d1_col" column of three frames row by row with my_max_3 and
// checks that the result holds the per-row maximum.
GTEST_TEST(DFCombineColTests, CombineThreeCol) {
    // Merge three columns, keeping the maximum of each row.
    LOG(INFO) << "\nTesting combine() three cols ...\n";

    // All frames share the same index values; each load_data() call receives
    // its own copy because it takes the index by rvalue reference.
    const std::vector<unsigned long> shared_index {
        123450, 123451, 123452, 123453, 123454, 123455, 123456,
        123457, 123458, 123459, 123460, 123461, 123462, 123466,
        123467, 123468, 123469, 123470, 123471, 123472, 123473
    };

    const std::vector<double> d1 {
        1, 2, 100, 4, 5, 6, 7, 8, 9, 10, 11, 300, 13, 14, 15, 16, 17, 18, 19, 20, 200
    };
    const std::vector<double> d2 {
        1, 2, 1000, 4, 5, 6, 7, 8, 9, 10, 11, 3000, 13, 14, 15, 16, 17, 18, 19, 20, 2000
    };
    const std::vector<double> d3 {
        1, 2, 5000, 4, 5, 6, 7, 8, 9, 10, 11, 7000, 13, 14, 15, 16, 17, 18, 19, 20, 8000
    };

    // Only three frames take part in this test; the unused fourth frame and
    // its data were removed.
    CLDataFrame df1, df2, df3;

    df1.load_data(
        std::vector<unsigned long>(shared_index),
        std::make_pair("d1_col", d1)
    );

    df2.load_data(
        std::vector<unsigned long>(shared_index),
        std::make_pair("d1_col", d2)
    );

    df3.load_data(
        std::vector<unsigned long>(shared_index),
        std::make_pair("d1_col", d3)
    );

    // combine() returns a temporary, so no std::move is needed here.
    df1.load_column("d2_col", df1.combine<double>("d1_col", df2, df3, my_max_3));

    df1.write<std::ostream, double>(std::cout);

    const std::vector<double> expected {1,2,5000,4,5,6,7,8,9,10,11,7000,13,14,15,16,17,18,19,20,8000};
    ASSERT_EQ(expected, df1.get_column<double>("d2_col"));
}

// Combines the "d1_col" column of four frames row by row with my_max_4 and
// checks that the result holds the per-row maximum.
GTEST_TEST(DFCombineColTests, CombineFourCol) {
    // Merge four columns, keeping the maximum of each row.
    LOG(INFO) << "\nTesting combine() four cols ...\n";

    // All four frames use the same index values; each load_data() call gets
    // its own copy because it takes the index by rvalue reference.
    const std::vector<unsigned long> shared_index {
        123450, 123451, 123452, 123453, 123454, 123455, 123456,
        123457, 123458, 123459, 123460, 123461, 123462, 123466,
        123467, 123468, 123469, 123470, 123471, 123472, 123473
    };

    const std::vector<double> first_vals {
        1, 2, 100, 4, 5, 6, 7, 8, 9, 10, 11, 300, 13, 14, 15, 16, 17, 18, 19, 20, 200
    };
    const std::vector<double> second_vals {
        1, 2, 1000, 4, 5, 6, 7, 8, 9, 10, 11, 3000, 13, 14, 15, 16, 17, 18, 19, 20, 2000
    };
    const std::vector<double> third_vals {
        1, 2, 5000, 4, 5, 6, 7, 8, 9, 10, 11, 7000, 13, 14, 15, 16, 17, 18, 19, 20, 8000
    };
    const std::vector<double> fourth_vals {
        1, 2, 10000, 4, 5, 6, 7, 8, 9, 10, 11, 20000, 13, 14, 15, 16, 17, 18, 19, 20, 30000
    };

    CLDataFrame df1, df2, df3, df4;

    df1.load_data(std::vector<unsigned long>(shared_index), std::make_pair("d1_col", first_vals));
    df2.load_data(std::vector<unsigned long>(shared_index), std::make_pair("d1_col", second_vals));
    df3.load_data(std::vector<unsigned long>(shared_index), std::make_pair("d1_col", third_vals));
    df4.load_data(std::vector<unsigned long>(shared_index), std::make_pair("d1_col", fourth_vals));

    // Store the row-wise maximum across the four frames as a new column.
    auto combined = df1.combine<double>("d1_col", df2, df3, df4, my_max_4);
    df1.load_column("d2_col", std::move(combined));

    df1.write<std::ostream, double>(std::cout);

    const std::vector<double> expected {1, 2, 10000, 4, 5, 6, 7, 8, 9, 10, 11, 20000, 13, 14, 15, 16, 17, 18, 19, 20, 30000};
    ASSERT_EQ(expected, df1.get_column<double>("d2_col"));
}

程序输出如下：


image.png
上一篇 下一篇

猜你喜欢

热点阅读