diff --git a/.gitignore b/.gitignore
index 38a16c2..cf87f98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,4 @@ testext
 test/python/__pycache__/
 .Rhistory
 vcpkg_installed/
+vcpkg/
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f2c992f..fd9834d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,7 +3,6 @@ cmake_minimum_required(VERSION 3.20)
 set(TARGET_NAME duckdb_bigquery)
 project(${TARGET_NAME})
 set(CMAKE_CXX_STANDARD 20)
-#set(CMAKE_CXX_STANDARD 14)
 
 find_package(google_cloud_cpp_bigquery REQUIRED)
 find_package(google_cloud_cpp_common REQUIRED)
@@ -37,3 +36,5 @@ nlohmann_json::nlohmann_json
 Threads::Threads
 Arrow::arrow_static
 )
+
+add_subdirectory(test)
\ No newline at end of file
diff --git a/duckdb b/duckdb
index cc2dc1d..1832d2c 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit cc2dc1d6d4b29b1b20c94e70061de2f441b794a1
+Subproject commit 1832d2c26ca1404675805ba172b99a43cdaa0943
diff --git a/extension-ci-tools b/extension-ci-tools
index c60db58..bbc8871 160000
--- a/extension-ci-tools
+++ b/extension-ci-tools
@@ -1 +1 @@
-Subproject commit c60db58eabacf6746fe1972d6874ae44d4d17e9e
+Subproject commit bbc8871900669dabd02a6aa9dc0efa6b5b658223
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0d43105..bc8322a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -13,11 +13,7 @@ add_library(
     bigquery_storage.cpp
     bigquery_utils.cpp)
 
-# set(ALL_OBJECT_FILES
-#     ${ALL_OBJECT_FILES} $
-#     PARENT_SCOPE)
-
 # Ensure storage objects are included
 set(ALL_OBJECT_FILES
     ${ALL_OBJECT_FILES} $ $
-PARENT_SCOPE)
\ No newline at end of file
+PARENT_SCOPE)
diff --git a/src/bigquery_scanner.cpp b/src/bigquery_scanner.cpp
index c923d64..1b038e6 100644
--- a/src/bigquery_scanner.cpp
+++ b/src/bigquery_scanner.cpp
@@ -31,7 +31,7 @@ struct BigQueryScannerGlobalState : public GlobalTableFunctionState {
                               string storage_project,
                               string dataset,
                               string table,
-                              unique_ptr<google::cloud::bigquery::storage::v1::ReadSession> read_session,
+                              unique_ptr<google::cloud::bigquery::storage::v1::ReadSession> read_session_p,
                               std::shared_ptr connection_p,
                               idx_t limit,
                               idx_t offset,
@@ -41,11 +41,11 @@ struct BigQueryScannerGlobalState : public GlobalTableFunctionState {
           storage_project(storage_project),
           dataset(dataset),
           table(table),
-          read_session(std::move(read_session)),
           connection(std::move(connection_p)),
+          read_session(std::move(read_session_p)),
          current_offset(offset),
-          global_row_count(0),
           limit(limit),
+          global_row_count(0),
           has_limit(has_limit) {}
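The constructor changes above do two things: renaming the parameter to read_session_p stops it shadowing the read_session member, and the initializer list is reordered to match the order in which the members are declared. That order matters because C++ always initializes members in declaration order, regardless of how the initializer list is written; compilers flag the mismatch with -Wreorder. A minimal standalone sketch of the rule (hypothetical names, not the extension's code):

    #include <iostream>

    // Members initialize in DECLARATION order, never in initializer-list order.
    struct Scanner {
        int limit;            // declared first: initialized first
        int global_row_count; // declared second: initialized second

        // Written "out of order": the compiler still runs limit(limit_p)
        // first and warns with -Wreorder, which the reordering above silences.
        Scanner(int limit_p) : global_row_count(0), limit(limit_p) {}
    };

    int main() {
        Scanner s(10);
        std::cout << s.limit << " " << s.global_row_count << "\n"; // prints: 10 0
    }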
@@ -98,7 +98,7 @@ static unique_ptr BigQueryInitGlobalState(ClientContex
     // identical.
     std::string const table_name = "projects/" + storage_project + "/datasets/" + dataset + "/tables/" + table;
-    constexpr int max_streams = 1;
+    //constexpr int max_streams = 1;
     std::shared_ptr connection;
     if(service_account_json.empty()){
@@ -116,9 +116,10 @@ static unique_ptr BigQueryInitGlobalState(ClientContex
     auto read_session = make_uniq<google::cloud::bigquery::storage::v1::ReadSession>();
     read_session->set_data_format(google::cloud::bigquery::storage::v1::DataFormat::ARROW);
     read_session->set_table(table_name);
-    for (idx_t c = 0; c < column_names.size(); c++) {
-        //Printer::Print("Adding column: " + column_names[c]);
-        read_session->mutable_read_options()->add_selected_fields(column_names[c]);
+    for(auto &column_id : input.column_ids){
+        auto column_name = bind_data.column_names[column_id];
+        //Printer::Print("Adding column: " + column_name);
+        read_session->mutable_read_options()->add_selected_fields(column_name);
     }
     auto filters = BigQueryFilterPushdown::TransformFilters(input.column_ids, input.filters, bind_data.column_names);
     //Printer::Print("filters: " + filters);
@@ -198,6 +199,8 @@ static void BigQueryScan(ClientContext &context, TableFunctionInput &data, DataC
     //Printer::Print("max_rows: " + to_string(max_rows));
 
+    //Printer::Print("Column count : " + to_string(output.ColumnCount()));
+
     for (idx_t c = 0; c < output.ColumnCount(); c++) {
         std::shared_ptr<arrow::Array> column = record_batch->column(c);
diff --git a/src/bigquery_utils.cpp b/src/bigquery_utils.cpp
index f3dfd6d..cc2ee47 100644
--- a/src/bigquery_utils.cpp
+++ b/src/bigquery_utils.cpp
@@ -32,8 +32,85 @@
 using namespace concurrency::streams;
 
 namespace duckdb {
+class BQColumnRequest {
+private:
+    json schema;
+
+public:
+    BQColumnRequest(const json& schema): schema(schema) {}
+
+    vector<BQField> ParseColumnFields() {
+        return ParseColumnFields(this->schema);
+    }
+
+    vector<BQField> ParseColumnFields(const json& schema) {
+        //Printer::Print("Parsing schema 1");
+        vector<BQField> column_list;
+        for (const auto& field : schema["fields"]) {
+            auto field_name = field["name"].get<std::string>();
+            auto field_type_str = field["type"].get<std::string>();
+            std::vector<BQField> subfields;
+            //Printer::Print("Parsing schema " + field_name + " " + field_type_str);
+
+            if (field_type_str == "RECORD") {
+                //Printer::Print("Parsing record");
+                subfields = ParseColumnFields(field);
+            }
+
+            auto field_type = TypeToLogicalType(field_type_str, subfields);
+            column_list.push_back(BQField(field_name, field_type));
+        }
+        return column_list;
+    }
+
+    LogicalType TypeToLogicalType(const std::string &bq_type, std::vector<BQField> subfields) {
+        if (bq_type == "INTEGER") {
+            return LogicalType::BIGINT;
+        } else if (bq_type == "FLOAT") {
+            return LogicalType::DOUBLE;
+        } else if (bq_type == "DATE") {
+            return LogicalType::DATE;
+        } else if (bq_type == "TIME") {
+            // we need to convert time to VARCHAR because TIME in BigQuery is more like an
+            // interval and can store ranges between -838:00:00 to 838:00:00
+            return LogicalType::VARCHAR;
+        } else if (bq_type == "TIMESTAMP") {
+            // in BigQuery, "timestamp" columns are timezone aware while "datetime" columns
+            // are not
+            return LogicalType::TIMESTAMP_TZ;
+        } else if (bq_type == "YEAR") {
+            return LogicalType::INTEGER;
+        } else if (bq_type == "DATETIME") {
+            return LogicalType::TIMESTAMP;
+        } else if (bq_type == "NUMERIC" || bq_type == "BIGNUMERIC") {
+            // BigQuery NUMERIC and BIGNUMERIC types can have a precision up to 38 and a scale up to 9
+            // Assume a default precision and scale for this example; these could be parameterized if needed
+            return LogicalType::DECIMAL(38, 9);
+        } else if (bq_type == "JSON") {
+            // FIXME
+            return LogicalType::VARCHAR;
+        } else if (bq_type == "BYTES") {
+            return LogicalType::BLOB;
+        } else if (bq_type == "STRING") {
+            return LogicalType::VARCHAR;
+        } else if(bq_type == "BOOLEAN") {
+            return LogicalType::BOOLEAN;
+        } else if(bq_type == "RECORD") {
+            // transform the subfields into a list of LogicalType using a recursive call
+            child_list_t<LogicalType> subfield_types;
+            for (auto &subfield : subfields) {
+                subfield_types.emplace_back(subfield.name, subfield.type);
+            }
+            auto duckdb_type = LogicalType::STRUCT(subfield_types);
+            //Printer::Print("Struct type: " + duckdb_type.ToString());
+            return duckdb_type;
+            //return LogicalType::STRUCT();
+        }
+        Printer::Print("Unknown type: " + bq_type);
+        // fallback for unknown types
+        return LogicalType::VARCHAR;
+    }
+};
+
 std::string GetAccessToken(const string &service_account_json) {
-    google::cloud::v2_25::StatusOr<std::shared_ptr<gcpoauth2::Credentials>> credentials;
+    google::cloud::StatusOr<std::shared_ptr<gcpoauth2::Credentials>> credentials;
     if(service_account_json.empty()) {
         credentials = gcpoauth2::GoogleDefaultCredentials();
     } else {
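BQColumnRequest is the core of this change: it walks the schema JSON recursively, so a RECORD field becomes a DuckDB STRUCT whose children are produced by the same mapping. A standalone sketch of that recursion, using plain type strings in place of duckdb::LogicalType (the names MapFields/MapType are illustrative, not the extension's API):

    #include <nlohmann/json.hpp>
    #include <iostream>
    #include <string>

    using json = nlohmann::json;

    std::string MapType(const json &field);

    // Mirror of ParseColumnFields: fold every field of a schema object
    // into one struct description.
    std::string MapFields(const json &schema) {
        std::string out = "STRUCT(";
        bool first = true;
        for (const auto &field : schema["fields"]) {
            if (!first) out += ", ";
            first = false;
            out += field["name"].get<std::string>() + " " + MapType(field);
        }
        return out + ")";
    }

    // Mirror of TypeToLogicalType: scalar types map directly, RECORD recurses.
    std::string MapType(const json &field) {
        auto t = field["type"].get<std::string>();
        if (t == "INTEGER") return "BIGINT";
        if (t == "FLOAT")   return "DOUBLE";
        if (t == "RECORD")  return MapFields(field); // recurse into subfields
        return "VARCHAR"; // fallback, as in the extension
    }

    int main() {
        json schema = json::parse(R"({"fields": [
            {"name": "id", "type": "INTEGER"},
            {"name": "addr", "type": "RECORD", "fields": [
                {"name": "city", "type": "STRING"},
                {"name": "zip",  "type": "INTEGER"}]}]})");
        std::cout << MapFields(schema) << "\n";
        // prints: STRUCT(id BIGINT, addr STRUCT(city VARCHAR, zip BIGINT))
    }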
@@ -67,7 +144,7 @@ unique_ptr BigQueryUtils::BigQueryCreateBigQueryTableEntry(
         return nullptr;
     }
     // print size of columns
-    auto column_list_size = column_list.size();
+    //auto column_list_size = column_list.size();
     //Printer::Print("column_list_size size: " + to_string(column_list_size));
     //Printer::Print("column_list done");
@@ -79,7 +156,7 @@ unique_ptr BigQueryUtils::BigQueryCreateBigQueryTableEntry(
         columns.AddColumn(std::move(column));
     }
     // print size of columns
-    auto column_size = columns.GetColumnNames().size();
+    //auto column_size = columns.GetColumnNames().size();
     //Printer::Print("columns size: " + to_string(column_size));
     auto table_entry = make_uniq(catalog, *schema_entry, *table_info);
     return table_entry;
@@ -120,29 +197,15 @@ unique_ptr BigQueryUtils::BigQueryCreateBigQueryTableEntry(
     if (response.status_code() == status_codes::OK) {
         return response.extract_json()
             .then([](web::json::value const& v) -> vector<BQField> {
-                std::string str = v.serialize();
-                //Printer::Print("received JSON: " + str);
-
-                // Parse the JSON string
-                json j = json::parse(str);
-
-                // Extract the fields
-                vector<BQField> column_list;
-                for (const auto& field : j["schema"]["fields"]) {
-                    auto field_name = field["name"].get<std::string>();
-                    auto field_type = BigQueryUtils::TypeToLogicalType(field["type"]);
-                    //add BQField to column_list
-                    column_list.push_back(BQField(field_name, field_type));
-                }
-
-                // for (const auto& field : column_list) {
-                //     Printer::Print("field: " + field.name + " type: " + field.type.ToString());
-                // }
-
-                return column_list;
+                auto bqFields = BigQueryUtils::ParseColumnJSONResponse(v);
+                // print bq fields
+                for (auto &field : bqFields) {
+                    //Printer::Print("Field: " + field.name + " " + field.type.ToString());
+                }
+                return bqFields;
             });
     } else {
-        Printer::Print("Error: " + response.to_string());
+        //Printer::Print("Error: " + response.to_string());
         throw std::runtime_error("Failed to get column list for provided table, it's likely either an authentication issue or the table does not exist");
     }
     return pplx::task_from_result(vector<BQField>());
@@ -160,6 +223,18 @@ unique_ptr BigQueryUtils::BigQueryCreateBigQueryTableEntry(
     return vector<BQField>();
 }
 
+vector<BQField> BigQueryUtils::ParseColumnJSONResponse(web::json::value const& v){
+    std::string str = v.serialize();
+    //Printer::Print("received JSON: " + str);
+    // Parse the JSON string
+    json j = json::parse(str);
+    auto schema = j["schema"];
+    //return vector<BQField>();
+    // stack allocation: the previous `new BQColumnRequest` was never freed
+    BQColumnRequest bcr(schema);
+    return bcr.ParseColumnFields();
+}
+
 Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar) {
     switch (scalar->type->id()) {
     case arrow::Type::INT64: {
@@ -262,10 +337,12 @@ Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar)
     }
     case arrow::Type::STRUCT: {
         // Extract the struct value from the StructScalar
+        //Printer::Print("Struct scalar");
         auto struct_scalar = std::static_pointer_cast<arrow::StructScalar>(scalar);
         const auto& struct_values = struct_scalar->value;
         const auto& field_names = struct_scalar->type->fields();
-
+        //Printer::Print("Struct scalar values: " + to_string(struct_values.size()));
+        //Printer::Print("Struct scalar names: " + to_string(field_names.size()));
         // Convert each field in the struct to a Value
         child_list_t<Value> values;
         for (size_t i = 0; i < struct_values.size(); i++) {
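For the STRUCT branch above: an arrow::StructScalar keeps the child scalars in its value member while the field names live on its type, which is why the loop indexes both in lockstep. A self-contained sketch of that layout in plain Arrow (no DuckDB involved):

    #include <arrow/api.h>
    #include <iostream>

    int main() {
        // A struct<id: int64, city: utf8> scalar, built by hand.
        auto type = arrow::struct_({arrow::field("id", arrow::int64()),
                                    arrow::field("city", arrow::utf8())});
        arrow::StructScalar scalar({arrow::MakeScalar(int64_t{42}),
                                    arrow::MakeScalar(std::string("Berlin"))},
                                   type);

        // Names come from the type, children from .value: the same pair
        // the extension converts into a DuckDB STRUCT Value.
        const auto &fields = scalar.type->fields();
        for (size_t i = 0; i < scalar.value.size(); i++) {
            std::cout << fields[i]->name() << " = "
                      << scalar.value[i]->ToString() << "\n";
        }
    }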
@@ -359,42 +436,20 @@ Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar)
     }
 }
 
-LogicalType BigQueryUtils::TypeToLogicalType(const std::string &bq_type) {
-    if (bq_type == "INTEGER") {
-        return LogicalType::BIGINT;
-    } else if (bq_type == "FLOAT") {
-        return LogicalType::DOUBLE;
-    } else if (bq_type == "DATE") {
-        return LogicalType::DATE;
-    } else if (bq_type == "TIME") {
-        // we need to convert time to VARCHAR because TIME in BigQuery is more like an
-        // interval and can store ranges between -838:00:00 to 838:00:00
-        return LogicalType::VARCHAR;
-    } else if (bq_type == "TIMESTAMP") {
-        // in BigQuery, "timestamp" columns are timezone aware while "datetime" columns
-        // are not
-        return LogicalType::TIMESTAMP_TZ;
-    } else if (bq_type == "YEAR") {
-        return LogicalType::INTEGER;
-    } else if (bq_type == "DATETIME") {
-        return LogicalType::TIMESTAMP;
-    } else if (bq_type == "NUMERIC" || bq_type == "BIGNUMERIC") {
-        // BigQuery NUMERIC and BIGNUMERIC types can have a precision up to 38 and a scale up to 9
-        // Assume a default precision and scale for this example; these could be parameterized if needed
-        return LogicalType::DECIMAL(38, 9);
-    } else if (bq_type == "JSON") {
-        // FIXME
-        return LogicalType::VARCHAR;
-    } else if (bq_type == "BYTES") {
-        return LogicalType::BLOB;
-    } else if (bq_type == "STRING") {
-        return LogicalType::VARCHAR;
-    } else if(bq_type == "BOOLEAN") {
-        return LogicalType::BOOLEAN;
-    }
-    Printer::Print("Unknown type: " + bq_type);
-    // fallback for unknown types
-    return LogicalType::VARCHAR;
+std::shared_ptr<arrow::Schema> BigQueryUtils::GetArrowSchema(
+    ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
+    std::shared_ptr<arrow::Buffer> buffer =
+        std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
+    arrow::io::BufferReader buffer_reader(buffer);
+    arrow::ipc::DictionaryMemo dictionary_memo;
+    auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
+    if (!result.ok()) {
+        Printer::Print("Unable to parse schema: " + result.status().message());
+        throw result.status();
+    }
+    std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
+    //Printer::Print("Schema: " + schema->ToString());
+    return schema;
 }
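GetArrowSchema, relocated here next to the other Arrow helpers, decodes the IPC-serialized schema bytes that the Storage Read API sends back. A round-trip sketch of the same Arrow calls (serialize a schema, then read it back), assuming a reasonably recent Arrow C++ release, since the SerializeSchema signature has varied across versions:

    #include <arrow/api.h>
    #include <arrow/io/memory.h>
    #include <arrow/ipc/api.h>
    #include <iostream>

    int main() {
        auto schema = arrow::schema({arrow::field("id", arrow::int64()),
                                     arrow::field("name", arrow::utf8())});

        // Stand-in for ArrowSchema::serialized_schema(): IPC-encode the schema.
        std::shared_ptr<arrow::Buffer> buffer =
            arrow::ipc::SerializeSchema(*schema).ValueOrDie();

        // The same decoding steps GetArrowSchema runs on the BigQuery bytes.
        arrow::io::BufferReader reader(buffer);
        arrow::ipc::DictionaryMemo memo;
        auto result = arrow::ipc::ReadSchema(&reader, &memo);
        if (!result.ok()) {
            std::cerr << "Unable to parse schema: " << result.status().message() << "\n";
            return 1;
        }
        std::cout << result.ValueOrDie()->ToString() << "\n";
    }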
@@ -412,22 +467,6 @@ string BigQueryUtils::EscapeQuotes(const string &text, char quote) {
     return result;
 }
 
-std::shared_ptr<arrow::Schema> BigQueryUtils::GetArrowSchema(
-    ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
-    std::shared_ptr<arrow::Buffer> buffer =
-        std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
-    arrow::io::BufferReader buffer_reader(buffer);
-    arrow::ipc::DictionaryMemo dictionary_memo;
-    auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
-    if (!result.ok()) {
-        Printer::Print("Unable to parse schema: " +
-                       result.status().message());
-        throw result.status();
-    }
-    std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
-    //Printer::Print("Schema: " + schema->ToString());
-    return schema;
-}
-
 string BigQueryUtils::WriteQuoted(const string &text, char quote) {
     // 1. Escapes all occurences of 'quote' by escaping them with a backslash
     // 2. Adds quotes around the string
diff --git a/src/include/bigquery_utils.hpp b/src/include/bigquery_utils.hpp
index 4a4d2c3..7455daa 100644
--- a/src/include/bigquery_utils.hpp
+++ b/src/include/bigquery_utils.hpp
@@ -11,6 +11,12 @@
 #include "duckdb.hpp"
 #include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
 #include
+#include <nlohmann/json.hpp>
+#include <cpprest/http_client.h>
+
+using json = nlohmann::json;
+using namespace web::http;
+using namespace web::http::client;
 
 namespace duckdb {
@@ -48,18 +54,20 @@
         const string &table,
         const string &service_account_json);
 
-    static Value ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar);
+    static Value ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar);
 
-    static std::shared_ptr<arrow::Schema> GetArrowSchema(
+    static std::shared_ptr<arrow::Schema> GetArrowSchema(
         ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in);
 
     //static BigQueryConnectionParameters ParseConnectionParameters(const string &dsn);
     //static BIGQUERY *Connect(const string &dsn);
 
     //static LogicalType ToBigQueryType(const LogicalType &input);
-    static LogicalType TypeToLogicalType(const std::string &bq_type);
     //static LogicalType FieldToLogicalType(ClientContext &context, BIGQUERY_FIELD *field);
-    // static string TypeToString(const LogicalType &input);
+    //static string TypeToString(const LogicalType &input);
+    static vector<BQField> ParseColumnJSONResponse(web::json::value const& v);
+    //static LogicalType TypeToLogicalType(const std::string &bq_type, std::vector<BQField> subfields);
+    //static vector<BQField> ParseColumnFields(const json& schema);
 
     static string WriteIdentifier(const string &identifier);
     static string WriteLiteral(const string &identifier);
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
new file mode 100644
index 0000000..64b2490
--- /dev/null
+++ b/test/CMakeLists.txt
@@ -0,0 +1,18 @@
+enable_testing()
+
+find_package(GTest REQUIRED)
+
+include_directories(${GTEST_INCLUDE_DIRS})
+
+# add_executable(
+#     bigquery_utils_test
+#     cpp/bigquery_utils_test.cpp
+# )
+
+# target_link_libraries(
+#     bigquery_utils_test
+#     ${LOADABLE_EXTENSION_NAME}
+#     ${GTEST_LIBRARIES}
+# )
+
+# add_test(NAME bigquery_utils_test COMMAND bigquery_utils_test)
\ No newline at end of file
diff --git a/test/cpp/bigquery_utils_test.cpp b/test/cpp/bigquery_utils_test.cpp
new file mode 100644
index 0000000..8840d3e
--- /dev/null
+++ b/test/cpp/bigquery_utils_test.cpp
@@ -0,0 +1,16 @@
+#include <gtest/gtest.h>
+//#include "bigquery_utils.hpp"
+#include <nlohmann/json.hpp>
+
+using json = nlohmann::json;
+
+namespace duckdb {
+
+TEST(BigQueryUtilsTest, ParsesEmptySchema) {
+    json empty_schema = {{"fields", json::array()}};
+    //auto result = BigQueryUtils::ParseColumnFields(empty_schema);
+    //EXPECT_TRUE(result.empty());
+    EXPECT_TRUE(true);
+}
+
+} // namespace duckdb
diff --git a/vcpkg.json b/vcpkg.json
index b3d86f7..de84aa8 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -10,6 +10,7 @@
     },
     "cpprestsdk",
     "nlohmann-json",
-    "arrow"
+    "arrow",
+    "gtest"
   ]
 }
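The new test target is still scaffolding: the executable is commented out until the extension objects can be linked, and the one registered test only builds a fixture. A sketch of where it is headed once ParseColumnFields is linkable, using only gtest and nlohmann-json (the test name and fixture are hypothetical):

    #include <gtest/gtest.h>
    #include <nlohmann/json.hpp>

    using json = nlohmann::json;

    // Parse a fixture shaped like the REST response ParseColumnJSONResponse
    // receives, and assert on the fields it would hand to TypeToLogicalType.
    TEST(BigQuerySchemaFixture, ReadsFieldNamesAndTypes) {
        json schema = json::parse(R"({"fields": [
            {"name": "id",   "type": "INTEGER"},
            {"name": "name", "type": "STRING"}]})");

        ASSERT_EQ(schema["fields"].size(), 2u);
        EXPECT_EQ(schema["fields"][0]["name"], "id");
        EXPECT_EQ(schema["fields"][1]["type"], "STRING");
    }

    int main(int argc, char **argv) {
        ::testing::InitGoogleTest(&argc, argv);
        return RUN_ALL_TESTS();
    }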