Skip to content

Commit

Permalink
PostgreSQL memory allocator and muli thread scan
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed Apr 11, 2024
1 parent d9cacf0 commit d0fc731
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 196 deletions.
18 changes: 10 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack.cpp
SRCS = src/quack_heap_relation.cpp \
src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack_memory_allocator.cpp \
src/quack.cpp

OBJS = $(subst .cpp,.o, $(SRCS))

Expand All @@ -18,8 +20,8 @@ PG_CONFIG ?= pg_config

PGXS := $(shell $(PG_CONFIG) --pgxs)
PG_LIB := $(shell $(PG_CONFIG) --pkglibdir)
PG_CPPFLAGS := -Iinclude -Ithird_party/duckdb/src/include -std=c++17
SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -L$(PG_LIB) -lduckdb -Lthird_party/build/src -lc++
PG_CPPFLAGS := -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -O0 -g
SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -L$(PG_LIB) -lduckdb -Lthird_party/build/src #-lc++

# determine the name of the duckdb library that is built
UNAME_S := $(shell uname -s)
Expand All @@ -43,7 +45,7 @@ third_party/build/src/$(DUCKDB_LIB):
cd third_party && \
mkdir build && \
cd build && \
cmake ../duckdb && \
cmake -DENABLE_SANITIZER=FALSE -DENABLE_UBSAN=FALSE -DBUILD_UNITTESTS=ON -DCMAKE_BUILD_TYPE=Debug ../duckdb && \
make

install_duckdb:
Expand Down
1 change: 1 addition & 0 deletions include/quack/quack.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

// quack.c
extern int quack_max_threads_per_query;
extern "C" void _PG_init(void);

// quack_hooks.c
Expand Down
37 changes: 37 additions & 0 deletions include/quack/quack_heap_relation.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
}

#include <mutex>

namespace quack {

class PostgresHeapRelation {
public:
PostgresHeapRelation(RangeTblEntry *table);
~PostgresHeapRelation();
PostgresHeapRelation(const PostgresHeapRelation &other) = delete;
PostgresHeapRelation &operator=(const PostgresHeapRelation &other) = delete;
PostgresHeapRelation &operator=(PostgresHeapRelation &&other) = delete;
PostgresHeapRelation(PostgresHeapRelation &&other);

public:
Relation GetRelation();
TupleDesc GetTupleDesc();
bool GetNextTuple(TableScanDesc scan, ScanDirection direction, TupleTableSlot *slot);
bool IsValid() const;

private:
void GetPageTuple(HeapScanDesc scan, ScanDirection dir, int nkeys, ScanKey key);
BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir);

private:
Relation rel = nullptr;
std::mutex lock;
};

} // namespace quack
94 changes: 43 additions & 51 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,99 +9,91 @@ extern "C" {
#include "access/relscan.h"
}

// Postgres Relation

class PostgresRelation {
public:
PostgresRelation(RangeTblEntry *table);
~PostgresRelation();
PostgresRelation(const PostgresRelation &other) = delete;
PostgresRelation &operator=(const PostgresRelation &other) = delete;
PostgresRelation &operator=(PostgresRelation &&other) = delete;
PostgresRelation(PostgresRelation &&other);
#include "quack/quack.h"
#include "quack/quack_heap_relation.hpp"

public:
Relation GetRelation();
bool IsValid() const;
// Postgres Relation

private:
Relation rel = nullptr;
};

namespace quack {

// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot);
~PostgresScanLocalState() override;
PostgresHeapScanLocalState(ParallelBlockTableScanDesc scan, PostgresHeapRelation &relation, Snapshot snapshot);
~PostgresHeapScanLocalState() override;

public:
TableScanDesc scanDesc = nullptr;
const struct TableAmRoutine *tableam;
PostgresHeapRelation & rel;
TupleTableSlot *slot = nullptr;
TableScanDesc thread_scan_desc = nullptr;
bool exhausted_scan = false;
};

// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState();
std::mutex lock;
};
struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapRelation &relation, Snapshot snapshot);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
return quack_max_threads_per_query;
}

// Bind Data
public:
ParallelBlockTableScanDescData parallel_block_scan_desc;
};

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot);
~PostgresScanFunctionData() override;
PostgresHeapScanFunctionData(PostgresHeapRelation &&relation, Snapshot snapshot);
~PostgresHeapScanFunctionData() override;

public:
PostgresRelation relation;
PostgresHeapRelation relation;
Snapshot snapshot;
};

// ------- Table Function -------

struct PostgresScanFunction : public duckdb::TableFunction {
struct PostgresHeapScanFunction : public duckdb::TableFunction {
public:
PostgresScanFunction();
PostgresHeapScanFunction();

public:
static duckdb::unique_ptr<duckdb::FunctionData> PostgresBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::FunctionData> PostgresHeapBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
// static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p);
// static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
static void PostgresHeapFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
// PostgresSerialize(Serializer &serializer, const optional_ptr<FunctionData> bind_data, const TableFunction
// &function);
public:
static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset);
};

struct PostgresReplacementScanData : public duckdb::ReplacementScanData {
struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData {
public:
PostgresReplacementScanData(QueryDesc *desc);
~PostgresReplacementScanData() override;
PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) {
}
~PostgresHeapReplacementScanData() override {};

public:
QueryDesc *desc;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);
duckdb::unique_ptr<duckdb::TableRef> PostgresHeapReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);

} // namespace quack
17 changes: 17 additions & 0 deletions include/quack/quack_memory_allocator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include "duckdb/common/allocator.hpp"

namespace quack {

struct QuackAllocatorData : public duckdb::PrivateAllocatorData {
explicit QuackAllocatorData() {
}
};

duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size);
void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx);
duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer,
duckdb::idx_t old_size, duckdb::idx_t size);

} // namespace quack
7 changes: 0 additions & 7 deletions src/CMakeLists.txt

This file was deleted.

15 changes: 14 additions & 1 deletion src/quack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ extern "C" {

static void quack_init_guc(void);

int quack_max_threads_per_query = 1;

extern "C" {
PG_MODULE_MAGIC;

Expand All @@ -21,5 +23,16 @@ _PG_init(void) {
/* clang-format off */
static void
quack_init_guc(void) {

DefineCustomIntVariable("quack.max_threads_per_query",
gettext_noop("DuckDB max no. threads per query."),
NULL,
&quack_max_threads_per_query,
1,
1,
32,
PGC_USERSET,
0,
NULL,
NULL,
NULL);
}
Loading

0 comments on commit d0fc731

Please sign in to comment.