C++14 Snowflake简单查询器封装[ODBC]
2021-08-14 本文已影响0人
FredricZhu
把昨天那个简单封装了一下,作为一个万能的查询器吧,可以做简单的SQL查询。
因为公司的生产环境对测试是只读的,所以没做 insert和update封装。
如果有需求的话,可以自己加。
程序的结构和昨天一样,但是可以支持各种各样的查询了。
image.png
CMakeLists.txt
# Build script for the Snowflake ODBC query wrapper and its gtest executables.
cmake_minimum_required(VERSION 2.6)
project(ref_demo2_test)
# Compile as C++14 with debug symbols.
add_definitions(-std=c++14)
add_definitions(-g)
# Boost components whose libraries are linked into every executable below.
find_package(Boost REQUIRED COMPONENTS
system
filesystem
serialization
program_options
thread
)
# Header search paths: Boost, /usr/local, iODBC, the Snowflake ODBC driver,
# and the directory two levels above this one.
include_directories(${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../)
# Library search paths: /usr/local, iODBC and the Snowflake ODBC driver.
LINK_DIRECTORIES(/usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
# Collect sources/headers from this directory and the sources from ../impl.
# NOTE(review): the local *.cpp pattern appears twice in this glob.
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.h
${CMAKE_CURRENT_SOURCE_DIR}/../impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
# For every collected file whose relative path contains "test.cpp", create an
# executable named after that path with ".cpp" removed. Each executable is
# built from ALL collected sources and linked against Boost, OpenSSL,
# gtest/gmock, iODBC and pthread.
foreach( sourcefile ${APP_SOURCES} )
file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
# TEMP receives the match position, or -1 when "test.cpp" is not found.
string(FIND "${filename}" "test.cpp" "TEMP")
if( NOT "${TEMP}" STREQUAL "-1" )
string(REPLACE ".cpp" "" file ${filename})
add_executable(${file} ${APP_SOURCES})
target_link_libraries(${file} ${Boost_LIBRARIES})
target_link_libraries(${file} ssl crypto libgtest.a libgtest_main.a libgmock.a iodbc iodbcinst pthread)
endif()
endforeach( sourcefile ${APP_SOURCES})
sf_db.h
#ifndef _FREDRIC_SF_DB_H_
#define _FREDRIC_SF_DB_H_
#include "sf_odbc.h"
#include "sql.h"
#include "sqlext.h"
#include <vector>
#include <string>
#include <iostream>
#include <map>
#include <cassert>
/// Credentials handed to SFConnection when it opens the ODBC connection.
struct DBAuthorization {
    /// Data source name passed to SQLConnect.
    std::string dataSource = std::string();
    /// Password passed to SQLConnect.
    std::string password = std::string();
};
/**
* iodbc
* /usr/local/iODBC/include
* /usr/local/iODBC/lib
* */
/**
 * Owns one ODBC environment handle and one ODBC connection handle and offers
 * a simple read-only query interface on top of them.
 *
 * The handles are allocated in the constructor and released in the
 * destructor. Because the object owns raw handles, it is not copyable: a copy
 * would release the same handles a second time.
 */
struct SFConnection {
    /// One map per result row, keyed by field name, holding the value as text.
    using DB_RETURN_TYPE = std::vector<std::map<std::string, std::string>>;

    /**
     * Allocates the ODBC handles and connects to the data source.
     * @param auth data source name and password used for SQLConnect
     */
    SFConnection(DBAuthorization auth);

    /// Disconnects and frees the ODBC handles.
    ~SFConnection();

    // Copying would make two objects free the same ODBC handles.
    SFConnection(const SFConnection&) = delete;
    SFConnection& operator=(const SFConnection&) = delete;

    /**
     * Executes a query and returns every fetched row.
     *
     * @param query_string raw SQL text. `select *` is NOT supported, because
     *        the column names are parsed out of the query text itself.
     * @return the result set; each row maps a (lower-cased) field name to its
     *         value as a string. Each value is fetched into a 2000-byte buffer.
     *
     * Example:
     *   select product_key, device_code from dim_product_v1 limit 2
     * returns
     *   std::vector {
     *     {"product_key": "111", "device_code": "ios-all"},
     *     {"product_key": "111", "device_code": "google-play"}
     *   }
     */
    DB_RETURN_TYPE exec_query(std::string query_string);

    /**
     * Extracts the list of selected fields from a query string.
     *
     * @param query_string raw SQL text. `select *` is NOT supported.
     * @return the comma-separated entries found between "select" and "from",
     *         lower-cased and trimmed.
     *
     * Example:
     *   select product_key, device_code from dim_product_v1
     * yields std::vector {"product_key", "device_code"}
     */
    std::vector<std::string> parse_fields_from_query_string(std::string query_string);

private:
    DBAuthorization m_auth{};
    // Null until the constructor has allocated them.
    SQLHENV henv = SQL_NULL_HENV;
    SQLHDBC hdbc = SQL_NULL_HDBC;
    // Return codes of the setup calls; they stay at -1 until the matching
    // ODBC call has run. The destructor consults them to decide what to free.
    SQLRETURN henv_ret{-1};
    SQLRETURN hdbc_conn_ret{-1};
    SQLRETURN hdbc_ret{-1};
};
#endif
sf_db.cpp
#include "sf_db/sf_db.h"
#include <algorithm>
#include <array>
#include <cctype>
#include <sstream>
#include <boost/algorithm/string.hpp>
/**
 * Reports a failed ODBC call and asserts that the call succeeded.
 *
 * A call counts as successful when it returned SQL_SUCCESS or
 * SQL_SUCCESS_WITH_INFO. On any other code the message is written to
 * std::cerr and the assertion fires (unless assertions are compiled out).
 *
 * @param retcode return code of the ODBC call being checked
 * @param msg     text printed to std::cerr when the call failed
 */
void checkError(SQLRETURN retcode, std::string msg) {
    // The previous condition used `retcode = !SQL_SUCCESS_WITH_INFO`, an
    // assignment that reset retcode to 0 and made every call look successful.
    const bool succeeded =
        retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO;
    if (!succeeded) {
        std::cerr << msg << std::endl;
    }
    assert(succeeded);
}
/**
 * Tells whether an ODBC return code denotes success.
 *
 * @param retcode return code of an ODBC call
 * @return true for SQL_SUCCESS and SQL_SUCCESS_WITH_INFO, false otherwise
 */
bool checkErrorCode(SQLRETURN retcode) {
    // The previous condition used `retcode = !SQL_SUCCESS_WITH_INFO`
    // (assignment instead of `!=`), so the function returned true for every
    // input, including error codes.
    return retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO;
}
/**
 * Allocates the ODBC environment and connection handles and connects to the
 * data source named in `auth`.
 *
 * Each setup step is passed to checkError(). The return codes of the two
 * handle allocations and of SQLConnect are stored in members so the
 * destructor can tell which resources exist.
 *
 * @param auth data source name and password; no user name is passed to
 *             SQLConnect.
 */
SFConnection::SFConnection(DBAuthorization auth) : m_auth(std::move(auth)) {
    // Allocate environment handle
    henv_ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
    checkError(henv_ret, "SQLAllocHandle failed");

    // Set the ODBC version environment attribute. The result is kept in a
    // local so that henv_ret keeps describing the allocation only; otherwise
    // a failure here would make the destructor skip freeing the environment.
    const SQLRETURN env_attr_ret = SQLSetEnvAttr(
        henv, SQL_ATTR_ODBC_VERSION,
        reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(SQL_OV_ODBC3)), 0);
    checkError(env_attr_ret, "SQLSetEnvAttr failed");

    // Allocate connection handle
    hdbc_ret = SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);
    checkError(hdbc_ret, "SQLAllocHandle failed");

    // Set login timeout to 5 seconds. A failure is reported but not fatal:
    // the connection attempt can still proceed without this attribute.
    const SQLRETURN timeout_ret = SQLSetConnectAttr(
        hdbc, SQL_LOGIN_TIMEOUT,
        reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(5)), 0);
    if (timeout_ret != SQL_SUCCESS && timeout_ret != SQL_SUCCESS_WITH_INFO) {
        std::cerr << "SQLSetConnectAttr SQL_LOGIN_TIMEOUT failed" << std::endl;
    }

    // Connect to data source. The ODBC API takes non-const SQLCHAR*, hence
    // the casts.
    SQLCHAR* data_source = reinterpret_cast<SQLCHAR*>(
        const_cast<char*>(m_auth.dataSource.c_str()));
    SQLCHAR* password = reinterpret_cast<SQLCHAR*>(
        const_cast<char*>(m_auth.password.c_str()));
    hdbc_conn_ret = SQLConnect(hdbc, data_source, SQL_NTS,
                               nullptr, 0,
                               password, SQL_NTS);
    checkError(hdbc_conn_ret, "SQLConnect connect to product_odbc failed");

    // Announce success only when the connection was actually established.
    if (hdbc_conn_ret == SQL_SUCCESS ||
        hdbc_conn_ret == SQL_SUCCESS_WITH_INFO) {
        std::cout << "connect to product_odbc success!" << std::endl;
    }
}
/**
 * Extracts the selected field names from a SQL query string.
 *
 * The query is lower-cased, then the text between the first "select" and the
 * first following stand-alone "from" keyword is split on commas and each
 * piece is trimmed.
 *
 * Limitations: `select *`, commas inside function calls, and sub-queries in
 * the select list are not understood.
 *
 * @param query_string raw SQL text
 * @return the lower-cased field entries in query order; empty when the query
 *         has no "select ... from" part or the field list is blank
 */
std::vector<std::string> SFConnection::parse_fields_from_query_string(
    std::string query_string) {
    std::vector<std::string> result{};

    std::transform(query_string.begin(), query_string.end(),
                   query_string.begin(),
                   [](unsigned char c) { return std::tolower(c); });

    const std::string select_keyword{"select"};
    const std::string from_keyword{"from"};

    const auto select_pos = query_string.find(select_keyword);
    if (select_pos == std::string::npos) {
        return result;
    }
    const auto fields_begin = select_pos + select_keyword.size();

    const auto is_identifier_char = [](char c) {
        return std::isalnum(static_cast<unsigned char>(c)) != 0 || c == '_';
    };

    // Look for "from" as a whole word so that a column such as "from_date"
    // or "valid_from" does not end the field list early.
    auto from_pos = query_string.find(from_keyword, fields_begin);
    while (from_pos != std::string::npos) {
        const auto after = from_pos + from_keyword.size();
        // from_pos >= fields_begin >= 6, so from_pos - 1 is always valid.
        const bool left_boundary = !is_identifier_char(query_string[from_pos - 1]);
        const bool right_boundary = after >= query_string.size() ||
                                    !is_identifier_char(query_string[after]);
        if (left_boundary && right_boundary) {
            break;
        }
        from_pos = query_string.find(from_keyword, from_pos + 1);
    }
    if (from_pos == std::string::npos) {
        return result;
    }

    auto fields_string =
        query_string.substr(fields_begin, from_pos - fields_begin);
    boost::trim(fields_string);
    if (fields_string.empty()) {
        return result;
    }

    std::stringstream ss(fields_string);
    std::string field{};
    while (std::getline(ss, field, ',')) {
        boost::trim(field);
        result.emplace_back(std::move(field));
    }
    // Returned by name so the compiler can apply NRVO.
    return result;
}
/**
 * Executes a query on the open connection and returns all fetched rows.
 *
 * The selected field names are parsed from the query text, one character
 * buffer is bound per field, and rows are fetched until SQL_NO_DATA. Every
 * row is also echoed to std::cout.
 *
 * @param query_string raw SQL text (`select *` is not supported)
 * @return one map per row, keyed by parsed field name. SQL NULL becomes an
 *         empty string and values longer than the buffer are truncated.
 *         The rows collected so far are returned when an ODBC call fails.
 */
SFConnection::DB_RETURN_TYPE SFConnection::exec_query(
    std::string query_string) {
    constexpr std::size_t kFieldBufferSize = 2000;
    using FieldBuffer = std::array<SQLCHAR, kFieldBufferSize>;

    const auto succeeded = [](SQLRETURN rc) {
        return rc == SQL_SUCCESS || rc == SQL_SUCCESS_WITH_INFO;
    };

    DB_RETURN_TYPE results;
    const std::vector<std::string> field_names =
        parse_fields_from_query_string(query_string);
    std::vector<FieldBuffer> field_values(field_names.size(), FieldBuffer{});
    std::vector<SQLLEN> field_lens(field_names.size(), 0);

    // Allocate statement handle
    SQLHSTMT hstmt = SQL_NULL_HSTMT;
    SQLRETURN retcode = SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
    if (!succeeded(retcode)) {
        std::cerr << "SQLAllocHandle statement failed!" << std::endl;
        return results;
    }

    // Called on every exit path once the statement handle exists.
    const auto free_statement = [&hstmt]() {
        std::cout << "Free statement" << std::endl;
        SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
    };

    retcode = SQLExecDirect(
        hstmt,
        reinterpret_cast<SQLCHAR*>(const_cast<char*>(query_string.c_str())),
        SQL_NTS);
    if (!succeeded(retcode)) {
        std::cerr << "SQLExecDirect failed!" << std::endl;
        free_statement();
        return results;
    }

    // Bind one character buffer per parsed field (ODBC columns are 1-based).
    for (std::size_t col = 0; col < field_names.size(); ++col) {
        retcode = SQLBindCol(hstmt, static_cast<SQLUSMALLINT>(col + 1),
                             SQL_C_CHAR, field_values[col].data(),
                             static_cast<SQLLEN>(kFieldBufferSize),
                             &field_lens[col]);
        if (!succeeded(retcode)) {
            std::cerr << "SQLBindCol failed for column " << (col + 1)
                      << std::endl;
            free_statement();
            return results;
        }
    }

    // Fetch and print each row of data until SQL_NO_DATA is returned.
    for (int row_number = 1;; ++row_number) {
        retcode = SQLFetch(hstmt);
        if (retcode == SQL_NO_DATA) {
            break;
        }
        if (!succeeded(retcode)) {
            std::cout << "SQLFetch Error, error code: " << retcode
                      << std::endl;
            break;
        }

        std::cout << row_number << " ";
        std::map<std::string, std::string> row_fields{};
        for (std::size_t col = 0; col < field_names.size(); ++col) {
            const FieldBuffer& buffer = field_values[col];
            const SQLLEN indicator = field_lens[col];

            std::string value{};
            // SQL_NULL_DATA is negative; using it as a length would request
            // an enormous string, so NULL columns map to an empty string.
            if (indicator != SQL_NULL_DATA) {
                std::size_t length = 0;
                if (indicator >= 0 &&
                    static_cast<std::size_t>(indicator) < kFieldBufferSize) {
                    length = static_cast<std::size_t>(indicator);
                } else {
                    // Truncated value or unknown total length: the indicator
                    // does not describe what is in the buffer, so read up to
                    // the terminating NUL (bounded by the buffer size).
                    length = static_cast<std::size_t>(
                        std::find(buffer.begin(), buffer.end(), SQLCHAR{0}) -
                        buffer.begin());
                }
                value.assign(reinterpret_cast<const char*>(buffer.data()),
                             length);
            }

            std::cout << value << " ";
            row_fields[field_names[col]] = std::move(value);
        }
        std::cout << std::endl;
        results.emplace_back(std::move(row_fields));
    }

    free_statement();
    // Returned by name so the compiler can apply NRVO.
    return results;
}
/**
 * Releases the ODBC resources in the reverse order of their acquisition:
 * disconnect, free the connection handle, free the environment handle.
 * Each step runs only when checkErrorCode() accepts the return code stored
 * for the corresponding setup call.
 */
SFConnection::~SFConnection() {
    const bool connect_ok = checkErrorCode(hdbc_conn_ret);
    if (connect_ok) {
        SQLDisconnect(hdbc);
        std::cout << "Free connection" << std::endl;
    }

    const bool dbc_ok = checkErrorCode(hdbc_ret);
    if (dbc_ok) {
        SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
        std::cout << "Free dbc" << std::endl;
    }

    const bool env_ok = checkErrorCode(henv_ret);
    if (env_ok) {
        SQLFreeHandle(SQL_HANDLE_ENV, henv);
        std::cout << "Free env" << std::endl;
    }
}
sf_db_test.cpp
#include "sf_db/sf_db.h"
#include <array>
#include <gtest/gtest.h>
// Runs the same 10-row query twice on one connection and expects 10 rows
// from each execution.
GTEST_TEST(SFDBTests, TestExecQuery) {
    DBAuthorization credentials{"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection{credentials};

    const std::string sql =
        "select product_key,change_time,EVENT_TYPE_NAME, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";

    auto first_rows = connection.exec_query(sql);
    ASSERT_EQ(10, first_rows.size());

    auto second_rows = connection.exec_query(sql);
    ASSERT_EQ(10, second_rows.size());
}
// Fetches two rows, checks the row count, then prints a header line followed
// by every field of every row.
GTEST_TEST(SFDBTests, TestExecQueryAndPrintValue) {
    DBAuthorization credentials{"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection{credentials};

    const std::string sql =
        "select product_key,change_time,event_type_name, change_column, old_value, new_value,meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 2;";

    auto rows = connection.exec_query(sql);
    ASSERT_EQ(2, rows.size());

    std::cout << "product_key, change_time, event_type_name, change_column, old_value, new_value, meta"
              << std::endl;

    for (auto& row : rows) {
        std::cout << " " << row["product_key"];
        std::cout << " " << row["change_time"];
        std::cout << " " << row["event_type_name"];
        std::cout << " " << row["change_column"];
        std::cout << " " << row["old_value"];
        std::cout << " " << row["new_value"];
        std::cout << " " << row["meta"] << std::endl;
    }
}
// Checks that the two selected fields are extracted, in order, from a query.
GTEST_TEST(SFDBTests, TestParseFields) {
    DBAuthorization credentials{"product_odbc", "{YOUR_PASSWORD}"};
    SFConnection connection{credentials};

    const std::string sql =
        "select product_key, meta from AA_INTELLIGENCE_PRODUCTION.ADL_MASTER.dim_event_service_v1 where event_type_name='screenshot_change' and product_key=20600000009072 order by change_time desc limit 10;";

    auto fields = connection.parse_fields_from_query_string(sql);

    ASSERT_EQ(2, fields.size());
    ASSERT_EQ("product_key", fields[0]);
    ASSERT_EQ("meta", fields[1]);
}
程序输出如下,
image.png