This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

small cleanup to test github actions #1

Merged
merged 5 commits on Jul 11, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ testext
test/python/__pycache__/
.Rhistory
vcpkg_installed/
vcpkg/
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -3,7 +3,6 @@ cmake_minimum_required(VERSION 3.20)
set(TARGET_NAME duckdb_bigquery)
project(${TARGET_NAME})
set(CMAKE_CXX_STANDARD 20)
#set(CMAKE_CXX_STANDARD 14)

find_package(google_cloud_cpp_bigquery REQUIRED)
find_package(google_cloud_cpp_common REQUIRED)
@@ -37,3 +36,5 @@ nlohmann_json::nlohmann_json
Threads::Threads
Arrow::arrow_static
)

add_subdirectory(test)
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 193 files
2 changes: 1 addition & 1 deletion extension-ci-tools
6 changes: 1 addition & 5 deletions src/CMakeLists.txt
@@ -13,11 +13,7 @@ add_library(
bigquery_storage.cpp
bigquery_utils.cpp)

# set(ALL_OBJECT_FILES
# ${ALL_OBJECT_FILES} $<TARGET_OBJECTS:bigquery_ext_library>
# PARENT_SCOPE)

# Ensure storage objects are included
set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:bigquery_ext_library> $<TARGET_OBJECTS:bigquery_ext_storage>
PARENT_SCOPE)
PARENT_SCOPE)
17 changes: 10 additions & 7 deletions src/bigquery_scanner.cpp
@@ -31,7 +31,7 @@ struct BigQueryScannerGlobalState : public GlobalTableFunctionState {
string storage_project,
string dataset,
string table,
unique_ptr<bigquery_storage_read::ReadSession> read_session,
unique_ptr<bigquery_storage_read::ReadSession> read_session_p,
std::shared_ptr<google::cloud::bigquery_storage_v1::BigQueryReadConnection> connection_p,
idx_t limit,
idx_t offset,
@@ -41,11 +41,11 @@
storage_project(storage_project),
dataset(dataset),
table(table),
read_session(std::move(read_session)),
connection(std::move(connection_p)),
read_session(std::move(read_session_p)),
current_offset(offset),
global_row_count(0),
limit(limit),
global_row_count(0),
has_limit(has_limit)
{}

@@ -98,7 +98,7 @@ static unique_ptr<GlobalTableFunctionState> BigQueryInitGlobalState(ClientContex
// identical.
std::string const table_name = "projects/" + storage_project + "/datasets/" + dataset + "/tables/" + table;

constexpr int max_streams = 1;
//constexpr int max_streams = 1;

std::shared_ptr<google::cloud::bigquery_storage_v1::BigQueryReadConnection> connection;
if(service_account_json.empty()){
@@ -116,9 +116,10 @@
auto read_session = make_uniq<bigquery_storage_read::ReadSession>();
read_session->set_data_format(google::cloud::bigquery::storage::v1::DataFormat::ARROW);
read_session->set_table(table_name);
for (idx_t c = 0; c < column_names.size(); c++) {
//Printer::Print("Adding column: " + column_names[c]);
read_session->mutable_read_options()->add_selected_fields(column_names[c]);
for(auto &column_id : input.column_ids){
auto column_name = bind_data.column_names[column_id];
//Printer::Print("Adding column: " + column_name);
read_session->mutable_read_options()->add_selected_fields(column_name);
}
auto filters = BigQueryFilterPushdown::TransformFilters(input.column_ids, input.filters, bind_data.column_names);
//Printer::Print("filters: " + filters);
@@ -198,6 +199,8 @@ static void BigQueryScan(ClientContext &context, TableFunctionInput &data, DataC

//Printer::Print("max_rows: " + to_string(max_rows));

//Printer::Print("Column count : " + to_string(output.ColumnCount()));

for (idx_t c = 0; c < output.ColumnCount(); c++) {

std::shared_ptr<arrow::Array> column = record_batch->column(c);
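Note on the selected-fields change above: the scanner now iterates input.column_ids instead of every bound column, so only the columns DuckDB actually projects are requested from the Storage Read API. A minimal sketch of that mapping, assuming the google-cloud-cpp BigQuery Storage headers are available (the helper name AddSelectedFields is illustrative, not part of the extension):

#include <cstddef>
#include <string>
#include <vector>
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"

namespace bq = google::cloud::bigquery::storage::v1;

// Restrict the read session to the projected columns so the Storage Read API
// streams back narrower Arrow record batches.
void AddSelectedFields(bq::ReadSession &read_session,
                       const std::vector<std::size_t> &column_ids,    // DuckDB's input.column_ids
                       const std::vector<std::string> &column_names)  // bind_data.column_names
{
    for (auto column_id : column_ids) {
        read_session.mutable_read_options()->add_selected_fields(column_names[column_id]);
    }
}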
193 changes: 116 additions & 77 deletions src/bigquery_utils.cpp
@@ -32,8 +32,85 @@ using namespace concurrency::streams;

namespace duckdb {

class BQColumnRequest {
private:
json schema;
public:
BQColumnRequest(const json& schema): schema(schema) {}
vector<BQField> ParseColumnFields() {
return ParseColumnFields(this->schema);
}
vector<BQField> ParseColumnFields(const json& schema) {
//Printer::Print("Parsing schema 1");
vector<BQField> column_list;
for (const auto& field : schema["fields"]) {
auto field_name = field["name"].get<std::string>();
auto field_type_str = field["type"].get<std::string>();
std::vector<BQField> subfields;
//Printer::Print("Parsing schema " + field_name + " " + field_type_str);

if (field_type_str == "RECORD") {
//Printer::Print("Parsing record");
subfields = ParseColumnFields(field);
}

auto field_type = TypeToLogicalType(field_type_str, subfields);
column_list.push_back(BQField(field_name, field_type));
}
return column_list;
}

LogicalType TypeToLogicalType(const std::string &bq_type, std::vector<BQField> subfields) {
if (bq_type == "INTEGER") {
return LogicalType::BIGINT;
} else if (bq_type == "FLOAT") {
return LogicalType::DOUBLE;
} else if (bq_type == "DATE") {
return LogicalType::DATE;
} else if (bq_type == "TIME") {
// we need to convert time to VARCHAR because TIME in BigQuery is more like an
// interval and can store ranges between -838:00:00 to 838:00:00
return LogicalType::VARCHAR;
} else if (bq_type == "TIMESTAMP") {
// in BigQuery, "timestamp" columns are timezone aware while "datetime" columns
// are not
return LogicalType::TIMESTAMP_TZ;
} else if (bq_type == "YEAR") {
return LogicalType::INTEGER;
} else if (bq_type == "DATETIME") {
return LogicalType::TIMESTAMP;
} else if (bq_type == "NUMERIC" || bq_type == "BIGNUMERIC") {
// BigQuery NUMERIC and BIGNUMERIC types can have a precision up to 38 and a scale up to 9
// Assume a default precision and scale for this example; these could be parameterized if needed
return LogicalType::DECIMAL(38, 9);
} else if (bq_type == "JSON") {
// FIXME
return LogicalType::VARCHAR;
} else if (bq_type == "BYTES") {
return LogicalType::BLOB;
} else if (bq_type == "STRING") {
return LogicalType::VARCHAR;
} else if(bq_type == "BOOLEAN") {
return LogicalType::BOOLEAN;
} else if(bq_type == "RECORD") {
// transform the subfields into a list of LogicalType using a recursive call
child_list_t<LogicalType> subfield_types;
for (auto &subfield : subfields) {
subfield_types.emplace_back(subfield.name, subfield.type);
}
auto duckdb_type = LogicalType::STRUCT(subfield_types);
//Printer::Print("Struct type: " + duckdb_type.ToString());
return duckdb_type;
//return LogicalType::STRUCT();
}
Printer::Print("Unknown type: " + bq_type);
// fallback for unknown types
return LogicalType::VARCHAR;
}
};
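Aside: the BQColumnRequest class above recurses into RECORD fields, so nested BigQuery schemas surface as DuckDB STRUCT columns. A small sketch of that mapping on a hand-written schema fragment (the column names are made up, and it assumes BQColumnRequest were reachable from a header rather than being local to this translation unit):

#include <nlohmann/json.hpp>
#include "duckdb.hpp"

void SchemaMappingExample() {
    using json = nlohmann::json;
    // Hypothetical tables.get schema fragment: one scalar column, one RECORD column.
    json schema = json::parse(R"({
        "fields": [
            {"name": "id",   "type": "INTEGER"},
            {"name": "addr", "type": "RECORD", "fields": [
                {"name": "city", "type": "STRING"},
                {"name": "zip",  "type": "INTEGER"}
            ]}
        ]
    })");

    duckdb::BQColumnRequest request(schema);
    auto columns = request.ParseColumnFields();
    // columns[0] -> BIGINT (INTEGER maps to BIGINT)
    // columns[1] -> STRUCT(city VARCHAR, zip BIGINT), built by the recursive ParseColumnFields call
}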

std::string GetAccessToken(const string &service_account_json) {
google::cloud::v2_25::StatusOr<std::__1::shared_ptr<google::cloud::storage::v2_25::oauth2::Credentials>> credentials;
google::cloud::StatusOr<std::__1::shared_ptr<google::cloud::storage::oauth2::Credentials>> credentials;
if(service_account_json.empty()) {
credentials = gcpoauth2::GoogleDefaultCredentials();
} else {
@@ -67,7 +144,7 @@ unique_ptr<BigQueryTableEntry> BigQueryUtils::BigQueryCreateBigQueryTableEntry(
return nullptr;
}
// print size of columns
auto column_list_size = column_list.size();
//auto column_list_size = column_list.size();
//Printer::Print("column_list_size size: " + to_string(column_list_size));

//Printer::Print("column_list done");
@@ -79,7 +156,7 @@ unique_ptr<BigQueryTableEntry> BigQueryUtils::BigQueryCreateBigQueryTableEntry(
columns.AddColumn(std::move(column));
}
// print size of columns
auto column_size = columns.GetColumnNames().size();
//auto column_size = columns.GetColumnNames().size();
//Printer::Print("columns size: " + to_string(column_size));
auto table_entry = make_uniq<BigQueryTableEntry>(catalog, *schema_entry, *table_info);
return table_entry;
@@ -120,29 +197,15 @@ unique_ptr<BigQueryTableEntry> BigQueryUtils::BigQueryCreateBigQueryTableEntry(
if (response.status_code() == status_codes::OK) {
return response.extract_json()
.then([](web::json::value const& v) -> vector<BQField> {
std::string str = v.serialize();
//Printer::Print("received JSON: " + str);

// Parse the JSON string
json j = json::parse(str);

// Extract the fields
vector<BQField> column_list;
for (const auto& field : j["schema"]["fields"]) {
auto field_name = field["name"].get<std::string>();
auto field_type = BigQueryUtils::TypeToLogicalType(field["type"]);
//add BQField to column_list
column_list.push_back(BQField(field_name, field_type));
}

// for (const auto& field : column_list) {
// Printer::Print("field: " + field.name + " type: " + field.type.ToString());
// }

return column_list;
auto bqFields = BigQueryUtils::ParseColumnJSONResponse(v);
// print bq fields
for (auto &field : bqFields) {
//Printer::Print("Field: " + field.name + " " + field.type.ToString());
}
return bqFields;
});
} else {
Printer::Print("Error: " + response.to_string());
//Printer::Print("Error: " + response.to_string());
throw std::runtime_error("Failed to get column list for provided table, it's likely either an authentication issue or the table does not exist");
}
return pplx::task_from_result(vector<BQField>());
@@ -160,6 +223,18 @@ unique_ptr<BigQueryTableEntry> BigQueryUtils::BigQueryCreateBigQueryTableEntry(
return vector<BQField>();
}

vector<BQField> BigQueryUtils::ParseColumnJSONResponse(web::json::value const& v){

std::string str = v.serialize();
//Printer::Print("received JSON: " + str);
// Parse the JSON string
json j = json::parse(str);
auto schema = j["schema"];
//return vector<BQField>();
BQColumnRequest bcr(schema);
return bcr.ParseColumnFields();
}

Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar) {
switch (scalar->type->id()) {
case arrow::Type::INT64: {
@@ -262,10 +337,12 @@ Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar)
}
case arrow::Type::STRUCT: {
// Extract the struct value from the StructScalar
//Printer::Print("Struct scalar");
auto struct_scalar = std::static_pointer_cast<arrow::StructScalar>(scalar);
const auto& struct_values = struct_scalar->value;
const auto& field_names = struct_scalar->type->fields();

//Printer::Print("Struct scalar values: " + to_string(struct_values.size()));
//Printer::Print("Struct scalar names: " + to_string(field_names.size()));
// Convert each field in the struct to a Value
child_list_t<Value> values;
for (size_t i = 0; i < struct_values.size(); i++) {
@@ -359,42 +436,20 @@ Value BigQueryUtils::ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar)
}
}

LogicalType BigQueryUtils::TypeToLogicalType(const std::string &bq_type) {
if (bq_type == "INTEGER") {
return LogicalType::BIGINT;
} else if (bq_type == "FLOAT") {
return LogicalType::DOUBLE;
} else if (bq_type == "DATE") {
return LogicalType::DATE;
} else if (bq_type == "TIME") {
// we need to convert time to VARCHAR because TIME in BigQuery is more like an
// interval and can store ranges between -838:00:00 to 838:00:00
return LogicalType::VARCHAR;
} else if (bq_type == "TIMESTAMP") {
// in BigQuery, "timestamp" columns are timezone aware while "datetime" columns
// are not
return LogicalType::TIMESTAMP_TZ;
} else if (bq_type == "YEAR") {
return LogicalType::INTEGER;
} else if (bq_type == "DATETIME") {
return LogicalType::TIMESTAMP;
} else if (bq_type == "NUMERIC" || bq_type == "BIGNUMERIC") {
// BigQuery NUMERIC and BIGNUMERIC types can have a precision up to 38 and a scale up to 9
// Assume a default precision and scale for this example; these could be parameterized if needed
return LogicalType::DECIMAL(38, 9);
} else if (bq_type == "JSON") {
// FIXME
return LogicalType::VARCHAR;
} else if (bq_type == "BYTES") {
return LogicalType::BLOB;
} else if (bq_type == "STRING") {
return LogicalType::VARCHAR;
} else if(bq_type == "BOOLEAN") {
return LogicalType::BOOLEAN;
}
Printer::Print("Unknown type: " + bq_type);
// fallback for unknown types
return LogicalType::VARCHAR;
std::shared_ptr<arrow::Schema> BigQueryUtils::GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
std::shared_ptr<arrow::Buffer> buffer =
std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
if (!result.ok()) {
Printer::Print("Unable to parse schema: " + result.status().message());
throw result.status();
}
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
//Printer::Print("Schema: " + schema->ToString());
return schema;
}

string BigQueryUtils::EscapeQuotes(const string &text, char quote) {
@@ -412,22 +467,6 @@
return result;
}

std::shared_ptr<arrow::Schema> BigQueryUtils::GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
std::shared_ptr<arrow::Buffer> buffer =
std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
if (!result.ok()) {
Printer::Print("Unable to parse schema: " + result.status().message());
throw result.status();
}
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
//Printer::Print("Schema: " + schema->ToString());
return schema;
}

string BigQueryUtils::WriteQuoted(const string &text, char quote) {
// 1. Escapes all occurrences of 'quote' by escaping them with a backslash
// 2. Adds quotes around the string
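Aside: GetArrowSchema above turns the ReadSession's serialized Arrow schema into an arrow::Schema. A sketch of how that schema is typically paired with the per-response record batches; the helper name DecodeBatch and the exact wiring are illustrative, not the extension's API:

#include <memory>
#include <stdexcept>
#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include "bigquery_utils.hpp"
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"

namespace bqs = google::cloud::bigquery::storage::v1;

std::shared_ptr<arrow::RecordBatch> DecodeBatch(const bqs::ReadSession &session,
                                                const bqs::ReadRowsResponse &response) {
    // The schema arrives once per read session; each ReadRows response carries one batch.
    auto schema = duckdb::BigQueryUtils::GetArrowSchema(session.arrow_schema());

    // Wrap the serialized batch bytes, mirroring the buffer pattern used in GetArrowSchema.
    auto buffer = std::make_shared<arrow::Buffer>(
        response.arrow_record_batch().serialized_record_batch());
    arrow::io::BufferReader reader(buffer);
    arrow::ipc::DictionaryMemo memo;
    auto result = arrow::ipc::ReadRecordBatch(schema, &memo,
                                              arrow::ipc::IpcReadOptions::Defaults(), &reader);
    if (!result.ok()) {
        throw std::runtime_error(result.status().message());
    }
    return result.ValueOrDie();
}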
16 changes: 12 additions & 4 deletions src/include/bigquery_utils.hpp
@@ -11,6 +11,12 @@
#include "duckdb.hpp"
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include <arrow/api.h>
#include <nlohmann/json.hpp>
#include <cpprest/http_client.h>

using json = nlohmann::json;
using namespace web::http;
using namespace web::http::client;

namespace duckdb {

@@ -48,18 +54,20 @@ class BigQueryUtils {
const string &table,
const string &service_account_json);

static Value ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar);
static Value ValueFromArrowScalar(std::shared_ptr<arrow::Scalar> scalar);

static std::shared_ptr<arrow::Schema> GetArrowSchema(
static std::shared_ptr<arrow::Schema> GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in);

//static BigQueryConnectionParameters ParseConnectionParameters(const string &dsn);
//static BIGQUERY *Connect(const string &dsn);

//static LogicalType ToBigQueryType(const LogicalType &input);
static LogicalType TypeToLogicalType(const std::string &bq_type);
//static LogicalType FieldToLogicalType(ClientContext &context, BIGQUERY_FIELD *field);
// static string TypeToString(const LogicalType &input);
//static string TypeToString(const LogicalType &input);
static vector<BQField> ParseColumnJSONResponse(web::json::value const& v);
//static LogicalType TypeToLogicalType(const std::string &bq_type, std::vector<BQField> subfields);
//static vector<BQField> ParseColumnFields(const json& schema);

static string WriteIdentifier(const string &identifier);
static string WriteLiteral(const string &identifier);
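Aside: the header now exposes ParseColumnJSONResponse over a cpprest JSON value. A tiny usage sketch with a hand-built response; the values are illustrative and mirror only the "schema" portion of a tables.get reply:

#include <cpprest/json.h>
#include "bigquery_utils.hpp"

void ParseResponseExample() {
    web::json::value field;
    field[U("name")] = web::json::value::string(U("id"));
    field[U("type")] = web::json::value::string(U("INTEGER"));

    web::json::value response;
    response[U("schema")][U("fields")][0] = field;

    // Serialized internally, re-parsed with nlohmann::json, and mapped to DuckDB types.
    auto columns = duckdb::BigQueryUtils::ParseColumnJSONResponse(response);
    // columns[0] describes the "id" column with DuckDB type BIGINT.
}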