C++11 Snowflake简单查询器封装[ODBC]

2021-08-14  本文已影响0人  FredricZhu

把昨天的代码简单封装了一下,作为一个通用的查询器,可以执行简单的 SQL 查询(注意:不支持 select *,查询时必须显式列出字段)。
因为公司的生产环境对测试是只读的,所以没有封装 insert 和 update。
如果有需求的话,可以自己加。
程序的结构和昨天一样,但是可以支持各种各样的查询了。


image.png

CMakeLists.txt


cmake_minimum_required(VERSION 2.6)
project(ref_demo2_test)

# Compiler flags: C++14 and debug symbols
add_definitions(-std=c++14)
add_definitions(-g)



find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )

# Header search paths: Boost, iODBC, the Snowflake ODBC driver, and the
# directory two levels up so that "sf_db/sf_db.h" style includes resolve
include_directories(${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../)

LINK_DIRECTORIES(/usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)

# Collect the sources of this directory plus the implementation files in ../impl
# (each pattern is listed once; the original repeated the local *.cpp pattern)
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.h
    ${CMAKE_CURRENT_SOURCE_DIR}/../impl/*.cpp)
foreach( sourcefile ${APP_SOURCES} )
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    
        # Every file whose name contains "test.cpp" becomes one executable,
        # named after the file and built from all collected sources
        string(FIND "${filename}"  "test.cpp" "TEMP")
    if( NOT "${TEMP}" STREQUAL "-1" )
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file}  ${APP_SOURCES})
        target_link_libraries(${file} ${Boost_LIBRARIES})
        target_link_libraries(${file}  ssl crypto libgtest.a libgtest_main.a libgmock.a iodbc iodbcinst pthread)
    endif()
endforeach( sourcefile ${APP_SOURCES})

sf_db.h

#ifndef _FREDRIC_SF_DB_H_
#define _FREDRIC_SF_DB_H_

#include "sf_odbc.h"
#include "sql.h"
#include "sqlext.h"

#include <vector>
#include <string>
#include <iostream>
#include <map>

#include <cassert>

/// Connection settings consumed by SFConnection.
/// Both members start out as empty strings.
struct DBAuthorization {
    std::string dataSource;  ///< ODBC data source name passed to SQLConnect
    std::string password;    ///< Password passed to SQLConnect
};

/**
 * iodbc
 * /usr/local/iODBC/include
 * /usr/local/iODBC/lib
 * */

/**
 * Read-only query helper on top of an ODBC connection.
 *
 * The constructor allocates the ODBC environment and connection handles and
 * connects to the configured data source; the destructor disconnects and
 * frees them again.
 */
struct SFConnection {

    /// One std::map (field name -> value as text) per fetched row.
    using DB_RETURN_TYPE = std::vector<std::map<std::string, std::string>>;

    SFConnection(DBAuthorization auth);
    ~SFConnection();

    // The object owns raw ODBC handles that the destructor frees, so a copy
    // would release the same handles twice. Copying is therefore disabled.
    SFConnection(const SFConnection &) = delete;
    SFConnection &operator=(const SFConnection &) = delete;

    /**
     * Executes a query.
     * @param query_string the raw query string. `select *` is not supported
     *        because the result columns are derived from the field list
     *        written in the query (and `select *` is rarely used anyway,
     *        since it performs poorly).
     * @return the result set, one map per row
     *
     * example:
     *  select product_key, device_code from dim_product_v1 limit 2 returns
     *  std::vector { std::map
     *      {"product_key": "111", "device_code": "ios-all"},
     *      {"product_key": "111", "device_code": "google-play"}
     *  }
     */
    DB_RETURN_TYPE exec_query(std::string query_string);

    /**
     * Extracts the names of the queried fields from a query string.
     * @param query_string the raw query string; `select *` is not supported
     * @return the parsed field names, in query order
     *
     * example:
     *  select product_key, device_code from dim_product_v1 is parsed into
     *  std::vector {"product_key", "device_code"}
     */
    std::vector<std::string> parse_fields_from_query_string(std::string query_string);

    private:
        
        DBAuthorization m_auth{};
        // Handles start out as null handles so they never hold garbage
        SQLHENV henv = SQL_NULL_HENV;
        SQLHDBC hdbc = SQL_NULL_HDBC;
        // Result codes of the setup steps; the destructor consults them to
        // decide which resources have to be released
        SQLRETURN henv_ret{SQL_ERROR};
        SQLRETURN hdbc_conn_ret{SQL_ERROR};
        SQLRETURN hdbc_ret{SQL_ERROR};
};


#endif

sf_db.cpp

#include "sf_db/sf_db.h"

#include <algorithm>
#include <array>
#include <cctype>
#include <cstddef>
#include <sstream>
#include <utility>

#include <boost/algorithm/string.hpp>

/**
 * Verifies that an ODBC call succeeded.
 *
 * Writes `msg` to stderr when `retcode` is neither SQL_SUCCESS nor
 * SQL_SUCCESS_WITH_INFO, then asserts on the same condition (the assert is
 * compiled out when NDEBUG is defined).
 *
 * @param retcode return code of the ODBC call to check
 * @param msg     message logged when the call failed
 */
void checkError(SQLRETURN retcode, std::string msg) {
    // The original condition used `retcode = !SQL_SUCCESS_WITH_INFO`, an
    // assignment that made the check always pass; compare explicitly instead.
    const bool succeeded =
        (retcode == SQL_SUCCESS) || (retcode == SQL_SUCCESS_WITH_INFO);
    if (!succeeded) {
        std::cerr << msg << std::endl;
    }
    assert(succeeded);
}

/**
 * Tells whether an ODBC return code denotes success.
 *
 * @param retcode return code of an ODBC call
 * @return true for SQL_SUCCESS and SQL_SUCCESS_WITH_INFO, false otherwise
 */
bool checkErrorCode(SQLRETURN retcode) {
    // The original wrote `retcode = !SQL_SUCCESS_WITH_INFO` (assignment),
    // which made every code look successful.
    return retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO;
}

/**
 * Allocates the ODBC environment and connection handles and connects to the
 * data source named in `auth`.
 *
 * Every mandatory step is verified with checkError(), which logs and asserts
 * on failure. The result codes of the two handle allocations and of the
 * connect call are kept in henv_ret / hdbc_ret / hdbc_conn_ret because the
 * destructor uses them to decide what has to be released.
 *
 * @param auth data source name and password used for SQLConnect
 */
SFConnection::SFConnection(DBAuthorization auth) : m_auth(std::move(auth)) {
    // Allocate environment handle
    henv_ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
    checkError(henv_ret, "SQLAllocHandle failed");

    // Set the ODBC version environment attribute. The result goes into a
    // local so that henv_ret keeps describing the handle allocation; the
    // destructor relies on it to know whether the env handle must be freed.
    const SQLRETURN env_attr_ret = SQLSetEnvAttr(
        henv, SQL_ATTR_ODBC_VERSION,
        reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(SQL_OV_ODBC3)), 0);
    checkError(env_attr_ret, "SQLSetEnvAttr failed");

    // Allocate connection handle
    hdbc_ret = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
    checkError(hdbc_ret, "SQLAllocHandle failed");

    // Set login timeout to 5 seconds. A failure here is not fatal, so it is
    // only reported instead of being asserted on.
    const SQLRETURN timeout_ret = SQLSetConnectAttr(
        hdbc, SQL_LOGIN_TIMEOUT,
        reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(5)), 0);
    if (!checkErrorCode(timeout_ret)) {
        std::cerr << "SQLSetConnectAttr SQL_LOGIN_TIMEOUT failed" << std::endl;
    }

    // Connect to data source; no user name is passed, only the password
    hdbc_conn_ret = SQLConnect(
        hdbc, reinterpret_cast<SQLCHAR *>(&m_auth.dataSource[0]), SQL_NTS,
        nullptr, 0, reinterpret_cast<SQLCHAR *>(&m_auth.password[0]), SQL_NTS);

    checkError(hdbc_conn_ret, "SQLConnect connect to product_odbc failed");

    std::cout << "connect to product_odbc success!" << std::endl;
}

/**
 * Extracts the selected field names from a query string.
 *
 * The query is lower-cased, then the text between the first "select" and the
 * first stand-alone "from" keyword is split on ',' and each piece is trimmed.
 * When the query has no "from", everything after "select" is used. When it
 * has no "select", an empty list is returned.
 *
 * Limitations: this is a plain text split, so `select *`, expressions that
 * contain commas (function calls, sub-queries) and column aliases are not
 * understood, and the returned names are always lower case.
 *
 * @param query_string the raw query string
 * @return the field names in query order
 */
std::vector<std::string> SFConnection::parse_fields_from_query_string(
    std::string query_string) {

    std::vector<std::string> result {};
    std::transform(query_string.begin(), query_string.end(),
                   query_string.begin(), [](unsigned char c) {
                       return static_cast<char>(std::tolower(c));
                   });

    const std::string select_kw{"select"};
    const std::string from_kw{"from"};

    const std::size_t select_pos = query_string.find(select_kw);
    if (select_pos == std::string::npos) {
        // Without a "select" there is no field list to parse
        return result;
    }
    const std::size_t fields_begin = select_pos + select_kw.size();

    // True when the character at `pos` can be part of an identifier
    const auto is_word_char = [&query_string](std::size_t pos) {
        if (pos >= query_string.size()) {
            return false;
        }
        const unsigned char c = static_cast<unsigned char>(query_string[pos]);
        return std::isalnum(c) != 0 || c == '_';
    };

    // Look for "from" as a whole word, so that a column such as "from_date"
    // does not cut the field list short
    std::size_t fields_end = std::string::npos;
    for (std::size_t pos = query_string.find(from_kw, fields_begin);
         pos != std::string::npos;
         pos = query_string.find(from_kw, pos + 1)) {
        // pos >= fields_begin > 0, so pos - 1 is always a valid index
        if (!is_word_char(pos - 1) && !is_word_char(pos + from_kw.size())) {
            fields_end = pos;
            break;
        }
    }

    const std::string fields_string =
        (fields_end == std::string::npos)
            ? query_string.substr(fields_begin)
            : query_string.substr(fields_begin, fields_end - fields_begin);

    std::stringstream ss(fields_string);
    std::string field{};
    while(std::getline(ss, field, ',')) {
        boost::trim(field);
        result.emplace_back(std::move(field));
    }
    // Returned by name so the compiler can apply NRVO
    return result;
}

/**
 * Executes `query_string` and returns all fetched rows.
 *
 * The column names come from parse_fields_from_query_string(), so the query
 * must list its fields explicitly. Every column is bound as SQL_C_CHAR into
 * a fixed 2000-byte buffer; longer values are cut to fit the buffer and SQL
 * NULL values become empty strings. Each fetched row is also echoed to
 * stdout.
 *
 * @param query_string the raw query string
 * @return one map (field name -> value as text) per row; empty when the
 *         statement could not be allocated or executed
 */
SFConnection::DB_RETURN_TYPE SFConnection::exec_query(
    std::string query_string) {
    // Size of each per-column receive buffer, terminating NUL included
    constexpr std::size_t kFieldBufferSize = 2000;

    DB_RETURN_TYPE results;

    const std::vector<std::string> field_names =
        parse_fields_from_query_string(query_string);
    const std::size_t field_count = field_names.size();

    // One zero-initialized buffer and one length indicator per column
    std::vector<std::array<SQLCHAR, kFieldBufferSize>> field_values(field_count);
    std::vector<SQLLEN> field_lens(field_count, 0);

    // Allocate statement handle
    SQLHSTMT hstmt = SQL_NULL_HSTMT;
    const SQLRETURN hstmt_ret = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
    if (!checkErrorCode(hstmt_ret)) {
        std::cerr << "SQLAllocHandle statement failed!" << std::endl;
        return results;
    }

    SQLRETURN retcode = SQLExecDirect(
        hstmt, reinterpret_cast<SQLCHAR *>(&query_string[0]), SQL_NTS);
    const bool executed = checkErrorCode(retcode);
    if (!executed) {
        std::cerr << "SQLExecDirect failed!" << std::endl;
    }

    if (executed) {
        for (std::size_t col = 0; col < field_count; ++col) {
            retcode = SQLBindCol(hstmt, static_cast<SQLUSMALLINT>(col + 1),
                                 SQL_C_CHAR, field_values[col].data(),
                                 static_cast<SQLLEN>(kFieldBufferSize),
                                 &field_lens[col]);
            if (!checkErrorCode(retcode)) {
                // Best effort: the column stays unbound and reads as empty
                std::cerr << "SQLBindCol failed for column " << (col + 1)
                          << std::endl;
            }
        }

        // Fetch and print each row of data until
        // SQL_NO_DATA returned.
        for (std::size_t row = 1;; ++row) {
            retcode = SQLFetch(hstmt);
            if (retcode == SQL_NO_DATA) {
                break;
            }
            if (retcode != SQL_SUCCESS && retcode != SQL_SUCCESS_WITH_INFO) {
                std::cout << "SQLFetch Error, error code: " << retcode
                          << std::endl;
                break;
            }

            std::cout << row << " ";
            std::map<std::string, std::string> row_fields{};

            for (std::size_t col = 0; col < field_count; ++col) {
                // The indicator may be SQL_NULL_DATA (negative), SQL_NO_TOTAL
                // (negative) or, for truncated data, larger than the buffer.
                // Using it unchecked as a length would read out of bounds.
                const SQLLEN reported = field_lens[col];
                std::size_t length = 0;
                if (reported == SQL_NO_TOTAL) {
                    length = kFieldBufferSize - 1;
                } else if (reported > 0) {
                    length = std::min(static_cast<std::size_t>(reported),
                                      kFieldBufferSize - 1);
                }

                std::string value(
                    reinterpret_cast<const char *>(field_values[col].data()),
                    length);
                std::cout << value << " ";
                row_fields[field_names[col]] = std::move(value);
            }
            std::cout << std::endl;

            results.emplace_back(std::move(row_fields));
        }
    }

    std::cout << "Free statement" << std::endl;
    SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
    // Returned by name so the compiler can apply NRVO
    return results;
}

/**
 * Tears the connection down in reverse order of construction: disconnect,
 * free the connection handle, free the environment handle. Each step runs
 * only when checkErrorCode() accepts the result code recorded for it.
 */
SFConnection::~SFConnection() {
    // checkErrorCode() only inspects its argument, so all three decisions
    // can be taken up front
    const bool was_connected = checkErrorCode(hdbc_conn_ret);
    const bool has_dbc_handle = checkErrorCode(hdbc_ret);
    const bool has_env_handle = checkErrorCode(henv_ret);

    if (was_connected) {
        SQLDisconnect(hdbc);
        std::cout << "Free connection" << std::endl;
    }

    if (has_dbc_handle) {
        SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
        std::cout << "Free dbc" << std::endl;
    }

    if (has_env_handle) {
        SQLFreeHandle(SQL_HANDLE_ENV, henv);
        std::cout << "Free env" << std::endl;
    }
}

sf_db_test.cpp

#include "sf_db/sf_db.h"

#include <array>
#include <gtest/gtest.h>

// Runs the same 10-row query twice on a single connection and expects both
// runs to return 10 rows.
GTEST_TEST(SFDBTests, TestExecQuery) {
    const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection {credentials};
    const std::string sql = "select  product_key,change_time,EVENT_TYPE_NAME, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";

    const auto first_run = connection.exec_query(sql);
    ASSERT_EQ(10, first_run.size());

    const auto second_run = connection.exec_query(sql);
    ASSERT_EQ(10, second_run.size());
}


// Fetches two rows and prints every selected column of each row, looking the
// values up by their lower-case field names.
GTEST_TEST(SFDBTests, TestExecQueryAndPrintValue) {
    const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection {credentials};
    const std::string sql = "select  product_key,change_time,event_type_name, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 2;";

    auto rows = connection.exec_query(sql);
    ASSERT_EQ(2, rows.size());
    std::cout << "product_key,  change_time,  event_type_name,  change_column,  old_value,  new_value,  meta" 
        << std::endl;

    // Columns in the order they are printed
    const char* const columns[] = {"product_key", "change_time",
                                   "event_type_name", "change_column",
                                   "old_value", "new_value", "meta"};

    for(auto&& row: rows) {
        for(const char* column: columns) {
            std::cout << " " << row[column];
        }
        std::cout << std::endl;
    }
}


// Checks that the field list of a two-column query is parsed into exactly
// those two names, in order.
GTEST_TEST(SFDBTests, TestParseFields) {
    const DBAuthorization credentials {"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection {credentials};
    const std::string sql = "select product_key, meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where  event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";

    const auto fields = connection.parse_fields_from_query_string(sql);
    ASSERT_EQ(2, fields.size());
    ASSERT_EQ("product_key", fields[0]);
    ASSERT_EQ("meta", fields[1]);
}

程序输出如下,


image.png
上一篇下一篇

猜你喜欢

热点阅读