Skip to content

Commit

Permalink
PostgreSQL memory allocator and multi-thread scan
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed Apr 18, 2024
1 parent d2043a8 commit efcd279
Show file tree
Hide file tree
Showing 13 changed files with 559 additions and 206 deletions.
12 changes: 7 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack.cpp
SRCS = src/quack_heap_seq_scan.cpp \
src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack_memory_allocator.cpp \
src/quack.cpp

OBJS = $(subst .cpp,.o, $(SRCS))

Expand Down
1 change: 1 addition & 0 deletions include/quack/quack.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

// quack.cpp
extern int quack_max_threads_per_query;
extern "C" void _PG_init(void);

// quack_hooks.cpp
Expand Down
93 changes: 40 additions & 53 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,99 +9,86 @@ extern "C" {
#include "access/relscan.h"
}

// Postgres Relation

class PostgresRelation {
public:
PostgresRelation(RangeTblEntry *table);
~PostgresRelation();
PostgresRelation(const PostgresRelation &other) = delete;
PostgresRelation &operator=(const PostgresRelation &other) = delete;
PostgresRelation &operator=(PostgresRelation &&other) = delete;
PostgresRelation(PostgresRelation &&other);
#include "quack/quack.h"
#include "quack/quack_heap_seq_scan.hpp"

public:
Relation GetRelation();
bool IsValid() const;
// Postgres Relation

private:
Relation rel = nullptr;
};

namespace quack {

// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot);
~PostgresScanLocalState() override;
PostgresHeapScanLocalState(PostgresHeapSeqScan &relation);
~PostgresHeapScanLocalState() override;

public:
TableScanDesc scanDesc = nullptr;
const struct TableAmRoutine *tableam;
bool exhausted_scan = false;
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState();
std::mutex lock;
struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
return quack_max_threads_per_query;
}
};

// Bind Data

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot);
~PostgresScanFunctionData() override;
PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot Snapshot);
~PostgresHeapScanFunctionData() override;

public:
PostgresRelation relation;
Snapshot snapshot;
PostgresHeapSeqScan m_relation;
};

// ------- Table Function -------

struct PostgresScanFunction : public duckdb::TableFunction {
struct PostgresHeapScanFunction : public duckdb::TableFunction {
public:
PostgresScanFunction();
PostgresHeapScanFunction();

public:
static duckdb::unique_ptr<duckdb::FunctionData> PostgresBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::FunctionData> PostgresHeapBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
// static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p);
// static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
// PostgresSerialize(Serializer &serializer, const optional_ptr<FunctionData> bind_data, const TableFunction
// &function);
public:
static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset);
};

struct PostgresReplacementScanData : public duckdb::ReplacementScanData {
struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData {
public:
PostgresReplacementScanData(QueryDesc *desc);
~PostgresReplacementScanData() override;
PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) {
}
~PostgresHeapReplacementScanData() override {};

public:
QueryDesc *desc;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);
duckdb::unique_ptr<duckdb::TableRef> PostgresHeapReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);

} // namespace quack
76 changes: 76 additions & 0 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
}

#include <mutex>

namespace quack {

// Scan-position state that PostgresHeapSeqScan::ReadPageTuples() and
// PostgresHeapSeqScan::PreparePageRead() receive by reference.
// PostgresHeapScanLocalState (quack_heap_scan.hpp) embeds one instance as
// m_thread_seq_scan_info.
// NOTE(review): the naming suggests one instance per scanning thread — confirm
// against the implementation file.
class PostgresHeapSeqScanThreadInfo {
public:
// The constructor, destructor and EndScan() are only declared here; their
// definitions are outside this header.
// NOTE(review): none of the data members below has an in-class initializer, so
// the constructor is presumably responsible for initializing all of them — confirm.
PostgresHeapSeqScanThreadInfo();
~PostgresHeapSeqScanThreadInfo();
void EndScan();

public:
// All fields are public and accessed directly by the scan code.
// NOTE(review): the per-field notes below are inferred from names and types
// only — verify against the implementation before relying on them.
TupleDesc m_tuple_desc;
// Presumably whether this scan state has been set up yet — TODO confirm.
bool m_inited;
// Presumably whether the next read must fetch a new page — TODO confirm.
bool m_read_next_page;
bool m_page_tuples_all_visible;
int m_output_vector_size;
// Block / buffer / tuple cursor within the relation being scanned.
BlockNumber m_block_number;
Buffer m_buffer;
OffsetNumber m_current_tuple_index;
int m_page_tuples_left;
HeapTupleData m_tuple;
};

// Wraps a PostgreSQL Relation together with the Snapshot and the shared
// block-assignment state used while scanning it.
// Constructed from a RangeTblEntry; copying and move-assignment are deleted,
// only move construction is available.
class PostgresHeapSeqScan {
private:
// Shared bookkeeping for handing out block numbers.
// Holds a std::mutex next to the two counters.
// NOTE(review): presumably AssignNextBlockNumber() takes m_lock so that several
// threads can request blocks concurrently — confirm in the implementation.
class ParallelScanState {
public:
// Both counters start at InvalidBlockNumber.
ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber();
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
};

public:
PostgresHeapSeqScan(RangeTblEntry *table);
~PostgresHeapSeqScan();
// Non-copyable and not move-assignable.
PostgresHeapSeqScan(const PostgresHeapSeqScan &other) = delete;
PostgresHeapSeqScan &operator=(const PostgresHeapSeqScan &other) = delete;
PostgresHeapSeqScan &operator=(PostgresHeapSeqScan &&other) = delete;
// User-declared move constructor (defined elsewhere). Note that the
// m_parallel_scan_state member contains a std::mutex, which is neither
// copyable nor movable, so this cannot be a plain member-wise move.
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
// Stores the given snapshot pointer in m_snapshot; no other work is done here.
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
}

public:
Relation GetRelation();
TupleDesc GetTupleDesc();
// Takes the output chunk and the caller's scan-position state.
// NOTE(review): the meaning of the bool result is not visible from this header — confirm.
bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo);
bool IsValid() const;

private:
Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo);

private:
// Both pointers default to nullptr; m_snapshot is assigned via SetSnapshot().
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
ParallelScanState m_parallel_scan_state;
};

} // namespace quack
17 changes: 17 additions & 0 deletions include/quack/quack_memory_allocator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include "duckdb/common/allocator.hpp"

namespace quack {

// Allocator private-data type for this extension, derived from
// duckdb::PrivateAllocatorData. It adds no members of its own; the explicit
// default constructor has an empty body.
struct QuackAllocatorData : public duckdb::PrivateAllocatorData {
explicit QuackAllocatorData() {
}
};

// Allocation entry points for the extension. Only the prototypes are visible
// here; the Makefile lists src/quack_memory_allocator.cpp as a source file.
// Each function receives a duckdb::PrivateAllocatorData pointer as its first argument.
// NOTE(review): these signatures look like the allocate / free / reallocate
// callbacks accepted by duckdb::Allocator, and the commit title suggests they
// route through PostgreSQL's memory allocator — confirm against the implementation.

// Takes the requested size and returns a data pointer.
duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size);
// Takes the pointer to release plus an idx_t (named `idx` here; presumably the
// allocation size — TODO confirm).
void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx);
// Takes the existing pointer with its old size and the new size; returns a data pointer.
duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer,
duckdb::idx_t old_size, duckdb::idx_t size);

} // namespace quack
1 change: 1 addition & 0 deletions include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
} // namespace quack
7 changes: 0 additions & 7 deletions src/CMakeLists.txt

This file was deleted.

15 changes: 14 additions & 1 deletion src/quack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ extern "C" {

static void quack_init_guc(void);

// Backing variable for the "quack.max_threads_per_query" setting registered in
// quack_init_guc(); declared extern in quack/quack.h and returned by
// PostgresHeapScanGlobalState::MaxThreads(). Starts at 1.
int quack_max_threads_per_query = 1;

extern "C" {
PG_MODULE_MAGIC;

Expand All @@ -21,5 +23,16 @@ _PG_init(void) {
/* clang-format off */
/*
 * Registers the extension's custom configuration settings.
 *
 * Currently a single integer setting, "quack.max_threads_per_query", backed by
 * quack_max_threads_per_query: boot value 1, allowed range 1..64, context
 * PGC_USERSET, no flags and no check/assign/show hooks.
 */
static void
quack_init_guc(void) {
    DefineCustomIntVariable(
        "quack.max_threads_per_query",                      /* name */
        gettext_noop("DuckDB max no. threads per query."),  /* short description */
        nullptr,                                            /* long description */
        &quack_max_threads_per_query,                       /* backing variable */
        1,                                                  /* boot value */
        1,                                                  /* minimum */
        64,                                                 /* maximum */
        PGC_USERSET,                                        /* context */
        0,                                                  /* flags */
        nullptr,                                            /* check hook */
        nullptr,                                            /* assign hook */
        nullptr);                                           /* show hook */
}
Loading

0 comments on commit efcd279

Please sign in to comment.