From d0fc7316187e9b64d369f5a110a34803bf45eec0 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Thu, 11 Apr 2024 13:02:48 +0200 Subject: [PATCH] PostgreSQL memory allocator and muli thread scan --- Makefile | 18 +- include/quack/quack.h | 1 + include/quack/quack_heap_relation.hpp | 37 ++++ include/quack/quack_heap_scan.hpp | 94 +++++---- include/quack/quack_memory_allocator.hpp | 17 ++ src/CMakeLists.txt | 7 - src/quack.cpp | 15 +- src/quack_heap_relation.cpp | 155 +++++++++++++++ src/quack_heap_scan.cpp | 234 +++++++++++------------ src/quack_memory_allocator.cpp | 27 +++ src/quack_select.cpp | 14 +- src/quack_types.cpp | 2 +- 12 files changed, 425 insertions(+), 196 deletions(-) create mode 100644 include/quack/quack_heap_relation.hpp create mode 100644 include/quack/quack_memory_allocator.hpp delete mode 100644 src/CMakeLists.txt create mode 100644 src/quack_heap_relation.cpp create mode 100644 src/quack_memory_allocator.cpp diff --git a/Makefile b/Makefile index 73ee3f9..2f2fde8 100644 --- a/Makefile +++ b/Makefile @@ -4,11 +4,13 @@ MODULE_big = quack EXTENSION = quack DATA = quack.control $(wildcard quack--*.sql) -SRCS = src/quack_heap_scan.cpp \ - src/quack_hooks.cpp \ - src/quack_select.cpp \ - src/quack_types.cpp \ - src/quack.cpp +SRCS = src/quack_heap_relation.cpp \ + src/quack_heap_scan.cpp \ + src/quack_hooks.cpp \ + src/quack_select.cpp \ + src/quack_types.cpp \ + src/quack_memory_allocator.cpp \ + src/quack.cpp OBJS = $(subst .cpp,.o, $(SRCS)) @@ -18,8 +20,8 @@ PG_CONFIG ?= pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) PG_LIB := $(shell $(PG_CONFIG) --pkglibdir) -PG_CPPFLAGS := -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -L$(PG_LIB) -lduckdb -Lthird_party/build/src -lc++ +PG_CPPFLAGS := -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -O0 -g +SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -L$(PG_LIB) -lduckdb -Lthird_party/build/src #-lc++ # determine the name of the duckdb library that is built UNAME_S := $(shell uname -s) @@ -43,7 +45,7 @@ third_party/build/src/$(DUCKDB_LIB): cd third_party && \ mkdir build && \ cd build && \ - cmake ../duckdb && \ + cmake -DENABLE_SANITIZER=FALSE -DENABLE_UBSAN=FALSE -DBUILD_UNITTESTS=ON -DCMAKE_BUILD_TYPE=Debug ../duckdb && \ make install_duckdb: diff --git a/include/quack/quack.h b/include/quack/quack.h index d98ff3e..647c249 100644 --- a/include/quack/quack.h +++ b/include/quack/quack.h @@ -1,6 +1,7 @@ #pragma once // quack.c +extern int quack_max_threads_per_query; extern "C" void _PG_init(void); // quack_hooks.c diff --git a/include/quack/quack_heap_relation.hpp b/include/quack/quack_heap_relation.hpp new file mode 100644 index 0000000..0d4abb0 --- /dev/null +++ b/include/quack/quack_heap_relation.hpp @@ -0,0 +1,37 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "access/tableam.h" +#include "access/heapam.h" +} + +#include + +namespace quack { + +class PostgresHeapRelation { +public: + PostgresHeapRelation(RangeTblEntry *table); + ~PostgresHeapRelation(); + PostgresHeapRelation(const PostgresHeapRelation &other) = delete; + PostgresHeapRelation &operator=(const PostgresHeapRelation &other) = delete; + PostgresHeapRelation &operator=(PostgresHeapRelation &&other) = delete; + PostgresHeapRelation(PostgresHeapRelation &&other); + +public: + Relation GetRelation(); + TupleDesc GetTupleDesc(); + bool GetNextTuple(TableScanDesc scan, ScanDirection direction, TupleTableSlot *slot); + bool IsValid() const; + +private: + void GetPageTuple(HeapScanDesc scan, ScanDirection dir, int nkeys, ScanKey key); + BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir); + +private: + Relation rel = nullptr; + std::mutex lock; +}; + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index f564b99..ba116eb 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -9,99 +9,91 @@ extern "C" { #include "access/relscan.h" } -// Postgres Relation - -class PostgresRelation { -public: - PostgresRelation(RangeTblEntry *table); - ~PostgresRelation(); - PostgresRelation(const PostgresRelation &other) = delete; - PostgresRelation &operator=(const PostgresRelation &other) = delete; - PostgresRelation &operator=(PostgresRelation &&other) = delete; - PostgresRelation(PostgresRelation &&other); +#include "quack/quack.h" +#include "quack/quack_heap_relation.hpp" -public: - Relation GetRelation(); - bool IsValid() const; +// Postgres Relation -private: - Relation rel = nullptr; -}; namespace quack { -// Local State - -struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { +struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { public: - PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot); - ~PostgresScanLocalState() override; + PostgresHeapScanLocalState(ParallelBlockTableScanDesc scan, PostgresHeapRelation &relation, Snapshot snapshot); + ~PostgresHeapScanLocalState() override; public: - TableScanDesc scanDesc = nullptr; - const struct TableAmRoutine *tableam; + PostgresHeapRelation & rel; + TupleTableSlot *slot = nullptr; + TableScanDesc thread_scan_desc = nullptr; bool exhausted_scan = false; }; // Global State -struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresScanGlobalState(); - std::mutex lock; -}; +struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { + explicit PostgresHeapScanGlobalState(PostgresHeapRelation &relation, Snapshot snapshot); + ~PostgresHeapScanGlobalState(); + idx_t + MaxThreads() const override { + return quack_max_threads_per_query; + } -// Bind Data +public: + ParallelBlockTableScanDescData parallel_block_scan_desc; +}; -struct PostgresScanFunctionData : public duckdb::TableFunctionData { +struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData { public: - PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot); - ~PostgresScanFunctionData() override; + PostgresHeapScanFunctionData(PostgresHeapRelation &&relation, Snapshot snapshot); + ~PostgresHeapScanFunctionData() override; public: - PostgresRelation relation; + PostgresHeapRelation relation; Snapshot snapshot; }; -// ------- Table Function ------- - -struct PostgresScanFunction : public duckdb::TableFunction { +struct PostgresHeapScanFunction : public duckdb::TableFunction { public: - PostgresScanFunction(); + PostgresHeapScanFunction(); public: - static duckdb::unique_ptr PostgresBind(duckdb::ClientContext &context, - duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, - duckdb::vector &names); + static duckdb::unique_ptr PostgresHeapBind(duckdb::ClientContext &context, + duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names); static duckdb::unique_ptr - PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); static duckdb::unique_ptr - PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, - duckdb::GlobalTableFunctionState *gstate); + PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate); // static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p); // static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); - static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output); + static void PostgresHeapFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void // PostgresSerialize(Serializer &serializer, const optional_ptr bind_data, const TableFunction // &function); +public: + static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset); }; -struct PostgresReplacementScanData : public duckdb::ReplacementScanData { +struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { public: - PostgresReplacementScanData(QueryDesc *desc); - ~PostgresReplacementScanData() override; + PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + } + ~PostgresHeapReplacementScanData() override {}; public: QueryDesc *desc; }; -duckdb::unique_ptr PostgresReplacementScan(duckdb::ClientContext &context, - const duckdb::string &table_name, - duckdb::ReplacementScanData *data); +duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, + const duckdb::string &table_name, + duckdb::ReplacementScanData *data); } // namespace quack diff --git a/include/quack/quack_memory_allocator.hpp b/include/quack/quack_memory_allocator.hpp new file mode 100644 index 0000000..66a0a8c --- /dev/null +++ b/include/quack/quack_memory_allocator.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "duckdb/common/allocator.hpp" + +namespace quack { + +struct QuackAllocatorData : public duckdb::PrivateAllocatorData { + explicit QuackAllocatorData() { + } +}; + +duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size); +void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx); +duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, + duckdb::idx_t old_size, duckdb::idx_t size); + +} // namespace quack \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt deleted file mode 100644 index 03bd9f1..0000000 --- a/src/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -ADD_LIBRARY(extension OBJECT quack.c - quack_hooks.c - quack_select.cpp - quack_heap_scan.cpp - quack_types.cpp) - -set(PROJECT_OBJECTS ${PROJECT_OBJECTS} "$" PARENT_SCOPE) diff --git a/src/quack.cpp b/src/quack.cpp index fed1f38..ff9d2e5 100644 --- a/src/quack.cpp +++ b/src/quack.cpp @@ -8,6 +8,8 @@ extern "C" { static void quack_init_guc(void); +int quack_max_threads_per_query = 1; + extern "C" { PG_MODULE_MAGIC; @@ -21,5 +23,16 @@ _PG_init(void) { /* clang-format off */ static void quack_init_guc(void) { - + DefineCustomIntVariable("quack.max_threads_per_query", + gettext_noop("DuckDB max no. threads per query."), + NULL, + &quack_max_threads_per_query, + 1, + 1, + 32, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); } \ No newline at end of file diff --git a/src/quack_heap_relation.cpp b/src/quack_heap_relation.cpp new file mode 100644 index 0000000..9523e95 --- /dev/null +++ b/src/quack_heap_relation.cpp @@ -0,0 +1,155 @@ +extern "C" { +#include "postgres.h" +#include "pgstat.h" +#include "access/valid.h" +#include "access/heapam.h" +#include "storage/bufmgr.h" +} + +#include "quack/quack_heap_relation.hpp" + +namespace quack { + +PostgresHeapRelation::PostgresHeapRelation(RangeTblEntry *table) : rel(RelationIdGetRelation(table->relid)) { +} + +PostgresHeapRelation::~PostgresHeapRelation() { + if (IsValid()) { + RelationClose(rel); + } +} + +PostgresHeapRelation::PostgresHeapRelation(PostgresHeapRelation &&other) : rel(other.rel) { + other.rel = nullptr; +} + +Relation +PostgresHeapRelation::GetRelation() { + return rel; +} + +bool +PostgresHeapRelation::IsValid() const { + return RelationIsValid(rel); +} + +TupleDesc +PostgresHeapRelation::GetTupleDesc() { + return RelationGetDescr(rel); +} + +BlockNumber +PostgresHeapRelation::heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) { + + /* When there are no pages to scan, return InvalidBlockNumber */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + return InvalidBlockNumber; + + /* parallel scan */ + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc)scan->rs_base.rs_parallel); + + /* may return InvalidBlockNumber if there are no more blocks */ + return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc)scan->rs_base.rs_parallel); +} + +void +PostgresHeapRelation::GetPageTuple(HeapScanDesc scan, ScanDirection dir, int nkeys, ScanKey key) { + + HeapTuple tuple = &(scan->rs_ctup); + BlockNumber block; + Page page; + int lineindex; + int linesleft; + + if (unlikely(!scan->rs_inited)) { + block = heapgettup_initial_block(scan, dir); + /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ + Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + scan->rs_inited = true; + } else { + /* continue from previously returned page/tuple */ + block = scan->rs_cblock; /* current page */ + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); + + lineindex = scan->rs_cindex + dir; + if (ScanDirectionIsForward(dir)) + linesleft = scan->rs_ntuples - lineindex; + else + linesleft = scan->rs_cindex; + /* lineindex now references the next or previous visible tid */ + + goto continue_page; + } + + /* + * advance the scan until we find a qualifying tuple or run out of stuff + * to scan + */ + while (block != InvalidBlockNumber) { + heapgetpage((TableScanDesc)scan, block); + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); + linesleft = scan->rs_ntuples; + lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; + + /* lineindex now references the next or previous visible tid */ + continue_page: + + for (; linesleft > 0; linesleft--, lineindex += dir) { + ItemId lpp; + OffsetNumber lineoff; + + lineoff = scan->rs_vistuples[lineindex]; + lpp = PageGetItemId(page, lineoff); + Assert(ItemIdIsNormal(lpp)); + + tuple->t_data = (HeapTupleHeader)PageGetItem(page, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), block, lineoff); + + /* skip any tuples that don't match the scan key */ + if (key != NULL && !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), nkeys, key)) + continue; + + scan->rs_cindex = lineindex; + return; + } + + /* Forward & Parallel Scan */ + lock.lock(); + block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc)scan->rs_base.rs_parallel); + lock.unlock(); + } + + /* end of scan */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; +} + +bool +PostgresHeapRelation::GetNextTuple(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { + + HeapScanDesc scan = (HeapScanDesc)sscan; + + GetPageTuple(scan, direction, sscan->rs_nkeys, sscan->rs_key); + + if (scan->rs_ctup.t_data == NULL) { + ExecClearTuple(slot); + return false; + } + + pgstat_count_heap_getnext(scan->rs_base.rs_rd); + ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf); + return true; +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 3cee184..b6ea4ab 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -19,74 +19,94 @@ extern "C" { #include "catalog/pg_type.h" #include "utils/syscache.h" #include "utils/builtins.h" +#include "utils/snapmgr.h" } #include "quack/quack_heap_scan.hpp" #include "quack/quack_types.hpp" -// Postgres Relation -PostgresRelation::PostgresRelation(RangeTblEntry *table) : rel(RelationIdGetRelation(table->relid)) { -} +namespace quack { -PostgresRelation::~PostgresRelation() { - if (IsValid()) { - RelationClose(rel); - } -} +// +// PostgresHeapScanFunctionData +// -Relation -PostgresRelation::GetRelation() { - return rel; -} - -bool -PostgresRelation::IsValid() const { - return RelationIsValid(rel); +PostgresHeapScanFunctionData::PostgresHeapScanFunctionData(PostgresHeapRelation &&relation, Snapshot snapshot) + : relation(std::move(relation)), snapshot(snapshot) { } -PostgresRelation::PostgresRelation(PostgresRelation &&other) : rel(other.rel) { - other.rel = nullptr; +PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { } -namespace quack { +// +// PostgresHeapScanGlobalState +// -// ------- Table Function ------- +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapRelation &relation, Snapshot snapshot) { + Relation rel = relation.GetRelation(); + rel->rd_tableam->parallelscan_initialize(rel, ¶llel_block_scan_desc.base); + parallel_block_scan_desc.base.phs_snapshot_off = 0; + if (IsMVCCSnapshot(snapshot)) { + parallel_block_scan_desc.base.phs_snapshot_any = false; + } else { + Assert(snapshot == SnapshotAny); + parallel_block_scan_desc.base.phs_snapshot_any = true; + } +} -PostgresScanFunction::PostgresScanFunction() - : TableFunction("quack_postgres_scan", {}, PostgresFunc, PostgresBind, PostgresInitGlobal, PostgresInitLocal) { - named_parameters["table"] = duckdb::LogicalType::POINTER; - named_parameters["snapshot"] = duckdb::LogicalType::POINTER; +PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { + elog(WARNING, "~PostgresScanGlobalState"); } -// Bind Data +// +// PostgresHeapScanLocalState +// -PostgresScanFunctionData::PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot) - : relation(std::move(relation)), snapshot(snapshot) { +PostgresHeapScanLocalState::PostgresHeapScanLocalState(ParallelBlockTableScanDesc pscan, PostgresHeapRelation &relation, + Snapshot snapshot) + : rel(relation) { + auto rel = relation.GetRelation(); + uint32 flags = SO_TYPE_SEQSCAN | SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE; + if (pscan->base.phs_snapshot_any) { + snapshot = SnapshotAny; + } + thread_scan_desc = rel->rd_tableam->scan_begin(rel, snapshot, 0, NULL, &pscan->base, flags); + slot = table_slot_create(rel, NULL); +} +PostgresHeapScanLocalState::~PostgresHeapScanLocalState() { + elog(WARNING, "~PostgresScanLocalState"); + ExecDropSingleTupleTableSlot(slot); + GetTableAmRoutine(rel.GetRelation()->rd_amhandler)->scan_end(thread_scan_desc); } -PostgresScanFunctionData::~PostgresScanFunctionData() { +// +// PostgresHeapScanFunction +// + +PostgresHeapScanFunction::PostgresHeapScanFunction() + : TableFunction("postgres_heap_scan", {}, PostgresHeapFunc, PostgresHeapBind, PostgresHeapInitGlobal, + PostgresHeapInitLocal) { + named_parameters["table"] = duckdb::LogicalType::POINTER; + named_parameters["snapshot"] = duckdb::LogicalType::POINTER; } duckdb::unique_ptr -PostgresScanFunction::PostgresBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, - duckdb::vector &names) { +PostgresHeapScanFunction::PostgresHeapBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names) { auto table = (reinterpret_cast(input.named_parameters["table"].GetPointer())); auto snapshot = (reinterpret_cast(input.named_parameters["snapshot"].GetPointer())); - D_ASSERT(table->relid); - auto rel = PostgresRelation(table); - + auto rel = PostgresHeapRelation(table); auto tupleDesc = RelationGetDescr(rel.GetRelation()); + if (!tupleDesc) { elog(ERROR, "Failed to get tuple descriptor for relation with OID %u", table->relid); return nullptr; } - int column_count = tupleDesc->natts; - - for (idx_t i = 0; i < column_count; i++) { + for (int i = 0; i < tupleDesc->natts; i++) { Form_pg_attribute attr = &tupleDesc->attrs[i]; Oid type_oid = attr->atttypid; auto col_name = duckdb::string(NameStr(attr->attname)); @@ -94,107 +114,73 @@ PostgresScanFunction::PostgresBind(duckdb::ClientContext &context, duckdb::Table return_types.push_back(duck_type); names.push_back(col_name); /* Log column name and type */ - elog(INFO, "Column name: %s, Type: %s", col_name.c_str(), duck_type.ToString().c_str()); + elog(INFO, "-- Column name: %s, Type: %s -- ", col_name.c_str(), duck_type.ToString().c_str()); } - // FIXME: check this in the replacement scan - D_ASSERT(rel.GetRelation()->rd_amhandler != 0); - // These are the methods we need to interact with the table - auto access_method_handler = GetTableAmRoutine(rel.GetRelation()->rd_amhandler); - - return duckdb::make_uniq(std::move(rel), snapshot); -} - -// Global State - -PostgresScanGlobalState::PostgresScanGlobalState() { + return duckdb::make_uniq(std::move(rel), snapshot); } duckdb::unique_ptr -PostgresScanFunction::PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { - auto &bind_data = input.bind_data->Cast(); - (void)bind_data; - // FIXME: we'll call 'parallelscan_initialize' here to initialize a parallel scan - return duckdb::make_uniq(); -} - -// Local State - -PostgresScanLocalState::PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot) { - auto rel = relation.GetRelation(); - tableam = rel->rd_tableam; - - // Initialize the scan state - uint32 flags = SO_TYPE_SEQSCAN | SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE; - scanDesc = rel->rd_tableam->scan_begin(rel, snapshot, 0, NULL, NULL, flags); -} -PostgresScanLocalState::~PostgresScanLocalState() { - // Close the scan state - tableam->scan_end(scanDesc); +PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context, + duckdb::TableFunctionInitInput &input) { + auto &bind_data = input.bind_data->CastNoConst(); + return duckdb::make_uniq(bind_data.relation, bind_data.snapshot); } duckdb::unique_ptr -PostgresScanFunction::PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, - duckdb::GlobalTableFunctionState *gstate) { - auto &bind_data = input.bind_data->CastNoConst(); - auto &relation = bind_data.relation; - auto snapshot = bind_data.snapshot; - return duckdb::make_uniq(relation, snapshot); -} - -static void -InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset) { - for (int i = 0; i < tuple->natts; i++) { - auto &result = output.data[i]; - Datum value = slot_getattr(slot, i + 1, &slot->tts_isnull[i]); - if (slot->tts_isnull[i]) { - auto &array_mask = duckdb::FlatVector::Validity(result); - array_mask.SetInvalid(offset); - } else { - ConvertPostgresToDuckValue(value, result, offset); - } - } +PostgresHeapScanFunction::PostgresHeapInitLocal(duckdb::ExecutionContext &context, + duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate) { + auto &bind_data = input.bind_data->CastNoConst(); + auto &g_state = gstate->Cast(); + return duckdb::make_uniq(&g_state.parallel_block_scan_desc, bind_data.relation, + bind_data.snapshot); } void -PostgresScanFunction::PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output) { - auto &data = data_p.bind_data->CastNoConst(); - auto &lstate = data_p.local_state->Cast(); - auto &gstate = data_p.global_state->Cast(); - - auto &relation = data.relation; - auto snapshot = data.snapshot; - auto &exhausted_scan = lstate.exhausted_scan; +PostgresHeapScanFunction::PostgresHeapFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output) { + auto &bind_data = data_p.bind_data->CastNoConst(); + auto &l_data = data_p.local_state->Cast(); - auto rel = relation.GetRelation(); - - TupleDesc tupleDesc = RelationGetDescr(rel); - auto scanDesc = lstate.scanDesc; + auto &relation = bind_data.relation; + auto &exhausted_scan = l_data.exhausted_scan; - auto slot = table_slot_create(rel, NULL); idx_t count = 0; for (; count < STANDARD_VECTOR_SIZE && !exhausted_scan; count++) { - auto has_tuple = rel->rd_tableam->scan_getnextslot(scanDesc, ForwardScanDirection, slot); + auto has_tuple = relation.GetNextTuple(l_data.thread_scan_desc, ForwardScanDirection, l_data.slot); if (!has_tuple) { exhausted_scan = true; break; } - // Received a tuple, insert it into the DataChunk - InsertTupleIntoChunk(output, tupleDesc, slot, count); + /* Received a tuple, insert it into the DataChunk */ + InsertTupleIntoChunk(output, relation.GetTupleDesc(), l_data.slot, count); } - ExecDropSingleTupleTableSlot(slot); output.SetCardinality(count); } - -PostgresReplacementScanData::PostgresReplacementScanData(QueryDesc *desc) : desc(desc) { -} -PostgresReplacementScanData::~PostgresReplacementScanData() { +void +PostgresHeapScanFunction::InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, + idx_t offset) { + for (int i = 0; i < tuple->natts; i++) { + auto &result = output.data[i]; + Datum value = slot_getattr(slot, i + 1, &slot->tts_isnull[i]); + // elog(WARNING, "%llu --- %d", std::this_thread::get_id(), value); + if (slot->tts_isnull[i]) { + auto &array_mask = duckdb::FlatVector::Validity(result); + array_mask.SetInvalid(offset); + } else { + ConvertPostgresToDuckValue(value, result, offset); + } + } } +// +// PostgresHeapReplacementScan +// + static RangeTblEntry * -FindMatchingRelation(List *tables, const duckdb::string &to_find) { +FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) { ListCell *lc; foreach (lc, tables) { RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); @@ -206,11 +192,16 @@ FindMatchingRelation(List *tables, const duckdb::string &to_find) { return nullptr; } - char *relName = RelationGetRelationName(rel); - auto table_name = std::string(relName); + char *rel_name = RelationGetRelationName(rel); + auto table_name = std::string(rel_name); if (duckdb::StringUtil::CIEquals(table_name, to_find)) { if (!rel->rd_amhandler) { - // This doesn't have an access method handler, we cant read from this + /* This doesn't have an access method handler, we cant read from this */ + RelationClose(rel); + return nullptr; + } + /* Allow only heap tables */ + if (GetTableAmRoutine(rel->rd_amhandler) != GetHeapamTableAmRoutine()) { RelationClose(rel); return nullptr; } @@ -224,16 +215,15 @@ FindMatchingRelation(List *tables, const duckdb::string &to_find) { } duckdb::unique_ptr -PostgresReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name, - duckdb::ReplacementScanData *data) { +PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name, + duckdb::ReplacementScanData *data) { - auto &scan_data = reinterpret_cast(*data); + auto &scan_data = reinterpret_cast(*data); - auto tables = scan_data.desc->plannedstmt->rtable; - auto table = FindMatchingRelation(tables, table_name); + /* Check name against query rtable list and verify that it is heap */ + auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name); if (!table) { - elog(WARNING, "Failed to find table %s in replacement scan lookup", table_name.c_str()); return nullptr; } @@ -250,7 +240,7 @@ PostgresReplacementScan(duckdb::ClientContext &context, const duckdb::string &ta duckdb::make_uniq( duckdb::Value::POINTER(duckdb::CastPointerToValue(scan_data.desc->estate->es_snapshot))))); - table_function->function = duckdb::make_uniq("quack_postgres_scan", std::move(children)); + table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children)); return std::move(table_function); } diff --git a/src/quack_memory_allocator.cpp b/src/quack_memory_allocator.cpp new file mode 100644 index 0000000..9fde45b --- /dev/null +++ b/src/quack_memory_allocator.cpp @@ -0,0 +1,27 @@ +#include "duckdb/common/allocator.hpp" + +extern "C" { +#include "postgres.h" +} + +#include "quack/quack_memory_allocator.hpp" + +namespace quack { + +duckdb::data_ptr_t +QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size) { + return reinterpret_cast(palloc(size)); +} + +void +QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t idx) { + return pfree(pointer); +} + +duckdb::data_ptr_t +QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t old_size, + duckdb::idx_t size) { + return reinterpret_cast(repalloc(pointer, size)); +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_select.cpp b/src/quack_select.cpp index b6f0e36..53b2c4d 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -18,12 +18,14 @@ extern "C" { #include "quack/quack_heap_scan.hpp" #include "quack/quack_types.hpp" +#include "quack/quack_memory_allocator.hpp" namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; + //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } @@ -33,19 +35,20 @@ extern "C" bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { auto db = quack::quack_open_database(); + /* Add heap tables */ db->instance->config.replacement_scans.emplace_back( - quack::PostgresReplacementScan, - duckdb::make_uniq_base(query_desc)); + quack::PostgresHeapReplacementScan, + duckdb::make_uniq_base(query_desc)); auto connection = duckdb::make_uniq(*db); // Add the postgres_scan inserted by the replacement scan auto &context = *connection->context; - quack::PostgresScanFunction scan_fun; - duckdb::CreateTableFunctionInfo scan_info_info(scan_fun); + quack::PostgresHeapScanFunction heap_scan_fun; + duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); auto &catalog = duckdb::Catalog::GetSystemCatalog(context); context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &scan_info_info); + catalog.CreateTableFunction(context, &heap_scan_info); context.transaction.Commit(); idx_t column_count; @@ -101,7 +104,6 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co } } } - dest->rShutdown(dest); return true; } \ No newline at end of file diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 9d52115..d3f3bfe 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -146,7 +146,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { offset); break; default: - elog(ERROR, "Unsupported quack type: %hhu", result.GetType().id()); + elog(ERROR, "Unsupported quack type: %d", static_cast(result.GetType().id())); break; } }