From 20e4c10dc8134bd86a5cd983ff0dadd764f1edb7 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Tue, 9 Apr 2024 19:20:29 +0200 Subject: [PATCH] Cleanup branch --- include/quack/quack.h | 6 +- include/quack/quack.hpp | 61 --- .../{quack_scan.hpp => quack_heap_scan.hpp} | 47 +- include/quack/quack_select.h | 7 + include/quack/quack_types.hpp | 14 + sql/quack--0.0.1.sql | 20 +- src/CMakeLists.txt | 12 +- src/quack.c | 21 + src/quack.cpp | 75 ---- src/{quack_scan.cpp => quack_heap_scan.cpp} | 178 +++----- src/quack_hooks.c | 33 ++ src/quack_hooks.cpp | 161 ------- src/quack_internal.cpp | 6 - src/quack_select.cpp | 107 +++++ src/quack_tableam.cpp | 404 ------------------ src/quack_types.cpp | 154 +++++++ src/quack_utility.cpp | 190 -------- src/quack_write_manager.cpp | 134 ------ 18 files changed, 432 insertions(+), 1198 deletions(-) delete mode 100644 include/quack/quack.hpp rename include/quack/{quack_scan.hpp => quack_heap_scan.hpp} (56%) create mode 100644 include/quack/quack_select.h create mode 100644 include/quack/quack_types.hpp create mode 100644 src/quack.c delete mode 100644 src/quack.cpp rename src/{quack_scan.cpp => quack_heap_scan.cpp} (54%) create mode 100644 src/quack_hooks.c delete mode 100644 src/quack_hooks.cpp delete mode 100644 src/quack_internal.cpp create mode 100644 src/quack_select.cpp delete mode 100644 src/quack_tableam.cpp create mode 100644 src/quack_types.cpp delete mode 100644 src/quack_utility.cpp delete mode 100644 src/quack_write_manager.cpp diff --git a/include/quack/quack.h b/include/quack/quack.h index 8be0d64..567bc5a 100644 --- a/include/quack/quack.h +++ b/include/quack/quack.h @@ -1,6 +1,8 @@ #pragma once +// quack.c void _PG_init(void); +void _PG_fini(void); -// quack_internal.cpp -const char * quack_duckdb_version(); \ No newline at end of file +// quack_hooks.c +extern void quack_init_hooks(void); \ No newline at end of file diff --git a/include/quack/quack.hpp b/include/quack/quack.hpp deleted file mode 100644 index 34b3fb1..0000000 --- a/include/quack/quack.hpp +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include "duckdb/main/connection.hpp" -#include "duckdb/main/database.hpp" -#include "duckdb/main/appender.hpp" -#include "duckdb/common/types/data_chunk.hpp" - -namespace duckdb { -struct QuackWriteState; -} // namespace duckdb - -extern "C" { - -#include "postgres.h" - -#include "storage/relfilelocator.h" -#include "access/tupdesc.h" -#include "access/tableam.h" -#include "utils/rel.h" -#include "common/relpath.h" - -/* Quack GUC */ -extern char *quack_data_dir; - -// quack_hooks.c -extern void quack_init_hooks(void); - -// quack_tableam.c -extern void quack_init_tableam(void); -const TableAmRoutine *quack_get_table_am_routine(void); - -extern const char *quack_duckdb_type(Oid columnOid); - -// quack_write_manager.c -extern duckdb::QuackWriteState *quack_init_write_state(Relation relation, Oid databaseOid, - SubTransactionId currentSubXid); -extern void quack_flush_write_state(SubTransactionId currentSubXid, SubTransactionId parentSubXid, bool commit); -// quack.c -void _PG_init(void); -} - -namespace duckdb { - -extern void quack_translate_value(TupleTableSlot *slot, Value &value, idx_t col); - -// quack_utility.c -extern void quack_execute_query(const char *query); -extern unique_ptr quack_open_database(Oid databaseOid, bool preserveInsertOrder); -extern unique_ptr quack_open_connection(DuckDB database); -extern void quack_append_value(Appender &appender, Oid columnOid, Datum value); -extern unique_ptr quack_create_appender(Connection &connection, const char *tableName); - -typedef struct QuackWriteState { - RelFileNumber rel_node; - unique_ptr database; - unique_ptr connection; - unique_ptr appender; - uint16 row_count; -} QuackWriteState; - -} // namespace duckdb diff --git a/include/quack/quack_scan.hpp b/include/quack/quack_heap_scan.hpp similarity index 56% rename from include/quack/quack_scan.hpp rename to include/quack/quack_heap_scan.hpp index 0353671..f564b99 100644 --- a/include/quack/quack_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -1,12 +1,13 @@ #pragma once #include "duckdb.hpp" -#include "postgres.h" -#include "miscadmin.h" extern "C" { +#include "postgres.h" +#include "miscadmin.h" #include "executor/executor.h" -} // extern "C" +#include "access/relscan.h" +} // Postgres Relation @@ -27,11 +28,11 @@ class PostgresRelation { Relation rel = nullptr; }; -namespace duckdb { +namespace quack { // Local State -struct PostgresScanLocalState : public LocalTableFunctionState { +struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { public: PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot); ~PostgresScanLocalState() override; @@ -44,15 +45,14 @@ struct PostgresScanLocalState : public LocalTableFunctionState { // Global State -struct PostgresScanGlobalState : public GlobalTableFunctionState { +struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { explicit PostgresScanGlobalState(); - std::mutex lock; }; // Bind Data -struct PostgresScanFunctionData : public TableFunctionData { +struct PostgresScanFunctionData : public duckdb::TableFunctionData { public: PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot); ~PostgresScanFunctionData() override; @@ -64,22 +64,26 @@ struct PostgresScanFunctionData : public TableFunctionData { // ------- Table Function ------- -struct PostgresScanFunction : public TableFunction { +struct PostgresScanFunction : public duckdb::TableFunction { public: PostgresScanFunction(); public: - static unique_ptr PostgresBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names); - static unique_ptr PostgresInitGlobal(ClientContext &context, - TableFunctionInitInput &input); - static unique_ptr - PostgresInitLocal(ExecutionContext &context, TableFunctionInitInput &input, GlobalTableFunctionState *gstate); + static duckdb::unique_ptr PostgresBind(duckdb::ClientContext &context, + duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names); + static duckdb::unique_ptr + PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + static duckdb::unique_ptr + PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate); // static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p); // static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); - static void PostgresFunc(ClientContext &context, TableFunctionInput &data_p, DataChunk &output); + static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void @@ -87,9 +91,7 @@ struct PostgresScanFunction : public TableFunction { // &function); }; -// ------- Replacement Scan ------- - -struct PostgresReplacementScanData : public ReplacementScanData { +struct PostgresReplacementScanData : public duckdb::ReplacementScanData { public: PostgresReplacementScanData(QueryDesc *desc); ~PostgresReplacementScanData() override; @@ -98,7 +100,8 @@ struct PostgresReplacementScanData : public ReplacementScanData { QueryDesc *desc; }; -unique_ptr PostgresReplacementScan(ClientContext &context, const string &table_name, - ReplacementScanData *data); +duckdb::unique_ptr PostgresReplacementScan(duckdb::ClientContext &context, + const duckdb::string &table_name, + duckdb::ReplacementScanData *data); -} // namespace duckdb +} // namespace quack diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h new file mode 100644 index 0000000..f8bda41 --- /dev/null +++ b/include/quack/quack_select.h @@ -0,0 +1,7 @@ +#pragma once + +#include "postgres.h" + +#include "executor/executor.h" + +bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count); \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp new file mode 100644 index 0000000..e32c415 --- /dev/null +++ b/include/quack/quack_types.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "executor/tuptable.h" +} + +namespace quack { +duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); +void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); +void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); +} // namespace quack \ No newline at end of file diff --git a/sql/quack--0.0.1.sql b/sql/quack--0.0.1.sql index 00715ee..b052599 100644 --- a/sql/quack--0.0.1.sql +++ b/sql/quack--0.0.1.sql @@ -1,10 +1,16 @@ CREATE SCHEMA quack; +SET search_path TO quack; -CREATE OR REPLACE FUNCTION quack.quack_am_handler(internal) - RETURNS table_am_handler - LANGUAGE C -AS 'MODULE_PATHNAME', 'quack_am_handler'; +CREATE OR REPLACE FUNCTION public.quack_setup_heap_replacement_scan(replacement_scan_vector INT8, query_desc INT8) + RETURNS VOID LANGUAGE C AS 'MODULE_PATHNAME'; -CREATE ACCESS METHOD quack - TYPE TABLE - HANDLER quack.quack_am_handler; \ No newline at end of file +CREATE TABLE replacement_scan_handler( + oid Oid NOT NULL, + name TEXT NOT NULL, + replacement_scan_cb TEXT NOT NULL +) WITH (user_catalog_table = true); + +INSERT INTO replacement_scan_handler + SELECT oid, amname, 'quack_setup_heap_replacement_scan' FROM pg_am WHERE amname = 'heap'; + +RESET search_path; \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3da930b..03bd9f1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,9 +1,7 @@ -ADD_LIBRARY(extension OBJECT quack_internal.cpp - quack_hooks.cpp - quack_scan.cpp - quack_tableam.cpp - quack_utility.cpp - quack_write_manager.cpp - quack.cpp) +ADD_LIBRARY(extension OBJECT quack.c + quack_hooks.c + quack_select.cpp + quack_heap_scan.cpp + quack_types.cpp) set(PROJECT_OBJECTS ${PROJECT_OBJECTS} "$" PARENT_SCOPE) diff --git a/src/quack.c b/src/quack.c new file mode 100644 index 0000000..c7d344e --- /dev/null +++ b/src/quack.c @@ -0,0 +1,21 @@ +#include "postgres.h" + +#include "utils/guc.h" + +#include "quack/quack.h" + +PG_MODULE_MAGIC; + +static void quack_init_guc(void); + +void +_PG_init(void) { + quack_init_guc(); + quack_init_hooks(); +} + +/* clang-format off */ +static void +quack_init_guc(void) { + +} \ No newline at end of file diff --git a/src/quack.cpp b/src/quack.cpp deleted file mode 100644 index cbe9ac5..0000000 --- a/src/quack.cpp +++ /dev/null @@ -1,75 +0,0 @@ -#include "quack/quack.hpp" -#include "duckdb/main/connection.hpp" - -extern "C" { - -#include -#include -#include -#include - -#include "postgres.h" - -#include "miscadmin.h" - -#include "access/amapi.h" -#include "commands/vacuum.h" -#include "utils/guc.h" - -PG_MODULE_MAGIC; - -char *quack_data_dir = NULL; - -static bool quack_check_data_directory(const char *dataDirectory); - -static void -quack_data_directory_assign_hook(const char *newval, void *extra) { - if (!quack_check_data_directory(newval)) { - if (mkdir(newval, S_IRWXU | S_IRWXG | S_IRWXO) == -1) { - int error = errno; - elog(ERROR, "Creating quack.data_dir failed with reason `%s`\n", strerror(error)); - } - elog(INFO, "Created %s as `quack.data_dir`", newval); - }; -} - -void -_PG_init(void) { - StringInfo quack_default_data_dir = makeStringInfo(); - appendStringInfo(quack_default_data_dir, "%s/quack/", DataDir); - - elog(INFO, "Initializing quack extension"); - DefineCustomStringVariable("quack.data_dir", gettext_noop("Quack storage data directory."), NULL, &quack_data_dir, - quack_default_data_dir->data, PGC_USERSET, GUC_IS_NAME, NULL, - quack_data_directory_assign_hook, NULL); - - quack_init_tableam(); - quack_init_hooks(); -} - -bool -quack_check_data_directory(const char *dataDirectory) { - struct stat info; - - if (lstat(dataDirectory, &info) != 0) { - if (errno == ENOENT) { - elog(WARNING, "Directory `%s` doesn't exists.", dataDirectory); - return false; - } else if (errno == EACCES) { - elog(ERROR, "Can't access `%s` directory.", dataDirectory); - } else { - elog(ERROR, "Other error when reading `%s`.", dataDirectory); - } - } - - if (!S_ISDIR(info.st_mode)) { - elog(WARNING, "`%s` is not directory.", dataDirectory); - } - - if (access(dataDirectory, R_OK | W_OK)) { - elog(ERROR, "Directory `%s` permission problem.", dataDirectory); - } - - return true; -} -} diff --git a/src/quack_scan.cpp b/src/quack_heap_scan.cpp similarity index 54% rename from src/quack_scan.cpp rename to src/quack_heap_scan.cpp index 646cf49..3cee184 100644 --- a/src/quack_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -7,11 +7,7 @@ #include "duckdb/parser/expression/columnref_expression.hpp" #include "duckdb/common/enums/expression_type.hpp" -#include "quack/quack.hpp" -#include "quack/quack_scan.hpp" - extern "C" { - #include "postgres.h" #include "miscadmin.h" @@ -25,6 +21,9 @@ extern "C" { #include "utils/builtins.h" } +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + // Postgres Relation PostgresRelation::PostgresRelation(RangeTblEntry *table) : rel(RelationIdGetRelation(table->relid)) { @@ -50,14 +49,14 @@ PostgresRelation::PostgresRelation(PostgresRelation &&other) : rel(other.rel) { other.rel = nullptr; } -namespace duckdb { +namespace quack { // ------- Table Function ------- PostgresScanFunction::PostgresScanFunction() - : TableFunction("postgres_scan", {}, PostgresFunc, PostgresBind, PostgresInitGlobal, PostgresInitLocal) { - named_parameters["table"] = LogicalType::POINTER; - named_parameters["snapshot"] = LogicalType::POINTER; + : TableFunction("quack_postgres_scan", {}, PostgresFunc, PostgresBind, PostgresInitGlobal, PostgresInitLocal) { + named_parameters["table"] = duckdb::LogicalType::POINTER; + named_parameters["snapshot"] = duckdb::LogicalType::POINTER; } // Bind Data @@ -69,35 +68,10 @@ PostgresScanFunctionData::PostgresScanFunctionData(PostgresRelation &&relation, PostgresScanFunctionData::~PostgresScanFunctionData() { } -static LogicalType -PostgresToDuck(Oid type) { - switch (type) { - case BOOLOID: - return LogicalTypeId::BOOLEAN; - case CHAROID: - return LogicalTypeId::TINYINT; - case INT2OID: - return LogicalTypeId::SMALLINT; - case INT4OID: - return LogicalTypeId::INTEGER; - case INT8OID: - return LogicalTypeId::BIGINT; - case BPCHAROID: - case TEXTOID: - case VARCHAROID: - return LogicalTypeId::VARCHAR; - case DATEOID: - return LogicalTypeId::DATE; - case TIMESTAMPOID: - return LogicalTypeId::TIMESTAMP; - default: - elog(ERROR, "Unsupported quack type: %d", type); - } -} - -unique_ptr -PostgresScanFunction::PostgresBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { +duckdb::unique_ptr +PostgresScanFunction::PostgresBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names) { auto table = (reinterpret_cast(input.named_parameters["table"].GetPointer())); auto snapshot = (reinterpret_cast(input.named_parameters["snapshot"].GetPointer())); @@ -115,11 +89,10 @@ PostgresScanFunction::PostgresBind(ClientContext &context, TableFunctionBindInpu for (idx_t i = 0; i < column_count; i++) { Form_pg_attribute attr = &tupleDesc->attrs[i]; Oid type_oid = attr->atttypid; - auto col_name = string(NameStr(attr->attname)); - auto duck_type = PostgresToDuck(type_oid); + auto col_name = duckdb::string(NameStr(attr->attname)); + auto duck_type = ConvertPostgresToDuckColumnType(type_oid); return_types.push_back(duck_type); names.push_back(col_name); - /* Log column name and type */ elog(INFO, "Column name: %s, Type: %s", col_name.c_str(), duck_type.ToString().c_str()); } @@ -129,7 +102,7 @@ PostgresScanFunction::PostgresBind(ClientContext &context, TableFunctionBindInpu // These are the methods we need to interact with the table auto access_method_handler = GetTableAmRoutine(rel.GetRelation()->rd_amhandler); - return make_uniq(std::move(rel), snapshot); + return duckdb::make_uniq(std::move(rel), snapshot); } // Global State @@ -137,12 +110,12 @@ PostgresScanFunction::PostgresBind(ClientContext &context, TableFunctionBindInpu PostgresScanGlobalState::PostgresScanGlobalState() { } -unique_ptr -PostgresScanFunction::PostgresInitGlobal(ClientContext &context, TableFunctionInitInput &input) { +duckdb::unique_ptr +PostgresScanFunction::PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { auto &bind_data = input.bind_data->Cast(); (void)bind_data; // FIXME: we'll call 'parallelscan_initialize' here to initialize a parallel scan - return make_uniq(); + return duckdb::make_uniq(); } // Local State @@ -160,85 +133,32 @@ PostgresScanLocalState::~PostgresScanLocalState() { tableam->scan_end(scanDesc); } -unique_ptr -PostgresScanFunction::PostgresInitLocal(ExecutionContext &context, TableFunctionInitInput &input, - GlobalTableFunctionState *gstate) { +duckdb::unique_ptr +PostgresScanFunction::PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate) { auto &bind_data = input.bind_data->CastNoConst(); auto &relation = bind_data.relation; auto snapshot = bind_data.snapshot; - return make_uniq(relation, snapshot); -} - -template -static void -Append(Vector &result, T value, idx_t offset) { - auto data = FlatVector::GetData(result); - data[offset] = value; -} - -static void -AppendString(Vector &result, Datum value, idx_t offset) { - const char *text = VARDATA_ANY(value); - int len = VARSIZE_ANY_EXHDR(value); - string_t str(text, len); - - auto data = FlatVector::GetData(result); - data[offset] = StringVector::AddString(result, str); -} - -// The table scan function -static void -ConvertDatumToDuckDB(Datum value, Vector &result, idx_t offset) { - constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; - constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; - - switch (result.GetType().id()) { - case LogicalTypeId::BOOLEAN: - Append(result, DatumGetBool(value), offset); - break; - case LogicalTypeId::TINYINT: - Append(result, DatumGetChar(value), offset); - break; - case LogicalTypeId::SMALLINT: - Append(result, DatumGetInt16(value), offset); - break; - case LogicalTypeId::INTEGER: - Append(result, DatumGetInt32(value), offset); - break; - case LogicalTypeId::BIGINT: - Append(result, DatumGetInt64(value), offset); - break; - case LogicalTypeId::VARCHAR: - AppendString(result, value, offset); - break; - case LogicalTypeId::DATE: - Append(result, date_t(static_cast(value + QUACK_DUCK_DATE_OFFSET)), offset); - break; - case LogicalTypeId::TIMESTAMP: - Append(result, dtime_t(static_cast(value + QUACK_DUCK_TIMESTAMP_OFFSET)), offset); - break; - default: - elog(ERROR, "Unsupported quack type: %hhu", result.GetType().id()); - break; - } + return duckdb::make_uniq(relation, snapshot); } static void -InsertTupleIntoChunk(DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset) { +InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset) { for (int i = 0; i < tuple->natts; i++) { auto &result = output.data[i]; Datum value = slot_getattr(slot, i + 1, &slot->tts_isnull[i]); if (slot->tts_isnull[i]) { - auto &array_mask = FlatVector::Validity(result); + auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(offset); } else { - ConvertDatumToDuckDB(value, result, offset); + ConvertPostgresToDuckValue(value, result, offset); } } } void -PostgresScanFunction::PostgresFunc(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { +PostgresScanFunction::PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output) { auto &data = data_p.bind_data->CastNoConst(); auto &lstate = data_p.local_state->Cast(); auto &gstate = data_p.global_state->Cast(); @@ -267,7 +187,6 @@ PostgresScanFunction::PostgresFunc(ClientContext &context, TableFunctionInput &d output.SetCardinality(count); } -// ------- Replacement Scan ------- PostgresReplacementScanData::PostgresReplacementScanData(QueryDesc *desc) : desc(desc) { } @@ -275,7 +194,7 @@ PostgresReplacementScanData::~PostgresReplacementScanData() { } static RangeTblEntry * -FindMatchingRelation(List *tables, const string &to_find) { +FindMatchingRelation(List *tables, const duckdb::string &to_find) { ListCell *lc; foreach (lc, tables) { RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); @@ -289,7 +208,7 @@ FindMatchingRelation(List *tables, const string &to_find) { char *relName = RelationGetRelationName(rel); auto table_name = std::string(relName); - if (StringUtil::CIEquals(table_name, to_find)) { + if (duckdb::StringUtil::CIEquals(table_name, to_find)) { if (!rel->rd_amhandler) { // This doesn't have an access method handler, we cant read from this RelationClose(rel); @@ -304,35 +223,36 @@ FindMatchingRelation(List *tables, const string &to_find) { return nullptr; } -unique_ptr -PostgresReplacementScan(ClientContext &context, const string &table_name, ReplacementScanData *data) { +duckdb::unique_ptr +PostgresReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name, + duckdb::ReplacementScanData *data) { + auto &scan_data = reinterpret_cast(*data); auto tables = scan_data.desc->plannedstmt->rtable; auto table = FindMatchingRelation(tables, table_name); + if (!table) { - elog(ERROR, "Failed to find table %s in replacement scan lookup", table_name.c_str()); + elog(WARNING, "Failed to find table %s in replacement scan lookup", table_name.c_str()); return nullptr; } // Then inside the table function we can scan tuples from the postgres table and convert them into duckdb vectors. - auto table_function = make_uniq(); - vector> children; - - // This is done so they are received as named parameters to the function - // just so we don't have to use indices to refer to them later - // see PostgresBind for this - - // table = POINTER(table) - children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("table"), - make_uniq(Value::POINTER(CastPointerToValue(table))))); - // snapshot = POINTER(snapshot) - children.push_back(make_uniq( - ExpressionType::COMPARE_EQUAL, make_uniq("snapshot"), - make_uniq(Value::POINTER(CastPointerToValue(scan_data.desc->estate->es_snapshot))))); - table_function->function = make_uniq("postgres_scan", std::move(children)); + auto table_function = duckdb::make_uniq(); + duckdb::vector> children; + + children.push_back(duckdb::make_uniq( + duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("table"), + duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(table))))); + + children.push_back(duckdb::make_uniq( + duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("snapshot"), + duckdb::make_uniq( + duckdb::Value::POINTER(duckdb::CastPointerToValue(scan_data.desc->estate->es_snapshot))))); + + table_function->function = duckdb::make_uniq("quack_postgres_scan", std::move(children)); + return std::move(table_function); } -} // namespace duckdb +} // namespace quack diff --git a/src/quack_hooks.c b/src/quack_hooks.c new file mode 100644 index 0000000..7f34b86 --- /dev/null +++ b/src/quack_hooks.c @@ -0,0 +1,33 @@ + + +#include "postgres.h" +#include "commands/extension.h" + +#include "quack/quack.h" +#include "quack/quack_select.h" + +static ExecutorRun_hook_type PrevExecutorRunHook = NULL; + +static bool +is_quack_extension_registered() { + return get_extension_oid("quack", true) != InvalidOid; +} + +static void +quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { + if (is_quack_extension_registered() && queryDesc->operation == CMD_SELECT) { + if (quack_execute_select(queryDesc, direction, count)) { + return; + } + } + + if (PrevExecutorRunHook) { + PrevExecutorRunHook(queryDesc, direction, count, execute_once); + } +} + +void +quack_init_hooks(void) { + PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun; + ExecutorRun_hook = quack_executor_run; +} \ No newline at end of file diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp deleted file mode 100644 index 4f4a764..0000000 --- a/src/quack_hooks.cpp +++ /dev/null @@ -1,161 +0,0 @@ -#include "quack/quack.hpp" -#include "quack/quack_scan.hpp" -#include "duckdb/parser/parsed_data/create_table_function_info.hpp" - -extern "C" { - -#include "postgres.h" - -#include "miscadmin.h" - -#include "access/tableam.h" -#include "executor/executor.h" -#include "parser/parse_type.h" -#include "tcop/utility.h" -#include "catalog/pg_type.h" -#include "utils/syscache.h" -#include "utils/builtins.h" - -} // extern "C" - -namespace duckdb { -static void QuackExecuteSelect(QueryDesc *query_desc, ScanDirection direction, uint64_t count); -} // namespace duckdb - -extern "C" { - -static ExecutorRun_hook_type PrevExecutorRunHook = NULL; -static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; - -static void -quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (queryDesc->operation == CMD_SELECT) { - duckdb::QuackExecuteSelect(queryDesc, direction, count); - return; - } - - if (PrevExecutorRunHook) { - PrevExecutorRunHook(queryDesc, direction, count, execute_once); - } -} - -static void -quack_process_utility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, - ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, - QueryCompletion *completionTag) { - - Node *parsetree = pstmt->utilityStmt; - - if (IsA(parsetree, CreateStmt)) { - CreateStmt *create_stmt = (CreateStmt *)parsetree; - ListCell *lc; - - if (create_stmt->accessMethod && !memcmp(create_stmt->accessMethod, "quack", 5)) { - StringInfo create_table_str = makeStringInfo(); - bool first = true; - appendStringInfo(create_table_str, "CREATE TABLE %s (", create_stmt->relation->relname); - foreach (lc, create_stmt->tableElts) { - ColumnDef *def = (ColumnDef *)lfirst(lc); - Oid pg_oid = LookupTypeNameOid(NULL, def->typeName, true); - - if (first) { - first = false; - } else { - appendStringInfo(create_table_str, ", "); - } - - appendStringInfo(create_table_str, "%s %s", def->colname, quack_duckdb_type(pg_oid)); - } - appendStringInfo(create_table_str, ");"); - - duckdb::quack_execute_query(create_table_str->data); - } - } - - PrevProcessUtilityHook(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); -} - -void -quack_init_hooks(void) { - PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun; - ExecutorRun_hook = quack_executor_run; - - PrevProcessUtilityHook = ProcessUtility_hook ? ProcessUtility_hook : standard_ProcessUtility; - ProcessUtility_hook = quack_process_utility; -} -} - -namespace duckdb { - -static void -QuackExecuteSelect(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { - auto db = quack_open_database(MyDatabaseId, false); - db->instance->config.replacement_scans.emplace_back( - PostgresReplacementScan, make_uniq_base(query_desc)); - auto connection = quack_open_connection(*db); - - // Add the postgres_scan inserted by the replacement scan - auto &context = *connection->context; - PostgresScanFunction scan_fun; - CreateTableFunctionInfo scan_info(scan_fun); - - auto &catalog = Catalog::GetSystemCatalog(context); - context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &scan_info); - context.transaction.Commit(); - - idx_t column_count; - - CmdType operation; - DestReceiver *dest; - - TupleTableSlot *slot = NULL; - - // FIXME: try-catch ? - auto res = connection->Query(query_desc->sourceText); - if (res->HasError()) { - } - - operation = query_desc->operation; - dest = query_desc->dest; - - dest->rStartup(dest, operation, query_desc->tupDesc); - - slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple); - column_count = res->ColumnCount(); - - while (true) { - - auto chunk = res->Fetch(); - if (!chunk || chunk->size() == 0) { - break; - } - - for (idx_t row = 0; row < chunk->size(); row++) { - ExecClearTuple(slot); - - for (idx_t col = 0; col < column_count; col++) { - auto value = chunk->GetValue(col, row); - if (value.IsNull()) { - slot->tts_isnull[col] = true; - } else { - slot->tts_isnull[col] = false; - quack_translate_value(slot, value, col); - } - } - ExecStoreVirtualTuple(slot); - dest->receiveSlot(slot, dest); - - for (idx_t i = 0; i < column_count; i++) { - if (slot->tts_tupleDescriptor->attrs[i].attbyval == false) { - pfree(DatumGetPointer(slot->tts_values[i])); - } - } - } - } - - dest->rShutdown(dest); - return; -} - -} // namespace duckdb diff --git a/src/quack_internal.cpp b/src/quack_internal.cpp deleted file mode 100644 index 5ff0a6a..0000000 --- a/src/quack_internal.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#include "duckdb.hpp" - -extern "C" const char * -quack_duckdb_version() { - return duckdb::DuckDB::LibraryVersion(); -} \ No newline at end of file diff --git a/src/quack_select.cpp b/src/quack_select.cpp new file mode 100644 index 0000000..b6f0e36 --- /dev/null +++ b/src/quack_select.cpp @@ -0,0 +1,107 @@ +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "fmgr.h" + +#include "access/genam.h" +#include "access/table.h" +#include "catalog/namespace.h" +#include "catalog/pg_proc.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/rel.h" + +#include "quack/quack_select.h" +} + +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + +namespace quack { + +static duckdb::unique_ptr +quack_open_database() { + duckdb::DBConfig config; + return duckdb::make_uniq(nullptr, &config); +} + +} // namespace quack + +extern "C" bool +quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { + auto db = quack::quack_open_database(); + + db->instance->config.replacement_scans.emplace_back( + quack::PostgresReplacementScan, + duckdb::make_uniq_base(query_desc)); + auto connection = duckdb::make_uniq(*db); + + // Add the postgres_scan inserted by the replacement scan + auto &context = *connection->context; + quack::PostgresScanFunction scan_fun; + duckdb::CreateTableFunctionInfo scan_info_info(scan_fun); + + auto &catalog = duckdb::Catalog::GetSystemCatalog(context); + context.transaction.BeginTransaction(); + catalog.CreateTableFunction(context, &scan_info_info); + context.transaction.Commit(); + + idx_t column_count; + + CmdType operation; + DestReceiver *dest; + + TupleTableSlot *slot = NULL; + + // FIXME: try-catch ? + auto res = connection->Query(query_desc->sourceText); + if (res->HasError()) { + return false; + } + + operation = query_desc->operation; + dest = query_desc->dest; + + dest->rStartup(dest, operation, query_desc->tupDesc); + + slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple); + column_count = res->ColumnCount(); + + while (true) { + + auto chunk = res->Fetch(); + + if (!chunk || chunk->size() == 0) { + break; + } + + for (idx_t row = 0; row < chunk->size(); row++) { + ExecClearTuple(slot); + + for (idx_t col = 0; col < column_count; col++) { + auto value = chunk->GetValue(col, row); + if (value.IsNull()) { + slot->tts_isnull[col] = true; + } else { + slot->tts_isnull[col] = false; + quack::ConvertDuckToPostgresValue(slot, value, col); + } + } + + ExecStoreVirtualTuple(slot); + + dest->receiveSlot(slot, dest); + + for (idx_t i = 0; i < column_count; i++) { + if (slot->tts_tupleDescriptor->attrs[i].attbyval == false) { + pfree(DatumGetPointer(slot->tts_values[i])); + } + } + } + } + + dest->rShutdown(dest); + return true; +} \ No newline at end of file diff --git a/src/quack_tableam.cpp b/src/quack_tableam.cpp deleted file mode 100644 index 9ce1dc6..0000000 --- a/src/quack_tableam.cpp +++ /dev/null @@ -1,404 +0,0 @@ -#include "duckdb.hpp" -#include "quack/quack.hpp" - -extern "C" { - -#include "postgres.h" - -#include "access/detoast.h" -#include "access/relation.h" -#include "access/tableam.h" -#include "access/multixact.h" -#include "access/table.h" -#include "catalog/index.h" -#include "catalog/objectaccess.h" -#include "commands/vacuum.h" -#include "pgstat.h" -#include "nodes/execnodes.h" -#include "nodes/nodes.h" -#include "storage/lmgr.h" -#include "utils/memutils.h" -#include "utils/snapmgr.h" -#include "utils/relmapper.h" - -static const TupleTableSlotOps * -quack_slot_callbacks(Relation rel) { - return &TTSOpsVirtual; -} - -static TableScanDesc -quack_begin_scan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key, - ParallelTableScanDesc parallel_scan, uint32 flags) { - TableScanDesc scan = (TableScanDesc)palloc0(sizeof(TableScanDesc)); - - scan->rs_rd = rel; - scan->rs_snapshot = snapshot; - scan->rs_nkeys = 0; - scan->rs_key = key; - scan->rs_flags = flags; - scan->rs_parallel = parallel_scan; - - return scan; -} - -static void -quack_end_scan(TableScanDesc scan) { -} - -static void -quack_rescan(TableScanDesc scan, struct ScanKeyData *key, bool set_params, bool allow_strat, bool allow_sync, - bool allow_pagemode) { - ereport(ERROR, (errmsg("quack_scan_rescan is not implemented"))); -} - -static bool -quack_getnextslot(TableScanDesc scan, ScanDirection direction, TupleTableSlot *slot) { - return false; -} - -static Size -quack_parallelscan_estimate(Relation rel) { - elog(ERROR, "quack_parallelscan_estimate not implemented"); -} - -static Size -quack_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) { - elog(ERROR, "quack_parallelscan_initialize not implemented"); -} - -static void -quack_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) { - elog(ERROR, "quack_parallelscan_reinitialize not implemented"); -} - -static IndexFetchTableData * -quack_index_fetch_begin(Relation rel) { - elog(ERROR, "quack_index_fetch_begin not implemented"); -} - -static void -quack_index_fetch_reset(IndexFetchTableData *sscan) { - elog(ERROR, "quack_index_fetch_reset not implemented"); -} - -static void -quack_index_fetch_end(IndexFetchTableData *sscan) { - elog(ERROR, "quack_index_fetch_end not implemented"); -} - -static bool -quack_index_fetch_tuple(struct IndexFetchTableData *sscan, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, - bool *call_again, bool *all_dead) { - elog(ERROR, "quack_index_fetch_tuple not implemented"); -} - -static bool -quack_fetch_row_version(Relation relation, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot) { - elog(ERROR, "quack_fetch_row_version not implemented"); -} - -static void -quack_get_latest_tid(TableScanDesc sscan, ItemPointer tid) { - elog(ERROR, "quack_get_latest_tid not implemented"); -} - -static bool -quack_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) { - elog(ERROR, "quack_tuple_tid_valid not implemented"); -} - -static bool -quack_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snapshot) { - elog(ERROR, "quack_tuple_satisfies_snapshot not implemented"); -} - -static TransactionId -quack_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate) { - elog(ERROR, "quack_index_delete_tuples not implemented"); -} - -static void -quack_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, - struct BulkInsertStateData *bistate) { - duckdb::QuackWriteState *write_state = quack_init_write_state(rel, MyDatabaseId, GetCurrentSubTransactionId()); - - slot_getallattrs(slot); - - for (int i = 0; i < slot->tts_nvalid; i++) { - if (slot->tts_isnull[i]) { - write_state->appender->Append(nullptr); - } else { - duckdb::quack_append_value(*write_state->appender, slot->tts_tupleDescriptor->attrs[i].atttypid, - slot->tts_values[i]); - } - } - - write_state->appender->EndRow(); - - pgstat_count_heap_insert(rel, 1); -} - -static void -quack_tuple_insert_speculative(Relation rel, TupleTableSlot *slot, CommandId cid, int options, - struct BulkInsertStateData *bistate, uint32 specToken) { - ereport(ERROR, (errmsg("quack_tuple_insert_speculative is not implemented"))); -} - -static void -quack_tuple_complete_speculative(Relation rel, TupleTableSlot *slot, uint32 specToken, bool succeeded) { - ereport(ERROR, (errmsg("quack_tuple_complete_speculative is not implemented"))); -} - -static void -quack_multi_insert(Relation rel, TupleTableSlot **slots, int ntuples, CommandId cid, int options, - struct BulkInsertStateData *bistate) { - duckdb::QuackWriteState *write_state = quack_init_write_state(rel, MyDatabaseId, GetCurrentSubTransactionId()); - - for (int n = 0; n < ntuples; n++) { - TupleTableSlot *slot = slots[n]; - - slot_getallattrs(slot); - - for (int i = 0; i < slot->tts_nvalid; i++) { - if (slot->tts_isnull[i]) { - write_state->appender->Append(nullptr); - } else { - duckdb::quack_append_value(*write_state->appender, slot->tts_tupleDescriptor->attrs[i].atttypid, - slot->tts_values[i]); - } - } - - write_state->appender->EndRow(); - } - - pgstat_count_heap_insert(rel, ntuples); -} - -static TM_Result -quack_tuple_delete(Relation rel, ItemPointer tip, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) { - ereport(ERROR, (errmsg("quack_tuple_delete is not implemented"))); -} - -static TM_Result -quack_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, - Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - TU_UpdateIndexes *update_indexes) { - ereport(ERROR, (errmsg("quack_tuple_update is not implemented"))); -} - -static TM_Result -quack_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, - LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd) { - ereport(ERROR, (errmsg("quack_tuple_lock is not implemented"))); -} - -static void -quack_finish_bulk_insert(Relation rel, int options) { -} - -static void -quack_relation_set_new_filenode(Relation rel, const RelFileNumber newrnode, char persistence, TransactionId *freezeXid, - MultiXactId *minmulti) { - - if (persistence == RELPERSISTENCE_UNLOGGED) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Unlogged columnar tables are not supported"))); - } - - /* - * If existing and new relfilenode are different, that means the existing - * storage was dropped and we also need to clean up the metadata and - * state. If they are equal, this is a new relation object and we don't - * need to clean anything. - */ - if (RelationMapOidToFilenumber(rel->rd_id, false) != newrnode) { - // acropolis_drop_table(rel); - } -} - -static void -quack_relation_nontransactional_truncate(Relation rel) { - elog(ERROR, "quack_relation_nontransactional_truncate not implemented"); -} - -static void -quack_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) { - elog(ERROR, "quack_relation_copy_data not implemented"); -} - -static void -quack_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, - TransactionId OldestXmin, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, - double *num_tuples, double *tups_vacuumed, double *tups_recently_dead) { - elog(ERROR, "quack_relation_copy_for_cluster not implemented"); -} - -static void -quack_vacuum_rel(Relation rel, VacuumParams *params, BufferAccessStrategy bstrategy) { -} - -static bool -quack_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, BufferAccessStrategy bstrategy) { - elog(ERROR, "quack_scan_analyze_next_block not implemented"); -} - -static bool -quack_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, double *liverows, double *deadrows, - TupleTableSlot *slot) { - elog(ERROR, "quack_scan_analyze_next_tuple not implemented"); -} - -static double -quack_index_build_range_scan(Relation columnarRelation, Relation indexRelation, IndexInfo *indexInfo, bool allow_sync, - bool anyvisible, bool progress, BlockNumber start_blockno, BlockNumber numblocks, - IndexBuildCallback callback, void *callback_state, TableScanDesc scan) { - elog(ERROR, "quack_index_build_range_scan not implemented"); -} - -static void -quack_index_validate_scan(Relation columnarRelation, Relation indexRelation, IndexInfo *indexInfo, Snapshot snapshot, - ValidateIndexState *validateIndexState) { - elog(ERROR, "quack_index_validate_scan not implemented"); -} - -static bool -quack_relation_needs_toast_table(Relation rel) { - return false; -} - -static uint64 -quack_relation_size(Relation rel, ForkNumber forkNumber) { - return 4096 * BLCKSZ; -} - -static void -quack_estimate_rel_size(Relation rel, int32 *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac) { -} - -static bool -quack_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) { - elog(ERROR, "quack_scan_sample_next_block not implemented"); -} - -static bool -quack_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, TupleTableSlot *slot) { - elog(ERROR, "quack_scan_sample_next_tuple not implemented"); -} - -static const TableAmRoutine quack_am_methods = {.type = T_TableAmRoutine, - - .slot_callbacks = quack_slot_callbacks, - - .scan_begin = quack_begin_scan, - .scan_end = quack_end_scan, - .scan_rescan = quack_rescan, - .scan_getnextslot = quack_getnextslot, - - .parallelscan_estimate = quack_parallelscan_estimate, - .parallelscan_initialize = quack_parallelscan_initialize, - .parallelscan_reinitialize = quack_parallelscan_reinitialize, - - .index_fetch_begin = quack_index_fetch_begin, - .index_fetch_reset = quack_index_fetch_reset, - .index_fetch_end = quack_index_fetch_end, - .index_fetch_tuple = quack_index_fetch_tuple, - - .tuple_fetch_row_version = quack_fetch_row_version, - .tuple_tid_valid = quack_tuple_tid_valid, - .tuple_get_latest_tid = quack_get_latest_tid, - - .tuple_satisfies_snapshot = quack_tuple_satisfies_snapshot, - .index_delete_tuples = quack_index_delete_tuples, - - .tuple_insert = quack_tuple_insert, - .tuple_insert_speculative = quack_tuple_insert_speculative, - .tuple_complete_speculative = quack_tuple_complete_speculative, - .multi_insert = quack_multi_insert, - .tuple_delete = quack_tuple_delete, - .tuple_update = quack_tuple_update, - .tuple_lock = quack_tuple_lock, - .finish_bulk_insert = quack_finish_bulk_insert, - - .relation_nontransactional_truncate = - quack_relation_nontransactional_truncate, - .relation_copy_data = quack_relation_copy_data, - .relation_copy_for_cluster = quack_relation_copy_for_cluster, - .relation_vacuum = quack_vacuum_rel, - .scan_analyze_next_block = quack_scan_analyze_next_block, - .scan_analyze_next_tuple = quack_scan_analyze_next_tuple, - .index_build_range_scan = quack_index_build_range_scan, - .index_validate_scan = quack_index_validate_scan, - - .relation_size = quack_relation_size, - .relation_needs_toast_table = quack_relation_needs_toast_table, - - .relation_estimate_size = quack_estimate_rel_size, - - .scan_bitmap_next_block = NULL, - .scan_bitmap_next_tuple = NULL, - .scan_sample_next_block = quack_scan_sample_next_block, - .scan_sample_next_tuple = quack_scan_sample_next_tuple}; - -/* - * Implementation of the object_access_hook. - */ -static object_access_hook_type PrevObjectAccessHook = NULL; - -static void -TableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg) { - if (PrevObjectAccessHook) { - PrevObjectAccessHook(access, classId, objectId, subId, arg); - } - - /* dispatch to the proper action */ - if (access == OAT_DROP && classId == RelationRelationId && !OidIsValid(subId)) { - Relation rel; - rel = relation_open(objectId, AccessExclusiveLock); - - if (rel->rd_rel->relkind == RELKIND_RELATION && rel->rd_tableam == &quack_am_methods) { - } - relation_close(rel, NoLock); - } -} - -static void -QuackXactCallback(XactEvent event, void *arg) { - switch (event) { - case XACT_EVENT_COMMIT: - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_PREPARE: { - /* nothing to do */ - break; - } - - case XACT_EVENT_ABORT: - case XACT_EVENT_PARALLEL_ABORT: { - break; - } - - case XACT_EVENT_PRE_COMMIT: - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_PREPARE: { - quack_flush_write_state(GetCurrentSubTransactionId(), 0, true); - break; - } - } -} - -void -quack_init_tableam() { - RegisterXactCallback(QuackXactCallback, NULL); - object_access_hook = TableAMObjectAccessHook; -} - -const TableAmRoutine * -quack_get_table_am_routine(void) { - return &quack_am_methods; -} - -PG_FUNCTION_INFO_V1(quack_am_handler); -Datum -quack_am_handler(PG_FUNCTION_ARGS) { - PG_RETURN_POINTER(&quack_am_methods); -} -} diff --git a/src/quack_types.cpp b/src/quack_types.cpp new file mode 100644 index 0000000..9d52115 --- /dev/null +++ b/src/quack_types.cpp @@ -0,0 +1,154 @@ +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "catalog/pg_type.h" +#include "executor/tuptable.h" +} + +#include "quack/quack.h" + +namespace quack { + +// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 +constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; +constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; + +void +ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col) { + Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid; + + switch (oid) { + case BOOLOID: + slot->tts_values[col] = value.GetValue(); + break; + case CHAROID: + slot->tts_values[col] = value.GetValue(); + break; + case INT2OID: + slot->tts_values[col] = value.GetValue(); + break; + case INT4OID: + slot->tts_values[col] = value.GetValue(); + break; + case INT8OID: + slot->tts_values[col] = value.GetValue(); + break; + case BPCHAROID: + case TEXTOID: + case VARCHAROID: { + auto str = value.GetValue(); + auto varchar = str.c_str(); + auto varchar_len = str.size(); + + text *result = (text *)palloc0(varchar_len + VARHDRSZ); + SET_VARSIZE(result, varchar_len + VARHDRSZ); + memcpy(VARDATA(result), varchar, varchar_len); + slot->tts_values[col] = PointerGetDatum(result); + // FIXME: this doesn't need to be freed, the string_t is owned by the chunk right? + // duckdb_free(varchar); + break; + } + case DATEOID: { + duckdb::date_t date = value.GetValue(); + slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET; + break; + } + case TIMESTAMPOID: { + duckdb::dtime_t timestamp = value.GetValue(); + slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET; + break; + } + case FLOAT8OID: + case NUMERICOID: { + double result_double = value.GetValue(); + slot->tts_tupleDescriptor->attrs[col].atttypid = FLOAT8OID; + slot->tts_tupleDescriptor->attrs[col].attbyval = true; + memcpy(&slot->tts_values[col], (char *)&result_double, sizeof(double)); + break; + } + default: + elog(ERROR, "Unsuported quack type: %d", oid); + } +} + +duckdb::LogicalType +ConvertPostgresToDuckColumnType(Oid type) { + switch (type) { + case BOOLOID: + return duckdb::LogicalTypeId::BOOLEAN; + case CHAROID: + return duckdb::LogicalTypeId::TINYINT; + case INT2OID: + return duckdb::LogicalTypeId::SMALLINT; + case INT4OID: + return duckdb::LogicalTypeId::INTEGER; + case INT8OID: + return duckdb::LogicalTypeId::BIGINT; + case BPCHAROID: + case TEXTOID: + case VARCHAROID: + return duckdb::LogicalTypeId::VARCHAR; + case DATEOID: + return duckdb::LogicalTypeId::DATE; + case TIMESTAMPOID: + return duckdb::LogicalTypeId::TIMESTAMP; + default: + elog(ERROR, "Unsupported quack type: %d", type); + } +} + +template +static void +Append(duckdb::Vector &result, T value, idx_t offset) { + auto data = duckdb::FlatVector::GetData(result); + data[offset] = value; +} + +static void +AppendString(duckdb::Vector &result, Datum value, idx_t offset) { + const char *text = VARDATA_ANY(value); + int len = VARSIZE_ANY_EXHDR(value); + duckdb::string_t str(text, len); + + auto data = duckdb::FlatVector::GetData(result); + data[offset] = duckdb::StringVector::AddString(result, str); +} + +void +ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { + + switch (result.GetType().id()) { + case duckdb::LogicalTypeId::BOOLEAN: + Append(result, DatumGetBool(value), offset); + break; + case duckdb::LogicalTypeId::TINYINT: + Append(result, DatumGetChar(value), offset); + break; + case duckdb::LogicalTypeId::SMALLINT: + Append(result, DatumGetInt16(value), offset); + break; + case duckdb::LogicalTypeId::INTEGER: + Append(result, DatumGetInt32(value), offset); + break; + case duckdb::LogicalTypeId::BIGINT: + Append(result, DatumGetInt64(value), offset); + break; + case duckdb::LogicalTypeId::VARCHAR: + AppendString(result, value, offset); + break; + case duckdb::LogicalTypeId::DATE: + Append(result, duckdb::date_t(static_cast(value + QUACK_DUCK_DATE_OFFSET)), offset); + break; + case duckdb::LogicalTypeId::TIMESTAMP: + Append(result, duckdb::dtime_t(static_cast(value + QUACK_DUCK_TIMESTAMP_OFFSET)), + offset); + break; + default: + elog(ERROR, "Unsupported quack type: %hhu", result.GetType().id()); + break; + } +} + +} // namespace quack diff --git a/src/quack_utility.cpp b/src/quack_utility.cpp deleted file mode 100644 index db7f1d6..0000000 --- a/src/quack_utility.cpp +++ /dev/null @@ -1,190 +0,0 @@ -#include "duckdb.hpp" -#include "quack/quack.hpp" - -extern "C" { - -#include "postgres.h" - -#include "miscadmin.h" - -#include "catalog/pg_type.h" -#include "utils/fmgrprotos.h" - -// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 -#define QUACK_DUCK_DATE_OFFSET 10957 -#define QUACK_DUCK_TIMESTAMP_OFFSET INT64CONST(10957) * USECS_PER_DAY - -static StringInfo -quack_database_path(Oid databaseOid) { - StringInfo str = makeStringInfo(); - appendStringInfo(str, "%s/%u.duckdb", quack_data_dir, databaseOid); - return str; -} - -const char * -quack_duckdb_type(Oid columnOid) { - switch (columnOid) { - case BOOLOID: - return "BOOLEAN"; - case CHAROID: - return "TINYINT"; - case INT2OID: - return "SMALLINT"; - case INT4OID: - return "INTEGER"; - case INT8OID: - return "INT8"; - case BPCHAROID: - case TEXTOID: - case VARCHAROID: - return "TEXT"; - case DATEOID: - return "DATE"; - case TIMESTAMPOID: - return "TIMESTAMP"; - default: - elog(ERROR, "Unsuported quack type: %d", columnOid); - } -} - -} // extern "C" - -namespace duckdb { - -void -quack_translate_value(TupleTableSlot *slot, Value &value, idx_t col) { - Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid; - - switch (oid) { - case BOOLOID: - slot->tts_values[col] = value.GetValue(); - break; - case CHAROID: - slot->tts_values[col] = value.GetValue(); - break; - case INT2OID: - slot->tts_values[col] = value.GetValue(); - break; - case INT4OID: - slot->tts_values[col] = value.GetValue(); - break; - case INT8OID: - slot->tts_values[col] = value.GetValue(); - break; - case BPCHAROID: - case TEXTOID: - case VARCHAROID: { - auto str = value.GetValue(); - auto varchar = str.c_str(); - auto varchar_len = str.size(); - - text *result = (text *)palloc0(varchar_len + VARHDRSZ); - SET_VARSIZE(result, varchar_len + VARHDRSZ); - memcpy(VARDATA(result), varchar, varchar_len); - slot->tts_values[col] = PointerGetDatum(result); - // FIXME: this doesn't need to be freed, the string_t is owned by the chunk right? - // duckdb_free(varchar); - break; - } - case DATEOID: { - date_t date = value.GetValue(); - slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET; - break; - } - case TIMESTAMPOID: { - dtime_t timestamp = value.GetValue(); - slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET; - break; - } - case FLOAT8OID: - case NUMERICOID: { - double result_double = value.GetValue(); - slot->tts_tupleDescriptor->attrs[col].atttypid = FLOAT8OID; - slot->tts_tupleDescriptor->attrs[col].attbyval = true; - memcpy(&slot->tts_values[col], (char *)&result_double, sizeof(double)); - break; - } - default: - elog(ERROR, "Unsuported quack type: %d", oid); - } -} - -void -quack_execute_query(const char *query) { - auto db = quack_open_database(MyDatabaseId, true); - Connection connection(*db); - - std::string query_string = std::string(query); - auto res = connection.Query(query_string); - // FIME: res.HasError() ?? -} - -unique_ptr -quack_create_appender(Connection &connection, const char *tableName) { - // FIXME: try-catch ? - return make_uniq(connection, "", std::string(tableName)); -} - -void -quack_append_value(Appender &appender, Oid columnOid, Datum value) { - switch (columnOid) { - case BOOLOID: - appender.Append(value); - break; - case CHAROID: - appender.Append(value); - break; - case INT2OID: - appender.Append(value); - break; - case INT4OID: - appender.Append(value); - break; - case INT8OID: - appender.Append(value); - break; - case BPCHAROID: - case TEXTOID: - case VARCHAROID: { - const char *text = VARDATA_ANY(value); - int len = VARSIZE_ANY_EXHDR(value); - string_t str(text, len); - appender.Append(str); - break; - } - case DATEOID: { - date_t date(static_cast(value + QUACK_DUCK_DATE_OFFSET)); - appender.Append(date); - break; - } - case TIMESTAMPOID: { - dtime_t timestamp(static_cast(value + QUACK_DUCK_TIMESTAMP_OFFSET)); - appender.Append(timestamp); - break; - } - default: - elog(ERROR, "Unsuported quack type: %d", columnOid); - } -} - -unique_ptr -quack_open_database(Oid databaseOid, bool preserveInsertOrder) { - /* Set lock for relation until transaction ends */ - DirectFunctionCall1(pg_advisory_xact_lock_int8, Int64GetDatum((int64)databaseOid)); - - DBConfig config; - config.SetOptionByName("preserve_insertion_order", Value::BOOLEAN(false)); - - StringInfo database_path = quack_database_path(databaseOid); - - // FIXME: Does this need try-catch? - return make_uniq(database_path->data, &config); -} - -unique_ptr -quack_open_connection(DuckDB database) { - // FIXME try-catch ? - return make_uniq(database); -} - -} // namespace duckdb diff --git a/src/quack_write_manager.cpp b/src/quack_write_manager.cpp deleted file mode 100644 index e1539b9..0000000 --- a/src/quack_write_manager.cpp +++ /dev/null @@ -1,134 +0,0 @@ -#include "quack/quack.hpp" - -extern "C" { - -#include "postgres.h" - -#include "access/heaptoast.h" -#include "common/hashfn.h" -#include "executor/executor.h" -#include "utils/relmapper.h" - -static HTAB *quack_write_state_map = NULL; -static MemoryContext quack_write_state_context = NULL; - -typedef struct SubXidWriteState { - SubTransactionId subXid; - duckdb::QuackWriteState *quack_write_state_entry; - struct SubXidWriteState *next; -} SubXidWriteState; - -typedef struct QuackWriteStateMapEntry { - Oid rel_node; - SubXidWriteState *write_state_stack; -} QuackWriteStateMapEntry; - -static MemoryContextCallback cleanup_callback; - -static void -cleanup_write_state_map(void *arg) { - quack_write_state_map = NULL; - quack_write_state_context = NULL; -} - -void -quack_flush_write_state(SubTransactionId currentSubXid, SubTransactionId parentSubXid, bool commit) { - HASH_SEQ_STATUS status; - QuackWriteStateMapEntry *entry; - - if (quack_write_state_map == NULL) { - return; - } - - hash_seq_init(&status, quack_write_state_map); - - while ((entry = (QuackWriteStateMapEntry *)hash_seq_search(&status)) != 0) { - SubXidWriteState *stack_head = entry->write_state_stack; - - if (entry->write_state_stack == NULL) { - continue; - } - - if (stack_head->subXid == currentSubXid) { - if (commit) { - duckdb::QuackWriteState *quack_write_state = stack_head->quack_write_state_entry; - quack_write_state->appender->Close(); - quack_write_state->appender.reset(); - quack_write_state->database.reset(); - quack_write_state->connection.reset(); - } - - entry->write_state_stack = stack_head->next; - } - } -} -} - -duckdb::QuackWriteState * -quack_init_write_state(Relation relation, Oid databaseOid, SubTransactionId currentSubXid) { - bool found; - QuackWriteStateMapEntry *hash_entry = NULL; - SubXidWriteState *stack_entry = NULL; - duckdb::QuackWriteState *quack_write_state = NULL; - MemoryContext oldContext = NULL; - - if (quack_write_state_map == NULL) { - HASHCTL info; - uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - quack_write_state_context = - AllocSetContextCreate(TopTransactionContext, "Quack Write State context", ALLOCSET_DEFAULT_SIZES); - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(Oid); - info.hash = oid_hash; - info.entrysize = sizeof(QuackWriteStateMapEntry); - info.hcxt = quack_write_state_context; - - quack_write_state_map = hash_create("quack cache map", 64, &info, hashFlags); - - cleanup_callback.arg = NULL; - cleanup_callback.func = &cleanup_write_state_map; - cleanup_callback.next = NULL; - MemoryContextRegisterResetCallback(quack_write_state_context, &cleanup_callback); - } - - // FIXME: not sure what the effects of the 'shared' parameter are - auto fileNumber = RelationMapOidToFilenumber(relation->rd_id, /*shared=*/false); - hash_entry = (QuackWriteStateMapEntry *)hash_search(quack_write_state_map, &fileNumber, HASH_ENTER, &found); - - if (!found) { - hash_entry->write_state_stack = NULL; - } - - if (hash_entry->write_state_stack != NULL) { - SubXidWriteState *stackHead = hash_entry->write_state_stack; - - if (stackHead->subXid == currentSubXid) { - return stackHead->quack_write_state_entry; - } - } - - oldContext = MemoryContextSwitchTo(quack_write_state_context); - - if (stack_entry == NULL) { - stack_entry = (SubXidWriteState *)palloc0(sizeof(SubXidWriteState)); - stack_entry->subXid = currentSubXid; - stack_entry->next = hash_entry->write_state_stack; - hash_entry->write_state_stack = stack_entry; - } - - quack_write_state = (duckdb::QuackWriteState *)palloc0(sizeof(duckdb::QuackWriteState)); - - quack_write_state->rel_node = RelationMapOidToFilenumber(relation->rd_id, false); - quack_write_state->database = duckdb::quack_open_database(databaseOid, true); - quack_write_state->connection = quack_open_connection(*quack_write_state->database); - quack_write_state->appender = quack_create_appender(*quack_write_state->connection, relation->rd_rel->relname.data); - quack_write_state->row_count = 0; - - stack_entry->quack_write_state_entry = quack_write_state; - - MemoryContextSwitchTo(oldContext); - - return quack_write_state; -}