Skip to content

Commit

Permalink
Merge pull request #9 from hydradatabase/memory_and_parallel_scan
Browse files Browse the repository at this point in the history
PostgreSQL memory allocator and multi thread scan
  • Loading branch information
mkaruza authored Apr 18, 2024
2 parents d2043a8 + 7e3a985 commit 6905d8c
Show file tree
Hide file tree
Showing 22 changed files with 646 additions and 254 deletions.
39 changes: 26 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack.cpp
SRCS = src/quack_heap_seq_scan.cpp \
src/quack_heap_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack_memory_allocator.cpp \
src/quack.cpp

OBJS = $(subst .cpp,.o, $(SRCS))

REGRESS = create_extension
REGRESS = basic

PG_CONFIG ?= pg_config

Expand All @@ -21,10 +23,20 @@ PG_LIB := $(shell $(PG_CONFIG) --pkglibdir)
INCLUDEDIR := ${shell $(PG_CONFIG) --includedir}
INCLUDEDIR_SERVER := ${shell $(PG_CONFIG) --includedir-server}

DEBUG_FLAGS = -g -O0 -fsanitize=address
override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17
QUACK_BUILD_CXX_FLAGS=
QUACK_BUILD_DUCKDB=

SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/debug/src -lstdc++
ifeq ($(QUACK_BUILD), Debug)
QUACK_BUILD_CXX_FLAGS = -g -O0
QUACK_BUILD_DUCKDB = debug
else
QUACK_BUILD_CXX_FLAGS =
QUACK_BUILD_DUCKDB = release
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS}

SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++

COMPILE.cc.bc = $(CXX) -Wno-ignored-attributes -Wno-register $(BITCODE_CXXFLAGS) $(CXXFLAGS) $(PG_CPPFLAGS) -I$(INCLUDEDIR_SERVER) -emit-llvm -c

Expand All @@ -44,21 +56,22 @@ all: duckdb $(OBJS)

include $(PGXS)

duckdb: third_party/duckdb third_party/duckdb/build/debug/src/$(DUCKDB_LIB)
duckdb: third_party/duckdb third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB)

third_party/duckdb:
git submodule update --init --recursive

third_party/duckdb/build/debug/src/$(DUCKDB_LIB):
$(MAKE) -C third_party/duckdb debug DISABLE_SANITIZER=1
third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB):
$(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1

install_duckdb:
$(install_bin) -m 755 third_party/duckdb/build/debug/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB)
$(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB)

clean_duckdb:
rm -rf third_party/duckdb/build

install: install_duckdb

clean: clean_duckdb

lintcheck:
Expand Down
46 changes: 46 additions & 0 deletions expected/basic.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
CREATE EXTENSION quack;
SET client_min_messages to 'DEBUG3';
CREATE TABLE t(a INT);
INSERT INTO t SELECT g % 10 from generate_series(1,1000000) g;
SELECT COUNT(*) FROM t;
DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER --
DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads --
count
---------
1000000
(1 row)

SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a;
DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER --
DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads --
a | count
---+--------
6 | 100000
7 | 100000
8 | 100000
9 | 100000
(4 rows)

SET quack.max_threads_per_query to 4;
SELECT COUNT(*) FROM t;
DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER --
DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads --
count
---------
1000000
(1 row)

SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a;
DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER --
DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads --
a | count
---+--------
6 | 100000
7 | 100000
8 | 100000
9 | 100000
(4 rows)

SET quack.max_threadS_per_query TO default;
SET client_min_messages TO default;
DROP TABLE t;
10 changes: 0 additions & 10 deletions expected/create_extension.out

This file was deleted.

1 change: 1 addition & 0 deletions include/quack/quack.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

// quack.c
extern int quack_max_threads_per_query;
extern "C" void _PG_init(void);

// quack_hooks.c
Expand Down
93 changes: 40 additions & 53 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,99 +9,86 @@ extern "C" {
#include "access/relscan.h"
}

// Postgres Relation

class PostgresRelation {
public:
PostgresRelation(RangeTblEntry *table);
~PostgresRelation();
PostgresRelation(const PostgresRelation &other) = delete;
PostgresRelation &operator=(const PostgresRelation &other) = delete;
PostgresRelation &operator=(PostgresRelation &&other) = delete;
PostgresRelation(PostgresRelation &&other);
#include "quack/quack.h"
#include "quack/quack_heap_seq_scan.hpp"

public:
Relation GetRelation();
bool IsValid() const;
// Postgres Relation

private:
Relation rel = nullptr;
};

namespace quack {

// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(PostgresRelation &relation, Snapshot snapshot);
~PostgresScanLocalState() override;
PostgresHeapScanLocalState(PostgresHeapSeqScan &relation);
~PostgresHeapScanLocalState() override;

public:
TableScanDesc scanDesc = nullptr;
const struct TableAmRoutine *tableam;
bool exhausted_scan = false;
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState();
std::mutex lock;
struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
return quack_max_threads_per_query;
}
};

// Bind Data

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(PostgresRelation &&relation, Snapshot snapshot);
~PostgresScanFunctionData() override;
PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot Snapshot);
~PostgresHeapScanFunctionData() override;

public:
PostgresRelation relation;
Snapshot snapshot;
PostgresHeapSeqScan m_relation;
};

// ------- Table Function -------

struct PostgresScanFunction : public duckdb::TableFunction {
struct PostgresHeapScanFunction : public duckdb::TableFunction {
public:
PostgresScanFunction();
PostgresHeapScanFunction();

public:
static duckdb::unique_ptr<duckdb::FunctionData> PostgresBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::FunctionData> PostgresHeapBind(duckdb::ClientContext &context,
duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types,
duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
// static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p);
// static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
// PostgresSerialize(Serializer &serializer, const optional_ptr<FunctionData> bind_data, const TableFunction
// &function);
public:
static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset);
};

struct PostgresReplacementScanData : public duckdb::ReplacementScanData {
struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData {
public:
PostgresReplacementScanData(QueryDesc *desc);
~PostgresReplacementScanData() override;
PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) {
}
~PostgresHeapReplacementScanData() override {};

public:
QueryDesc *desc;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);
duckdb::unique_ptr<duckdb::TableRef> PostgresHeapReplacementScan(duckdb::ClientContext &context,
const duckdb::string &table_name,
duckdb::ReplacementScanData *data);

} // namespace quack
76 changes: 76 additions & 0 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
}

#include <mutex>

namespace quack {

class PostgresHeapSeqScanThreadInfo {
public:
PostgresHeapSeqScanThreadInfo();
~PostgresHeapSeqScanThreadInfo();
void EndScan();

public:
TupleDesc m_tuple_desc;
bool m_inited;
bool m_read_next_page;
bool m_page_tuples_all_visible;
int m_output_vector_size;
BlockNumber m_block_number;
Buffer m_buffer;
OffsetNumber m_current_tuple_index;
int m_page_tuples_left;
HeapTupleData m_tuple;
};

class PostgresHeapSeqScan {
private:
class ParallelScanState {
public:
ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber();
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
};

public:
PostgresHeapSeqScan(RangeTblEntry *table);
~PostgresHeapSeqScan();
PostgresHeapSeqScan(const PostgresHeapSeqScan &other) = delete;
PostgresHeapSeqScan &operator=(const PostgresHeapSeqScan &other) = delete;
PostgresHeapSeqScan &operator=(PostgresHeapSeqScan &&other) = delete;
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
}

public:
Relation GetRelation();
TupleDesc GetTupleDesc();
bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo);
bool IsValid() const;

private:
Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo);

private:
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
ParallelScanState m_parallel_scan_state;
};

} // namespace quack
17 changes: 17 additions & 0 deletions include/quack/quack_memory_allocator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include "duckdb/common/allocator.hpp"

namespace quack {

struct QuackAllocatorData : public duckdb::PrivateAllocatorData {
explicit QuackAllocatorData() {
}
};

duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size);
void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx);
duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer,
duckdb::idx_t old_size, duckdb::idx_t size);

} // namespace quack
1 change: 1 addition & 0 deletions include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
} // namespace quack
Loading

0 comments on commit 6905d8c

Please sign in to comment.