Skip to content

Commit

Permalink
Quack node - WIP
Browse files Browse the repository at this point in the history
* Created a quack node which will be used for DuckDB execution. The reason
  for moving execution into a node is to enable EXPLAIN and custom
  return-table functions to work.
  • Loading branch information
mkaruza committed May 3, 2024
1 parent e55721c commit 13e34b1
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 103 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SRCS = src/quack_detoast.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
src/quack_node.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack_memory_allocator.cpp \
Expand All @@ -32,8 +33,8 @@ ifeq ($(QUACK_BUILD), Debug)
QUACK_BUILD_CXX_FLAGS = -g -O0
QUACK_BUILD_DUCKDB = debug
else
QUACK_BUILD_CXX_FLAGS =
QUACK_BUILD_DUCKDB = release
QUACK_BUILD_CXX_FLAGS = -g -O0
QUACK_BUILD_DUCKDB = debug
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS}
Expand Down
5 changes: 3 additions & 2 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {

/*
 * Data handed to the heap replacement scan callback.
 *
 * Carries the parsed query and the query text: the constructor taking
 * (parse, query) stores the Query pointer as given and copies the text
 * into a std::string.
 */
struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData {
public:
PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) {
PostgresHeapReplacementScanData(Query *parse, const char *query) : m_parse(parse), m_query(query) {
}
~PostgresHeapReplacementScanData() override {};

public:
QueryDesc *desc;
/* Parsed query; PostgresHeapReplacementScan matches table names against its rtable. */
Query *m_parse;
/* Copy of the query text passed to the constructor. */
std::string m_query;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresHeapReplacementScan(duckdb::ClientContext &context,
Expand Down
2 changes: 2 additions & 0 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class PostgresHeapSeqScan {

public:
Relation GetRelation();
void CloseRelation();
TupleDesc GetTupleDesc();
bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo);
bool IsValid() const;
Expand All @@ -87,6 +88,7 @@ class PostgresHeapSeqScan {
Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo);

private:
RangeTblEntry * m_tableEntry = nullptr;
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
PostgresHeapSeqParallelScanState m_parallel_scan_state;
Expand Down
7 changes: 7 additions & 0 deletions include/quack/quack_node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

#include "postgres.h"
#include "nodes/extensible.h"

/*
 * Create the plan node for the given parsed query and query text.
 * The planner hook casts the result to CustomScan and treats a null
 * pointer as "no plan available".
 */
extern "C" Plan * quack_create_plan(Query *parse, const char *query);
/*
 * Node initialization entry point, called from _PG_init().
 * NOTE(review): presumably registers the custom node; the implementation is not visible here.
 */
extern "C" void quack_init_node(void);
1 change: 1 addition & 0 deletions include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern "C" {

namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo,
Expand Down
8 changes: 8 additions & 0 deletions quack--0.0.1.sql
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
LOAD 'quack';

-- read_parquet(path): set-returning function declared on the Postgres side.
-- The PL/pgSQL body below is a stub: it ignores `path` and returns the
-- result of `SELECT 1`.
-- NOTE(review): presumably the real work is expected to happen elsewhere
-- (the commit message mentions custom return-table functions) -- confirm.
CREATE OR REPLACE FUNCTION read_parquet(path text)
    RETURNS SETOF record
    LANGUAGE plpgsql
AS $func$
BEGIN
    RETURN QUERY EXECUTE 'SELECT 1';
END;
$func$;
2 changes: 2 additions & 0 deletions src/quack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern "C" {
}

#include "quack/quack.h"
#include "quack/quack_node.hpp"

static void quack_init_guc(void);

Expand All @@ -17,6 +18,7 @@ void
_PG_init(void) {
quack_init_guc();
quack_init_hooks();
quack_init_node();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/quack_heap_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string
auto &scan_data = reinterpret_cast<PostgresHeapReplacementScanData &>(*data);

/* Check name against query rtable list and verify that it is heap table */
auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name);
auto table = FindMatchingHeapRelation(scan_data.m_parse->rtable, table_name);

if (!table) {
return nullptr;
}

// Create POINTER values from the 'table' and 'snapshot' variables
auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot);
auto children = CreateFunctionArguments(table, GetActiveSnapshot());
auto table_function = duckdb::make_uniq<duckdb::TableFunctionRef>();
table_function->function = duckdb::make_uniq<duckdb::FunctionExpression>("postgres_heap_scan", std::move(children));

Expand Down
28 changes: 20 additions & 8 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern "C" {
namespace quack {

/*
 * Construct a scan for the given range-table entry.
 *
 * The initializer list that sets m_tableEntry only remembers the entry and
 * leaves the relation and snapshot null; GetRelation() opens the relation
 * later, on first use.
 */
PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table)
: m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) {
: m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) {
}

PostgresHeapSeqScan::~PostgresHeapSeqScan() {
Expand All @@ -25,15 +25,28 @@ PostgresHeapSeqScan::~PostgresHeapSeqScan() {
}
}

PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) {
other.m_rel = nullptr;
PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other)
: m_tableEntry(other.m_tableEntry), m_rel(nullptr) {
other.CloseRelation();
other.m_tableEntry = nullptr;
}

/*
 * Return the relation used by this scan, opening it on first use.
 *
 * If a relation is already held, or there is no table entry to open one
 * from, the current handle (possibly null) is returned as is. Otherwise
 * the relation is looked up by the table entry's relid and cached.
 */
Relation
PostgresHeapSeqScan::GetRelation() {
	if (m_rel != nullptr || m_tableEntry == nullptr) {
		return m_rel;
	}

	m_rel = RelationIdGetRelation(m_tableEntry->relid);
	return m_rel;
}

/*
 * Release the relation held by this scan, if any, and forget the handle.
 *
 * After this call m_rel is always null, so a later GetRelation() will
 * reopen the relation from the table entry.
 */
void
PostgresHeapSeqScan::CloseRelation() {
	if (!IsValid()) {
		/* Nothing open: just make sure the handle is cleared. */
		m_rel = nullptr;
		return;
	}

	RelationClose(m_rel);
	m_rel = nullptr;
}

bool
PostgresHeapSeqScan::IsValid() const {
return RelationIsValid(m_rel);
Expand All @@ -56,7 +69,8 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
}

void
PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) {
PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) {
(void) GetRelation();
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);

/* SELECT COUNT(*) FROM */
Expand All @@ -80,8 +94,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu
}
}


//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
// m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_filters = input.filters.get();
}

Expand Down Expand Up @@ -110,7 +123,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
threadScanInfo.m_buffer =
ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy);
LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE);
//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
// m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_lock.unlock();
page = PreparePageRead(threadScanInfo);
threadScanInfo.m_read_next_page = false;
Expand Down Expand Up @@ -196,7 +209,6 @@ PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) {
(m_last_prefetch_block - m_last_assigned_block_number) > 8)
return;


for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) {
PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block);
m_last_prefetch_block++;
Expand Down
69 changes: 56 additions & 13 deletions src/quack_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ extern "C" {
#include "catalog/pg_namespace.h"
#include "commands/extension.h"
#include "utils/rel.h"
#include "optimizer/planner.h"
}

#include "quack/quack.h"
#include "quack/quack_select.h"
#include "quack/quack_node.hpp"

static ExecutorRun_hook_type PrevExecutorRunHook = NULL;
static planner_hook_type PrevPlannerHook = NULL;

static bool
is_quack_extension_registered() {
Expand All @@ -33,24 +35,65 @@ is_catalog_table(List *tables) {
return false;
}

static void
quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) {
if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) &&
queryDesc->operation == CMD_SELECT) {
if (quack_execute_select(queryDesc, direction, count)) {
return;
}
/*
 * Build a PlannedStmt whose plan tree is the node produced by
 * quack_create_plan().
 *
 * parse         - query to plan; several of its fields are copied into the result.
 * query_string  - query text, forwarded to quack_create_plan().
 * cursorOptions - passed in by quack_planner; not read in this function.
 * boundParams   - passed in by quack_planner; not read in this function.
 *
 * Returns nullptr when quack_create_plan() yields no plan, otherwise a newly
 * made PlannedStmt.
 */
static PlannedStmt *
quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) {

/* Check whether a DuckDB plan can be created for this query; the node is cast via castNode(CustomScan, ...). */
Plan *quackPlan = (Plan *)castNode(CustomScan, quack_create_plan(parse, query_string));

if (!quackPlan) {
return nullptr;
}

/*
 * NOTE(review): this message names quack_executor_run and talks about falling back
 * to PG execution, yet it is only reached after a plan was obtained — confirm it is
 * still intended here.
 */
elog(DEBUG3, "quack_executor_run: Failing back to PG execution");
/* build the PlannedStmt result */
PlannedStmt *result = makeNode(PlannedStmt);

/* Fields taken from the query, followed by fixed flags. */
result->commandType = parse->commandType;
result->queryId = parse->queryId;
result->hasReturning = (parse->returningList != NIL);
result->hasModifyingCTE = parse->hasModifyingCTE;
result->canSetTag = parse->canSetTag;
result->transientPlan = false;
result->dependsOnRole = false;
result->parallelModeNeeded = false;
result->planTree = quackPlan;
/* The range table, permission info and all remaining lists are left empty. */
result->rtable = NULL;
result->permInfos = NULL;
result->resultRelations = NULL;
result->appendRelations = NULL;
result->subplans = NIL;
result->rewindPlanIDs = NULL;
result->rowMarks = NIL;
result->relationOids = NIL;
result->invalItems = NIL;
result->paramExecTypes = NIL;

/* utilityStmt should be null, but we might as well copy it */
result->utilityStmt = parse->utilityStmt;
result->stmt_location = parse->stmt_location;
result->stmt_len = parse->stmt_len;

return result;
}

/*
 * Planner hook installed by quack_init_hooks().
 *
 * Tries quack_plan_node() first when the query qualifies; if that yields no
 * plan (or the query does not qualify), planning is delegated to the
 * previously installed planner hook, or to standard_planner() when there
 * was none.
 */
static PlannedStmt *
quack_planner(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) {
/* Qualifies when the extension is registered, is_catalog_table() is false for the range table, and the command is a SELECT. */
if (is_quack_extension_registered() && !is_catalog_table(parse->rtable) && parse->commandType == CMD_SELECT) {
PlannedStmt * quackPlan = quack_plan_node(parse, query_string, cursorOptions, boundParams);
if (quackPlan) {
return quackPlan;
}
}

/*
 * NOTE(review): the two PrevExecutorRunHook lines below use queryDesc, direction,
 * count and execute_once, none of which are parameters of this function, and the
 * opening brace has no match — they look like leftovers of the removed executor
 * hook; confirm.
 */
if (PrevExecutorRunHook) {
PrevExecutorRunHook(queryDesc, direction, count, execute_once);
/* Chain to the earlier planner hook if one was saved, else use the standard planner. */
if (PrevPlannerHook) {
return PrevPlannerHook(parse, query_string, cursorOptions, boundParams);
} else {
return standard_planner(parse, query_string, cursorOptions, boundParams);
}
}

/*
 * Install quack's hooks, remembering whatever was installed before so the
 * hook functions can chain to it.
 */
void
quack_init_hooks(void) {
/* Executor-run hook: remember the current hook (standard_ExecutorRun when none is set), then install quack_executor_run. */
PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun;
ExecutorRun_hook = quack_executor_run;
/* Planner hook: remember the current hook (may be NULL; quack_planner then uses standard_planner), then install quack_planner. */
PrevPlannerHook = planner_hook;
planner_hook = quack_planner;
}
Loading

0 comments on commit 13e34b1

Please sign in to comment.