Apache Arrow基础数据类型

2024-01-29  本文已影响0人  FredricZhu

本例其实是Apache Arrow的Basic Data Types一文的代码。
主要介绍基础的列向量arrow::Array的构造。
本例使用conan的基础配置即可完成;但为了后续课程能够顺利进行,请修改conan配置中的conanfile.py,将其中的default_options改为如下内容。
主要将default_options中的protobuf, grpc,filesystem,compute,json,dataset都打开。其中dataset和filesystem是精华,没有这两个模块,使用arrow毫无意义。
filesystem用于处理各种文件系统,如Local、HDFS、S3等;如果需要处理S3,还要单独打开with_s3开关。
然后会出现一些错误,逐一解决就可以了。建议使用自带conan的镜像一次编译,到处运行。
dataset可以用于像Python中的pandas一样处理数据,filter,custom function, groupby等。

 # Default values for the package's build options (fragment of conanfile.py;
 # presumably the Arrow recipe, given the option names -- verify).
 # NOTE(review): the keys presumably have to match the option names declared
 # by the recipe exactly, so confirm against the recipe before "fixing" any
 # spelling (e.g. "hdfs_bridgs").
 default_options = {
        "shared": False,
        "fPIC": True,
        "gandiva": False,
        "parquet": True,
        "skyhook": False,
        "substrait": False,
        "acero": True,
        "cli": False,
        # Compute, dataset and filesystem modules are switched on here.
        "compute": True,
        "dataset_modules": True,
        "deprecated": True,
        "encryption": False,
        "filesystem_layer": True,
        "hdfs_bridgs": True,
        "plasma": "deprecated",
        "simd_level": "default",
        "runtime_simd_level": "max",
        "with_backtrace": False,
        "with_boost": "auto",
        "with_brotli": False,
        "with_bz2": True,
        "with_csv": True,
        "with_cuda": False,
        "with_flight_rpc": True,
        "with_flight_sql": True,
        "with_gcs": False,
        "with_gflags": True,
        "with_jemalloc": "auto",
        "with_mimalloc": False,
        "with_glog": True,
        # gRPC, JSON and protobuf support are switched on here.
        "with_grpc": True,
        "with_json": True,
        "with_thrift": True,
        "with_llvm": "auto",
        "with_openssl": "auto",
        "with_opentelemetry": False,
        "with_orc": False,
        "with_protobuf": True,
        "with_re2": True,
        # S3 support stays off in this configuration.
        "with_s3": False,
        "with_utf8proc": True,
        "with_lz4": False,
        "with_snappy": True,
        "with_zlib": True,
        "with_zstd": False,
    }

conanfile.txt

# Packages (name/version) the examples depend on.
[requires]
boost/1.72.0
arrow/15.0.0

# Build-system integration to generate; presumably this is what produces the
# conanbuildinfo.cmake file that CMakeLists.txt includes -- verify.
[generators]
cmake

CMakeLists.txt

# Build script for the examples: every *.cpp file in this directory becomes
# its own executable, linked against the libraries reported by Conan.

# CMAKE_CXX_STANDARD only accepts the value 17 from CMake 3.8 onwards.
cmake_minimum_required(VERSION 3.8)

project(1_basic_data_types)

# Also search the locally installed pkg-config files.
set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")

# Append to the existing flags instead of discarding whatever the user or the
# toolchain already supplied.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
set(CMAKE_CXX_STANDARD 17)
# Fail at configure time rather than silently falling back to an older standard.
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# -g is a compiler option, not a preprocessor definition.
add_compile_options(-g)

include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

# NOTE(review): INCLUDE_DIRS and LINK_DIRS are not set anywhere in this script,
# so these two calls only have an effect if the variables are supplied from
# outside (e.g. -DINCLUDE_DIRS=...).
include_directories(${INCLUDE_DIRS})
link_directories(${LINK_DIRS})

file(GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)

foreach(main_file ${main_file_list})
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
    # Strip only the trailing ".cpp" so a name that contains ".cpp" elsewhere
    # is left intact.
    string(REGEX REPLACE "\\.cpp$" "" target_name ${filename})
    add_executable(${target_name} ${main_file})
    target_link_libraries(${target_name} ${CONAN_LIBS} pthread)
endforeach()

int8_arr.cpp

#include <iostream>
#include <arrow/api.h>

/// Builds two Int8 arrays ("days" and "months") with one reused builder and
/// prints every element of each array to stdout, one value per line.
///
/// @return arrow::Status::OK() on success; otherwise the first error returned
///         by an append or finish call.
arrow::Status run_main() {
    // A single constant sizes the raw buffers and the AppendValues() calls so
    // the two cannot drift apart.
    constexpr int64_t kNumValues = 5;

    arrow::Int8Builder int8_builder;
    const int8_t days_raw[kNumValues] = {1, 12, 17, 23, 28};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, kNumValues));
    std::shared_ptr<arrow::Array> days;
    ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());

    // The same builder is reused for the second column.
    const int8_t months_raw[kNumValues] = {1, 3, 5, 7, 1};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, kNumValues));
    std::shared_ptr<arrow::Array> months;
    ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());

    std::cout << "Print days data: " << std::endl;
    const auto days_int8 = std::static_pointer_cast<arrow::Int8Array>(days);
    // length() is 64-bit, so the index is 64-bit as well.
    for (int64_t i = 0; i < days_int8->length(); ++i) {
        // Widen to int so the value prints as a number, not as a character.
        std::cout << static_cast<int>(days_int8->Value(i)) << '\n';
    }

    std::cout << "Print month data: " << std::endl;
    const auto months_int8 = std::static_pointer_cast<arrow::Int8Array>(months);
    for (int64_t i = 0; i < months_int8->length(); ++i) {
        std::cout << static_cast<int>(months_int8->Value(i)) << '\n';
    }

    return arrow::Status::OK();
}

int main(int argc, char* argv[]) {
    arrow::Status st = run_main();
    if(!st.ok()) { 
        std::cerr << st << std::endl;
        return EXIT_FAILURE;
    }   

    return EXIT_SUCCESS;
}

int16_arr.cpp

#include <iostream>
#include <arrow/api.h>

/// Builds an Int16 array of years and prints every element to stdout, one
/// value per line.
///
/// @return arrow::Status::OK() on success; otherwise the first error returned
///         by the append or finish call.
arrow::Status run_main() {
    // A single constant sizes the raw buffer and the AppendValues() call so
    // the two cannot drift apart.
    constexpr int64_t kNumValues = 5;

    arrow::Int16Builder int16_builder;
    const int16_t years_raw[kNumValues] = {1990, 2000, 1995, 2000, 1995};
    ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, kNumValues));
    std::shared_ptr<arrow::Array> years;
    ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());

    std::cout << "Print out int16 Array: " << std::endl;
    const auto years_int16 = std::static_pointer_cast<arrow::Int16Array>(years);
    // length() is 64-bit, so the index is 64-bit as well.
    for (int64_t i = 0; i < years_int16->length(); ++i) {
        std::cout << static_cast<int>(years_int16->Value(i)) << '\n';
    }
    return arrow::Status::OK();
}

int main(int argc, char* argv[]) {
    arrow::Status st = run_main();
    if(!st.ok()) {
        std::cerr << st << std::endl;
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

record_batch_demo.cpp

#include <iostream>
#include <arrow/api.h>

/// Builds Day/Month/Year columns, combines them with a matching schema into
/// an arrow::RecordBatch, validates the batch and prints it to stdout.
///
/// @return arrow::Status::OK() on success; otherwise the first error returned
///         by a builder call or by the batch validation.
arrow::Status run_main() {
    // A single constant sizes the raw buffers and the AppendValues() calls so
    // every column is guaranteed to get the same number of rows.
    constexpr int64_t kNumRows = 5;

    arrow::Int8Builder int8_builder;
    const int8_t days_raw[kNumRows] = {1, 12, 17, 23, 28};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, kNumRows));
    std::shared_ptr<arrow::Array> days;
    ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());

    // The same builder is reused for the second Int8 column.
    const int8_t months_raw[kNumRows] = {1, 3, 5, 7, 1};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, kNumRows));
    std::shared_ptr<arrow::Array> months;
    ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());

    arrow::Int16Builder int16_builder;
    const int16_t years_raw[kNumRows] = {1990, 2000, 1995, 2000, 1995};
    ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, kNumRows));
    std::shared_ptr<arrow::Array> years;
    ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());

    // Define the schema: one field per column, in column order.
    const auto field_day = arrow::field("Day", arrow::int8());
    const auto field_month = arrow::field("Month", arrow::int8());
    const auto field_year = arrow::field("Year", arrow::int16());
    const auto schema = arrow::schema({field_day, field_month, field_year});

    // Define the RecordBatch; the row count is taken from the first column.
    const auto rbatch =
        arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
    // Surface a schema/column mismatch as a Status instead of printing an
    // inconsistent batch.
    ARROW_RETURN_NOT_OK(rbatch->Validate());
    std::cout << rbatch->ToString() << std::endl;
    return arrow::Status::OK();
}

/// Entry point: runs the record-batch demo and translates the resulting
/// status into the process exit code.
int main(int argc, char* argv[]) {
    int exit_code = EXIT_SUCCESS;

    const auto result = run_main();
    if (!result.ok()) {
        // Show what went wrong, then flag the run as failed.
        std::cerr << result << std::endl;
        exit_code = EXIT_FAILURE;
    }

    return exit_code;
}

table_demo.cpp

#include <iostream>
#include <arrow/api.h>

/// Builds two batches of Day/Month/Year columns, prints the first batch as an
/// arrow::RecordBatch, then joins both batches into chunked columns and prints
/// the resulting arrow::Table.
///
/// @return arrow::Status::OK() on success; otherwise the first error returned
///         by a builder call or by the record-batch/table validation.
arrow::Status run_main() {
    // A single constant sizes every raw buffer and AppendValues() call so all
    // columns of a batch are guaranteed to get the same number of rows.
    constexpr int64_t kRowsPerBatch = 5;

    // ---- First batch -------------------------------------------------------
    arrow::Int8Builder int8_builder;
    const int8_t days_raw[kRowsPerBatch] = {1, 12, 17, 23, 28};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, kRowsPerBatch));
    std::shared_ptr<arrow::Array> days;
    ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());

    // The same builder is reused for every Int8 column below.
    const int8_t months_raw[kRowsPerBatch] = {1, 3, 5, 7, 1};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, kRowsPerBatch));
    std::shared_ptr<arrow::Array> months;
    ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());

    arrow::Int16Builder int16_builder;
    const int16_t years_raw[kRowsPerBatch] = {1990, 2000, 1995, 2000, 1995};
    ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, kRowsPerBatch));
    std::shared_ptr<arrow::Array> years;
    ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());

    // Define the schema: one field per column, in column order.
    const auto field_day = arrow::field("Day", arrow::int8());
    const auto field_month = arrow::field("Month", arrow::int8());
    const auto field_year = arrow::field("Year", arrow::int16());
    const auto schema = arrow::schema({field_day, field_month, field_year});

    // Define the RecordBatch; the row count is taken from the first column.
    const auto rbatch =
        arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
    // Surface a schema/column mismatch as a Status instead of printing an
    // inconsistent batch.
    ARROW_RETURN_NOT_OK(rbatch->Validate());
    std::cout << rbatch->ToString() << std::endl;

    // ---- Second batch ------------------------------------------------------
    const int8_t days_raw2[kRowsPerBatch] = {6, 12, 3, 30, 22};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw2, kRowsPerBatch));
    std::shared_ptr<arrow::Array> days2;
    ARROW_ASSIGN_OR_RAISE(days2, int8_builder.Finish());

    const int8_t months_raw2[kRowsPerBatch] = {5, 4, 11, 3, 2};
    ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw2, kRowsPerBatch));
    std::shared_ptr<arrow::Array> months2;
    ARROW_ASSIGN_OR_RAISE(months2, int8_builder.Finish());

    const int16_t years_raw2[kRowsPerBatch] = {1980, 2001, 1915, 2020, 1996};
    ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw2, kRowsPerBatch));
    std::shared_ptr<arrow::Array> years2;
    ARROW_ASSIGN_OR_RAISE(years2, int16_builder.Finish());

    // Define the chunked arrays.
    // ChunkedArrays let us have a list of arrays, which aren't contiguous
    // with each other. First, we get a vector of arrays.
    arrow::ArrayVector day_vecs{days, days2};
    const auto day_chunks = std::make_shared<arrow::ChunkedArray>(day_vecs);
    arrow::ArrayVector month_vecs{months, months2};
    const auto month_chunks = std::make_shared<arrow::ChunkedArray>(month_vecs);
    arrow::ArrayVector year_vecs{years, years2};
    const auto year_chunks = std::make_shared<arrow::ChunkedArray>(year_vecs);

    std::cout << "Print out table: " << std::endl;
    // Define the table. The row count comes from the chunked column itself
    // rather than a literal that has to be kept in sync with the data above.
    const auto table = arrow::Table::Make(
        schema, {day_chunks, month_chunks, year_chunks}, day_chunks->length());
    // Catch columns whose total length or type disagrees with the schema.
    ARROW_RETURN_NOT_OK(table->Validate());
    std::cout << table->ToString() << std::endl;
    return arrow::Status::OK();
}

/// Entry point: runs the table demo; a non-OK status is written to stderr and
/// reflected in the exit code.
int main(int argc, char* argv[]) {
    const auto status = run_main();
    const bool succeeded = status.ok();

    if (!succeeded) {
        std::cerr << status << std::endl;
    }

    return succeeded ? EXIT_SUCCESS : EXIT_FAILURE;
}

程序输出如下,


image.png
image.png
上一篇 下一篇

猜你喜欢

热点阅读