Apache Arrow基础数据类型
2024-01-29 本文已影响0人
FredricZhu
本例其实是Apache Arrow的Basic Data Types一文的代码。
主要介绍基础的列向量arrow::Array的构造。
可以使用conan基础配置完成,但是为了后续课程的流畅,请修改conan配置文件中的conanfile.py配置,修改为如下,
主要将default_options中的protobuf, grpc,filesystem,compute,json,dataset都打开。其中dataset和filesystem是精华,没有这两个模块,使用arrow毫无意义。
filesystem用于处理各种文件系统,如Local, HDFS, S3等,要处理S3的需要单独打开with_s3开关。
然后会出现一些错误,逐一解决就可以了。建议使用自带conan的镜像一次编译,到处运行。
dataset可以用于像Python中的pandas一样处理数据,filter,custom function, groupby等。
default_options = {
"shared": False,
"fPIC": True,
"gandiva": False,
"parquet": True,
"skyhook": False,
"substrait": False,
"acero": True,
"cli": False,
"compute": True,
"dataset_modules": True,
"deprecated": True,
"encryption": False,
"filesystem_layer": True,
"hdfs_bridgs": True,
"plasma": "deprecated",
"simd_level": "default",
"runtime_simd_level": "max",
"with_backtrace": False,
"with_boost": "auto",
"with_brotli": False,
"with_bz2": True,
"with_csv": True,
"with_cuda": False,
"with_flight_rpc": True,
"with_flight_sql": True,
"with_gcs": False,
"with_gflags": True,
"with_jemalloc": "auto",
"with_mimalloc": False,
"with_glog": True,
"with_grpc": True,
"with_json": True,
"with_thrift": True,
"with_llvm": "auto",
"with_openssl": "auto",
"with_opentelemetry": False,
"with_orc": False,
"with_protobuf": True,
"with_re2": True,
"with_s3": False,
"with_utf8proc": True,
"with_lz4": False,
"with_snappy": True,
"with_zlib": True,
"with_zstd": False,
}
conanfile.txt
[requires]
boost/1.72.0
arrow/15.0.0
[generators]
cmake
CMakeLists.txt
cmake_minimum_required(VERSION 3.3)
project(1_basic_data_types)
set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")
set ( CMAKE_CXX_FLAGS "-pthread")
set(CMAKE_CXX_STANDARD 17)
add_definitions(-g)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()
include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})
file( GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( main_file ${main_file_list} )
file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
string(REPLACE ".cpp" "" file ${filename})
add_executable(${file} ${main_file})
target_link_libraries(${file} ${CONAN_LIBS} pthread)
endforeach( main_file ${main_file_list})
int8_arr.cpp
#include <iostream>
#include <arrow/api.h>
arrow::Status run_main() {
arrow::Int8Builder int8_builder;
int8_t days_raw[5] = {1, 12, 17, 23, 28};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, 5));
std::shared_ptr<arrow::Array> days;
ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());
int8_t months_raw[5] = {1, 3, 5, 7, 1};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, 5));
std::shared_ptr<arrow::Array> months;
ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());
std::cout << "Print days data: " << std::endl;
auto days_int8 = std::static_pointer_cast<arrow::Int8Array>(days);
for(int i=0; i<days_int8->length(); ++i) {
std::cout << (int)days_int8->Value(i) << std::endl;
}
std::cout << "Print month data: " << std::endl;
auto months_int8 = std::static_pointer_cast<arrow::Int8Array>(months);
for(int i=0; i<months_int8->length(); ++i) {
std::cout << (int)months_int8->Value(i) << std::endl;
}
return arrow::Status::OK();
}
int main(int argc, char* argv[]) {
arrow::Status st = run_main();
if(!st.ok()) {
std::cerr << st << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
int16_arr.cpp
#include <iostream>
#include <arrow/api.h>
arrow::Status run_main() {
arrow::Int16Builder int16_builder;
int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, 5));
std::shared_ptr<arrow::Array> years;
ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());
std::cout << "Print out int16 Array: " << std::endl;
auto years_int16 = std::static_pointer_cast<arrow::Int16Array>(years);
for(int i=0; i<years_int16->length(); ++i) {
std::cout << (int)years_int16->Value(i) << std::endl;
}
return arrow::Status::OK();
}
int main(int argc, char* argv[]) {
arrow::Status st = run_main();
if(!st.ok()) {
std::cerr << st << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
record_batch_demo.cpp
#include <iostream>
#include <arrow/api.h>
arrow::Status run_main() {
arrow::Int8Builder int8_builder;
int8_t days_raw[5] = {1, 12, 17, 23, 28};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, 5));
std::shared_ptr<arrow::Array> days;
ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());
int8_t months_raw[5] = {1, 3, 5, 7, 1};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, 5));
std::shared_ptr<arrow::Array> months;
ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());
arrow::Int16Builder int16_builder;
int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, 5));
std::shared_ptr<arrow::Array> years;
ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());
// 定义schema
std::shared_ptr<arrow::Field> field_day, field_month, field_year;
std::shared_ptr<arrow::Schema> schema;
field_day = arrow::field("Day", arrow::int8());
field_month = arrow::field("Month", arrow::int8());
field_year = arrow::field("Year", arrow::int16());
schema = arrow::schema({field_day, field_month, field_year});
// 定义RecordBatch
std::shared_ptr<arrow::RecordBatch> rbatch;
rbatch = arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
std::cout << rbatch->ToString() << std::endl;
return arrow::Status::OK();
}
int main(int argc, char* argv[]) {
auto st = run_main();
if(!st.ok()) {
std::cerr << st << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
table_demo.cpp
#include <iostream>
#include <arrow/api.h>
arrow::Status run_main() {
arrow::Int8Builder int8_builder;
int8_t days_raw[5] = {1, 12, 17, 23, 28};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw, 5));
std::shared_ptr<arrow::Array> days;
ARROW_ASSIGN_OR_RAISE(days, int8_builder.Finish());
int8_t months_raw[5] = {1, 3, 5, 7, 1};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw, 5));
std::shared_ptr<arrow::Array> months;
ARROW_ASSIGN_OR_RAISE(months, int8_builder.Finish());
arrow::Int16Builder int16_builder;
int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw, 5));
std::shared_ptr<arrow::Array> years;
ARROW_ASSIGN_OR_RAISE(years, int16_builder.Finish());
// 定义schema
std::shared_ptr<arrow::Field> field_day, field_month, field_year;
std::shared_ptr<arrow::Schema> schema;
field_day = arrow::field("Day", arrow::int8());
field_month = arrow::field("Month", arrow::int8());
field_year = arrow::field("Year", arrow::int16());
schema = arrow::schema({field_day, field_month, field_year});
// 定义RecordBatch
std::shared_ptr<arrow::RecordBatch> rbatch;
rbatch = arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
std::cout << rbatch->ToString() << std::endl;
int8_t days_raw2[5] = {6, 12, 3, 30, 22};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(days_raw2, 5));
std::shared_ptr<arrow::Array> days2;
ARROW_ASSIGN_OR_RAISE(days2, int8_builder.Finish());
int8_t months_raw2[5] = {5, 4, 11, 3, 2};
ARROW_RETURN_NOT_OK(int8_builder.AppendValues(months_raw2, 5));
std::shared_ptr<arrow::Array> months2;
ARROW_ASSIGN_OR_RAISE(months2, int8_builder.Finish());
int16_t years_raw2[5] = {1980, 2001, 1915, 2020, 1996};
ARROW_RETURN_NOT_OK(int16_builder.AppendValues(years_raw2, 5));
std::shared_ptr<arrow::Array> years2;
ARROW_ASSIGN_OR_RAISE(years2, int16_builder.Finish());
// 定义chunked array
// ChunkedArrays let us have a list of arrays, which aren't contiguous
// with each other. First, we get a vector of arrays.
arrow::ArrayVector day_vecs {days, days2};
std::shared_ptr<arrow::ChunkedArray> day_chunks = std::make_shared<arrow::ChunkedArray>(day_vecs);
arrow::ArrayVector month_vecs {months, months2};
std::shared_ptr<arrow::ChunkedArray> month_chunks = std::make_shared<arrow::ChunkedArray>(month_vecs);
arrow::ArrayVector year_vecs {years, years2};
std::shared_ptr<arrow::ChunkedArray> year_chunks = std::make_shared<arrow::ChunkedArray>(year_vecs);
std::cout << "Print out table: " << std::endl;
// 定义table
std::shared_ptr<arrow::Table> table;
table = arrow::Table::Make(schema, {day_chunks, month_chunks, year_chunks}, 10);
std::cout << table->ToString() << std::endl;
return arrow::Status::OK();
}
int main(int argc, char* argv[]) {
auto st = run_main();
if(!st.ok()) {
std::cerr << st << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
程序输出如下,
image.png
image.png