From efcd2798a2fbcd8a3616ff35e29feb20ca08cabe Mon Sep 17 00:00:00 2001 From: mkaruza Date: Thu, 11 Apr 2024 13:02:48 +0200 Subject: [PATCH] PostgreSQL memory allocator and muli thread scan --- Makefile | 12 +- include/quack/quack.h | 1 + include/quack/quack_heap_scan.hpp | 93 +++++----- include/quack/quack_heap_seq_scan.hpp | 76 ++++++++ include/quack/quack_memory_allocator.hpp | 17 ++ include/quack/quack_types.hpp | 1 + src/CMakeLists.txt | 7 - src/quack.cpp | 15 +- src/quack_heap_scan.cpp | 217 +++++++++-------------- src/quack_heap_seq_scan.cpp | 181 +++++++++++++++++++ src/quack_memory_allocator.cpp | 27 +++ src/quack_select.cpp | 14 +- src/quack_types.cpp | 104 ++++++++++- 13 files changed, 559 insertions(+), 206 deletions(-) create mode 100644 include/quack/quack_heap_seq_scan.hpp create mode 100644 include/quack/quack_memory_allocator.hpp delete mode 100644 src/CMakeLists.txt create mode 100644 src/quack_heap_seq_scan.cpp create mode 100644 src/quack_memory_allocator.cpp diff --git a/Makefile b/Makefile index affc1380..e076247a 100644 --- a/Makefile +++ b/Makefile @@ -4,11 +4,13 @@ MODULE_big = quack EXTENSION = quack DATA = quack.control $(wildcard quack--*.sql) -SRCS = src/quack_heap_scan.cpp \ - src/quack_hooks.cpp \ - src/quack_select.cpp \ - src/quack_types.cpp \ - src/quack.cpp +SRCS = src/quack_heap_seq_scan.cpp \ + src/quack_heap_scan.cpp \ + src/quack_hooks.cpp \ + src/quack_select.cpp \ + src/quack_types.cpp \ + src/quack_memory_allocator.cpp \ + src/quack.cpp OBJS = $(subst .cpp,.o, $(SRCS)) diff --git a/include/quack/quack.h b/include/quack/quack.h index d98ff3e7..647c2496 100644 --- a/include/quack/quack.h +++ b/include/quack/quack.h @@ -1,6 +1,7 @@ #pragma once // quack.c +extern int quack_max_threads_per_query; extern "C" void _PG_init(void); // quack_hooks.c diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index f564b991..5aa4ff32 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -9,99 +9,86 @@ extern "C" { #include "access/relscan.h" } -// Postgres Relation - -class PostgresRelation { -public: - PostgresRelation(RangeTblEntry *table); - ~PostgresRelation(); - PostgresRelation(const PostgresRelation &other) = delete; - PostgresRelation &operator=(const PostgresRelation &other) = delete; - PostgresRelation &operator=(PostgresRelation &&other) = delete; - PostgresRelation(PostgresRelation &&other); +#include "quack/quack.h" +#include "quack/quack_heap_seq_scan.hpp" -public: - Relation GetRelation(); - bool IsValid() const; +// Postgres Relation -private: - Relation rel = nullptr; -}; namespace quack { -// Local State - -struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { +struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { public: - PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot); - ~PostgresScanLocalState() override; + PostgresHeapScanLocalState(PostgresHeapSeqScan &relation); + ~PostgresHeapScanLocalState() override; public: - TableScanDesc scanDesc = nullptr; - const struct TableAmRoutine *tableam; - bool exhausted_scan = false; + PostgresHeapSeqScan & m_rel; + PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info; + bool m_exhausted_scan = false; }; // Global State -struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresScanGlobalState(); - std::mutex lock; +struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { + explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation); + ~PostgresHeapScanGlobalState(); + idx_t + MaxThreads() const override { + return quack_max_threads_per_query; + } }; -// Bind Data - -struct PostgresScanFunctionData : public duckdb::TableFunctionData { +struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData { public: - PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot); - ~PostgresScanFunctionData() override; + PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot Snapshot); + ~PostgresHeapScanFunctionData() override; public: - PostgresRelation relation; - Snapshot snapshot; + PostgresHeapSeqScan m_relation; }; -// ------- Table Function ------- - -struct PostgresScanFunction : public duckdb::TableFunction { +struct PostgresHeapScanFunction : public duckdb::TableFunction { public: - PostgresScanFunction(); + PostgresHeapScanFunction(); public: - static duckdb::unique_ptr PostgresBind(duckdb::ClientContext &context, - duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, - duckdb::vector &names); + static duckdb::unique_ptr PostgresHeapBind(duckdb::ClientContext &context, + duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names); static duckdb::unique_ptr - PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); static duckdb::unique_ptr - PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, - duckdb::GlobalTableFunctionState *gstate); + PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate); // static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p); // static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); - static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output); + static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void // PostgresSerialize(Serializer &serializer, const optional_ptr bind_data, const TableFunction // &function); +public: + static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset); }; -struct PostgresReplacementScanData : public duckdb::ReplacementScanData { +struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { public: - PostgresReplacementScanData(QueryDesc *desc); - ~PostgresReplacementScanData() override; + PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + } + ~PostgresHeapReplacementScanData() override {}; public: QueryDesc *desc; }; -duckdb::unique_ptr PostgresReplacementScan(duckdb::ClientContext &context, - const duckdb::string &table_name, - duckdb::ReplacementScanData *data); +duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, + const duckdb::string &table_name, + duckdb::ReplacementScanData *data); } // namespace quack diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp new file mode 100644 index 00000000..4a30a9a8 --- /dev/null +++ b/include/quack/quack_heap_seq_scan.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "access/tableam.h" +#include "access/heapam.h" +} + +#include + +namespace quack { + +class PostgresHeapSeqScanThreadInfo { +public: + PostgresHeapSeqScanThreadInfo(); + ~PostgresHeapSeqScanThreadInfo(); + void EndScan(); + +public: + TupleDesc m_tuple_desc; + bool m_inited; + bool m_read_next_page; + bool m_page_tuples_all_visible; + int m_output_vector_size; + BlockNumber m_block_number; + Buffer m_buffer; + OffsetNumber m_current_tuple_index; + int m_page_tuples_left; + HeapTupleData m_tuple; +}; + +class PostgresHeapSeqScan { +private: + class ParallelScanState { + public: + ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) { + } + BlockNumber AssignNextBlockNumber(); + std::mutex m_lock; + BlockNumber m_nblocks; + BlockNumber m_last_assigned_block_number; + }; + +public: + PostgresHeapSeqScan(RangeTblEntry *table); + ~PostgresHeapSeqScan(); + PostgresHeapSeqScan(const PostgresHeapSeqScan &other) = delete; + PostgresHeapSeqScan &operator=(const PostgresHeapSeqScan &other) = delete; + PostgresHeapSeqScan &operator=(PostgresHeapSeqScan &&other) = delete; + PostgresHeapSeqScan(PostgresHeapSeqScan &&other); + +public: + void InitParallelScanState(); + void + SetSnapshot(Snapshot snapshot) { + m_snapshot = snapshot; + } + +public: + Relation GetRelation(); + TupleDesc GetTupleDesc(); + bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo); + bool IsValid() const; + +private: + Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo); + +private: + Relation m_rel = nullptr; + Snapshot m_snapshot = nullptr; + ParallelScanState m_parallel_scan_state; +}; + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_memory_allocator.hpp b/include/quack/quack_memory_allocator.hpp new file mode 100644 index 00000000..66a0a8c4 --- /dev/null +++ b/include/quack/quack_memory_allocator.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "duckdb/common/allocator.hpp" + +namespace quack { + +struct QuackAllocatorData : public duckdb::PrivateAllocatorData { + explicit QuackAllocatorData() { + } +}; + +duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size); +void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx); +duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, + duckdb::idx_t old_size, duckdb::idx_t size); + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index e32c415b..05f38603 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -11,4 +11,5 @@ namespace quack { duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); +void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset); } // namespace quack \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt deleted file mode 100644 index 03bd9f19..00000000 --- a/src/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -ADD_LIBRARY(extension OBJECT quack.c - quack_hooks.c - quack_select.cpp - quack_heap_scan.cpp - quack_types.cpp) - -set(PROJECT_OBJECTS ${PROJECT_OBJECTS} "$" PARENT_SCOPE) diff --git a/src/quack.cpp b/src/quack.cpp index fed1f38d..dd507911 100644 --- a/src/quack.cpp +++ b/src/quack.cpp @@ -8,6 +8,8 @@ extern "C" { static void quack_init_guc(void); +int quack_max_threads_per_query = 1; + extern "C" { PG_MODULE_MAGIC; @@ -21,5 +23,16 @@ _PG_init(void) { /* clang-format off */ static void quack_init_guc(void) { - + DefineCustomIntVariable("quack.max_threads_per_query", + gettext_noop("DuckDB max no. threads per query."), + NULL, + &quack_max_threads_per_query, + 1, + 1, + 64, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); } \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 6b6950cd..d9bffb4b 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -7,86 +7,76 @@ #include "duckdb/parser/expression/columnref_expression.hpp" #include "duckdb/common/enums/expression_type.hpp" -extern "C" { -#include "postgres.h" - -#include "miscadmin.h" - -#include "access/tableam.h" -#include "executor/executor.h" -#include "parser/parse_type.h" -#include "tcop/utility.h" -#include "catalog/pg_type.h" -#include "utils/syscache.h" -#include "utils/builtins.h" -} - #include "quack/quack_heap_scan.hpp" #include "quack/quack_types.hpp" -// Postgres Relation +namespace quack { -PostgresRelation::PostgresRelation(RangeTblEntry *table) : rel(RelationIdGetRelation(table->relid)) { -} +// +// PostgresHeapScanFunctionData +// -PostgresRelation::~PostgresRelation() { - if (IsValid()) { - RelationClose(rel); - } +PostgresHeapScanFunctionData::PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot snapshot) + : m_relation(std::move(relation)) { + m_relation.SetSnapshot(snapshot); } -Relation -PostgresRelation::GetRelation() { - return rel; +PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { } -bool -PostgresRelation::IsValid() const { - return RelationIsValid(rel); -} +// +// PostgresHeapScanGlobalState +// -PostgresRelation::PostgresRelation(PostgresRelation &&other) : rel(other.rel) { - other.rel = nullptr; +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) { + relation.InitParallelScanState(); } -namespace quack { - -// ------- Table Function ------- +PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { -PostgresScanFunction::PostgresScanFunction() - : TableFunction("quack_postgres_scan", {}, PostgresFunc, PostgresBind, PostgresInitGlobal, PostgresInitLocal) { - named_parameters["table"] = duckdb::LogicalType::POINTER; - named_parameters["snapshot"] = duckdb::LogicalType::POINTER; } -// Bind Data +// +// PostgresHeapScanLocalState +// -PostgresScanFunctionData::PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot) - : relation(std::move(relation)), snapshot(snapshot) { +PostgresHeapScanLocalState::PostgresHeapScanLocalState(PostgresHeapSeqScan &relation) : m_rel(relation) { + m_thread_seq_scan_info.m_tuple.t_tableOid = RelationGetRelid(relation.GetRelation()); + m_thread_seq_scan_info.m_tuple_desc = RelationGetDescr(relation.GetRelation()); } -PostgresScanFunctionData::~PostgresScanFunctionData() { +PostgresHeapScanLocalState::~PostgresHeapScanLocalState() { + m_thread_seq_scan_info.EndScan(); +} + +// +// PostgresHeapScanFunction +// + +PostgresHeapScanFunction::PostgresHeapScanFunction() + : TableFunction("postgres_heap_scan", {}, PostgresHeapScanFunc, PostgresHeapBind, PostgresHeapInitGlobal, + PostgresHeapInitLocal) { + named_parameters["table"] = duckdb::LogicalType::POINTER; + named_parameters["snapshot"] = duckdb::LogicalType::POINTER; + //projection_pushdown = true; } duckdb::unique_ptr -PostgresScanFunction::PostgresBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, - duckdb::vector &names) { +PostgresHeapScanFunction::PostgresHeapBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names) { auto table = (reinterpret_cast(input.named_parameters["table"].GetPointer())); auto snapshot = (reinterpret_cast(input.named_parameters["snapshot"].GetPointer())); - D_ASSERT(table->relid); - auto rel = PostgresRelation(table); - + auto rel = PostgresHeapSeqScan(table); auto tupleDesc = RelationGetDescr(rel.GetRelation()); + if (!tupleDesc) { elog(ERROR, "Failed to get tuple descriptor for relation with OID %u", table->relid); return nullptr; } - int column_count = tupleDesc->natts; - - for (idx_t i = 0; i < column_count; i++) { + for (int i = 0; i < tupleDesc->natts; i++) { Form_pg_attribute attr = &tupleDesc->attrs[i]; Oid type_oid = attr->atttypid; auto col_name = duckdb::string(NameStr(attr->attname)); @@ -94,95 +84,56 @@ PostgresScanFunction::PostgresBind(duckdb::ClientContext &context, duckdb::Table return_types.push_back(duck_type); names.push_back(col_name); /* Log column name and type */ - elog(INFO, "Column name: %s, Type: %s", col_name.c_str(), duck_type.ToString().c_str()); + elog(INFO, "-- Column name: %s, Type: %s -- ", col_name.c_str(), duck_type.ToString().c_str()); } - return duckdb::make_uniq(std::move(rel), snapshot); + return duckdb::make_uniq(std::move(rel), snapshot); } -// Global State - -PostgresScanGlobalState::PostgresScanGlobalState() { +duckdb::unique_ptr +PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context, + duckdb::TableFunctionInitInput &input) { + auto &bind_data = input.bind_data->CastNoConst(); + return duckdb::make_uniq(bind_data.m_relation); } -duckdb::unique_ptr -PostgresScanFunction::PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { - return duckdb::make_uniq(); +duckdb::unique_ptr +PostgresHeapScanFunction::PostgresHeapInitLocal(duckdb::ExecutionContext &context, + duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate) { + auto &bind_data = input.bind_data->CastNoConst(); + return duckdb::make_uniq(bind_data.m_relation); } -// Local State +void +PostgresHeapScanFunction::PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output) { + auto &bind_data = data_p.bind_data->CastNoConst(); + auto &l_data = data_p.local_state->Cast(); -PostgresScanLocalState::PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot) { - auto rel = relation.GetRelation(); - tableam = rel->rd_tableam; + auto &relation = bind_data.m_relation; + auto &exhausted_scan = l_data.m_exhausted_scan; - // Initialize the scan state - uint32 flags = SO_TYPE_SEQSCAN | SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE; - scanDesc = rel->rd_tableam->scan_begin(rel, snapshot, 0, NULL, NULL, flags); -} -PostgresScanLocalState::~PostgresScanLocalState() { - // Close the scan state - tableam->scan_end(scanDesc); -} + l_data.m_thread_seq_scan_info.m_output_vector_size = 0; -duckdb::unique_ptr -PostgresScanFunction::PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, - duckdb::GlobalTableFunctionState *gstate) { - auto &bind_data = input.bind_data->CastNoConst(); - auto &relation = bind_data.relation; - auto snapshot = bind_data.snapshot; - return duckdb::make_uniq(relation, snapshot); -} - -static void -InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset) { - for (int i = 0; i < tuple->natts; i++) { - auto &result = output.data[i]; - Datum value = slot_getattr(slot, i + 1, &slot->tts_isnull[i]); - if (slot->tts_isnull[i]) { - auto &array_mask = duckdb::FlatVector::Validity(result); - array_mask.SetInvalid(offset); - } else { - ConvertPostgresToDuckValue(value, result, offset); - } + /* We have exhausted seq scan of heap table so we can return */ + if (exhausted_scan) { + output.SetCardinality(0); + return; } -} -void -PostgresScanFunction::PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output) { - auto &data = data_p.bind_data->CastNoConst(); - auto &lstate = data_p.local_state->Cast(); - - auto &relation = data.relation; - auto &exhausted_scan = lstate.exhausted_scan; - - auto rel = relation.GetRelation(); - - TupleDesc tupleDesc = RelationGetDescr(rel); - auto scanDesc = lstate.scanDesc; - - auto slot = table_slot_create(rel, NULL); - idx_t count = 0; - for (; count < STANDARD_VECTOR_SIZE && !exhausted_scan; count++) { - auto has_tuple = rel->rd_tableam->scan_getnextslot(scanDesc, ForwardScanDirection, slot); - if (!has_tuple) { - exhausted_scan = true; - break; - } - // Received a tuple, insert it into the DataChunk - InsertTupleIntoChunk(output, tupleDesc, slot, count); + auto has_tuple = relation.ReadPageTuples(output, l_data.m_thread_seq_scan_info); + + if (!has_tuple || l_data.m_thread_seq_scan_info.m_block_number == InvalidBlockNumber) { + exhausted_scan = true; } - ExecDropSingleTupleTableSlot(slot); - output.SetCardinality(count); } -PostgresReplacementScanData::PostgresReplacementScanData(QueryDesc *desc) : desc(desc) { -} -PostgresReplacementScanData::~PostgresReplacementScanData() { -} +// +// PostgresHeapReplacementScan +// static RangeTblEntry * -FindMatchingRelation(List *tables, const duckdb::string &to_find) { +FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) { ListCell *lc; foreach (lc, tables) { RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); @@ -194,11 +145,12 @@ FindMatchingRelation(List *tables, const duckdb::string &to_find) { return nullptr; } - char *relName = RelationGetRelationName(rel); - auto table_name = std::string(relName); + char *rel_name = RelationGetRelationName(rel); + auto table_name = std::string(rel_name); if (duckdb::StringUtil::CIEquals(table_name, to_find)) { - if (!rel->rd_amhandler) { - // This doesn't have an access method handler, we cant read from this + /* Allow only heap tables */ + if (!rel->rd_amhandler || (GetTableAmRoutine(rel->rd_amhandler) != GetHeapamTableAmRoutine())) { + /* This doesn't have an access method handler, we cant read from this */ RelationClose(rel); return nullptr; } @@ -225,23 +177,22 @@ static duckdb::vector> CreateFuncti } duckdb::unique_ptr -PostgresReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name, - duckdb::ReplacementScanData *data) { +PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name, + duckdb::ReplacementScanData *data) { - auto &scan_data = reinterpret_cast(*data); + auto &scan_data = reinterpret_cast(*data); - auto tables = scan_data.desc->plannedstmt->rtable; - auto table = FindMatchingRelation(tables, table_name); + /* Check name against query rtable list and verify that it is heap table */ + auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name); if (!table) { - elog(WARNING, "Failed to find table %s in replacement scan lookup", table_name.c_str()); return nullptr; } // Create POINTER values from the 'table' and 'snapshot' variables auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot); auto table_function = duckdb::make_uniq(); - table_function->function = duckdb::make_uniq("quack_postgres_scan", std::move(children)); + table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children)); return std::move(table_function); } diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp new file mode 100644 index 00000000..fc253912 --- /dev/null +++ b/src/quack_heap_seq_scan.cpp @@ -0,0 +1,181 @@ +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "pgstat.h" +#include "access/valid.h" +#include "access/heapam.h" +#include "storage/bufmgr.h" +} + +#include "quack/quack_heap_seq_scan.hpp" +#include "quack/quack_types.hpp" + +#include + +namespace quack { + +PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table) + : m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) { +} + +PostgresHeapSeqScan::~PostgresHeapSeqScan() { + if (IsValid()) { + RelationClose(m_rel); + } +} + +PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) { + other.m_rel = nullptr; +} + +Relation +PostgresHeapSeqScan::GetRelation() { + return m_rel; +} + +bool +PostgresHeapSeqScan::IsValid() const { + return RelationIsValid(m_rel); +} + +TupleDesc +PostgresHeapSeqScan::GetTupleDesc() { + return RelationGetDescr(m_rel); +} + +Page +PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo) { + Page page = BufferGetPage(threadScanInfo.m_buffer); + TestForOldSnapshot(m_snapshot, m_rel, page); + threadScanInfo.m_page_tuples_all_visible = PageIsAllVisible(page) && !m_snapshot->takenDuringRecovery; + threadScanInfo.m_page_tuples_left = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1; + threadScanInfo.m_current_tuple_index = FirstOffsetNumber; + return page; +} + +void +PostgresHeapSeqScan::InitParallelScanState() { + m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); +} + +bool +PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo) { + BlockNumber block = InvalidBlockNumber; + Page page = nullptr; + + if (!threadScanInfo.m_inited) { + block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber(); + if (threadScanInfo.m_block_number == InvalidBlockNumber) { + return false; + } + threadScanInfo.m_inited = true; + threadScanInfo.m_read_next_page = true; + } + + if (!threadScanInfo.m_read_next_page) { + block = threadScanInfo.m_block_number; + page = BufferGetPage(threadScanInfo.m_buffer); + } + + while (block != InvalidBlockNumber) { + if (threadScanInfo.m_read_next_page) { + CHECK_FOR_INTERRUPTS(); + m_parallel_scan_state.m_lock.lock(); + block = threadScanInfo.m_block_number; + threadScanInfo.m_buffer = + ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, GetAccessStrategy(BAS_BULKREAD)); + LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE); + m_parallel_scan_state.m_lock.unlock(); + + page = PreparePageRead(threadScanInfo); + threadScanInfo.m_read_next_page = false; + } + + for (; threadScanInfo.m_page_tuples_left > 0 && threadScanInfo.m_output_vector_size < STANDARD_VECTOR_SIZE; + threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++, + threadScanInfo.m_output_vector_size++) { + bool visible = true; + ItemId lpp = PageGetItemId(page, threadScanInfo.m_current_tuple_index); + + if (!ItemIdIsNormal(lpp)) + continue; + + threadScanInfo.m_tuple.t_data = (HeapTupleHeader)PageGetItem(page, lpp); + threadScanInfo.m_tuple.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(threadScanInfo.m_tuple.t_self), block, threadScanInfo.m_current_tuple_index); + + if (!threadScanInfo.m_page_tuples_all_visible) { + visible = HeapTupleSatisfiesVisibility(&threadScanInfo.m_tuple, m_snapshot, threadScanInfo.m_buffer); + HeapCheckForSerializableConflictOut(visible, m_rel, &threadScanInfo.m_tuple, threadScanInfo.m_buffer, + m_snapshot); + /* skip tuples not visible to this snapshot */ + if (!visible) + continue; + } + + pgstat_count_heap_getnext(m_rel); + + InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, + threadScanInfo.m_output_vector_size); + } + + /* No more items on current page */ + if (!threadScanInfo.m_page_tuples_left) { + m_parallel_scan_state.m_lock.lock(); + UnlockReleaseBuffer(threadScanInfo.m_buffer); + m_parallel_scan_state.m_lock.unlock(); + threadScanInfo.m_read_next_page = true; + block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber(); + } + + /* We have collected STANDARD_VECTOR_SIZE */ + if (threadScanInfo.m_output_vector_size == STANDARD_VECTOR_SIZE) { + output.SetCardinality(threadScanInfo.m_output_vector_size); + threadScanInfo.m_output_vector_size = 0; + return true; + } + } + + /* Next assigned block number is InvalidBlockNumber so we check did we write any tuples in output vector */ + if (threadScanInfo.m_output_vector_size) { + output.SetCardinality(threadScanInfo.m_output_vector_size); + threadScanInfo.m_output_vector_size = 0; + } + + threadScanInfo.m_buffer = InvalidBuffer; + threadScanInfo.m_block_number = InvalidBlockNumber; + threadScanInfo.m_tuple.t_data = NULL; + threadScanInfo.m_read_next_page = false; + + return false; +} + +BlockNumber +PostgresHeapSeqScan::ParallelScanState::AssignNextBlockNumber() { + m_lock.lock(); + BlockNumber block_number = InvalidBlockNumber; + if (m_last_assigned_block_number == InvalidBlockNumber) { + block_number = m_last_assigned_block_number = 0; + } else if (m_last_assigned_block_number < m_nblocks - 1) { + block_number = ++m_last_assigned_block_number; + } + m_lock.unlock(); + return block_number; +} + +PostgresHeapSeqScanThreadInfo::PostgresHeapSeqScanThreadInfo() + : m_tuple_desc(NULL), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), + m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) { + m_tuple.t_data = NULL; + ItemPointerSetInvalid(&m_tuple.t_self); +} + +PostgresHeapSeqScanThreadInfo::~PostgresHeapSeqScanThreadInfo() { +} + +void +PostgresHeapSeqScanThreadInfo::EndScan() { +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_memory_allocator.cpp b/src/quack_memory_allocator.cpp new file mode 100644 index 00000000..9fde45b3 --- /dev/null +++ b/src/quack_memory_allocator.cpp @@ -0,0 +1,27 @@ +#include "duckdb/common/allocator.hpp" + +extern "C" { +#include "postgres.h" +} + +#include "quack/quack_memory_allocator.hpp" + +namespace quack { + +duckdb::data_ptr_t +QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size) { + return reinterpret_cast(palloc(size)); +} + +void +QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t idx) { + return pfree(pointer); +} + +duckdb::data_ptr_t +QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t old_size, + duckdb::idx_t size) { + return reinterpret_cast(repalloc(pointer, size)); +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_select.cpp b/src/quack_select.cpp index b6f0e365..53b2c4d8 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -18,12 +18,14 @@ extern "C" { #include "quack/quack_heap_scan.hpp" #include "quack/quack_types.hpp" +#include "quack/quack_memory_allocator.hpp" namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; + //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } @@ -33,19 +35,20 @@ extern "C" bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { auto db = quack::quack_open_database(); + /* Add heap tables */ db->instance->config.replacement_scans.emplace_back( - quack::PostgresReplacementScan, - duckdb::make_uniq_base(query_desc)); + quack::PostgresHeapReplacementScan, + duckdb::make_uniq_base(query_desc)); auto connection = duckdb::make_uniq(*db); // Add the postgres_scan inserted by the replacement scan auto &context = *connection->context; - quack::PostgresScanFunction scan_fun; - duckdb::CreateTableFunctionInfo scan_info_info(scan_fun); + quack::PostgresHeapScanFunction heap_scan_fun; + duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); auto &catalog = duckdb::Catalog::GetSystemCatalog(context); context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &scan_info_info); + catalog.CreateTableFunction(context, &heap_scan_info); context.transaction.Commit(); idx_t column_count; @@ -101,7 +104,6 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co } } } - dest->rShutdown(dest); return true; } \ No newline at end of file diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 4a23d8f9..11c9ac34 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -144,9 +144,111 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { offset); break; default: - elog(ERROR, "Unsupported quack type: %hhu", result.GetType().id()); + elog(ERROR, "Unsupported quack type: %d", static_cast(result.GetType().id())); break; } } +typedef struct HeapTuplePageReadState { + bool m_slow = 0; + int m_nvalid = 0; + uint32 m_offset = 0; +} HeapTuplePageReadState; + +static Datum +HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState, + int natts, bool *isNull) { + + HeapTupleHeader tup = tuple->t_data; + bool hasnulls = HeapTupleHasNulls(tuple); + int attnum; + char *tp; + uint32 off; + bits8 *bp = tup->t_bits; + bool slow = false; + Datum value = (Datum)0; + + /* We can only fetch as many attributes as the tuple has. */ + natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts); + + attnum = heapTupleReadState.m_nvalid; + if (attnum == 0) { + /* Start from the first attribute */ + off = 0; + heapTupleReadState.m_slow = false; + } else { + /* Restore state from previous execution */ + off = heapTupleReadState.m_offset; + slow = heapTupleReadState.m_slow; + } + + tp = (char *)tup + tup->t_hoff; + + for (; attnum < natts; attnum++) { + Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum); + + if (hasnulls && att_isnull(attnum, bp)) { + value = (Datum)0; + *isNull = true; + slow = true; /* can't use attcacheoff anymore */ + continue; + } + + *isNull = false; + + if (!slow && thisatt->attcacheoff >= 0) { + off = thisatt->attcacheoff; + } else if (thisatt->attlen == -1) { + + if (!slow && off == att_align_nominal(off, thisatt->attalign)) { + thisatt->attcacheoff = off; + } else { + off = att_align_pointer(off, thisatt->attalign, -1, tp + off); + slow = true; + } + } else { + off = att_align_nominal(off, thisatt->attalign); + + if (!slow) { + thisatt->attcacheoff = off; + } + } + + value = fetchatt(thisatt, tp + off); + + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + + if (thisatt->attlen <= 0) { + slow = true; + } + } + + heapTupleReadState.m_nvalid = attnum; + heapTupleReadState.m_offset = off; + + if (slow) { + heapTupleReadState.m_slow = true; + } else { + heapTupleReadState.m_slow = false; + } + + return value; +} + +void +InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) { + HeapTuplePageReadState heapTupleReadState = {}; + for (int i = 0; i < tupleDesc->natts; i++) { + auto &result = output.data[i]; + bool isNull = false; + Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull); + if (isNull) { + auto &array_mask = duckdb::FlatVector::Validity(result); + array_mask.SetInvalid(offset); + } else { + ConvertPostgresToDuckValue(value, result, offset); + } + } +} + } // namespace quack