diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml new file mode 100644 index 0000000..79fa8b2 --- /dev/null +++ b/.github/workflows/build_and_test.yaml @@ -0,0 +1,57 @@ +name: pg_quack Build and Test +on: + push: + pull_request: +jobs: + build-and-test: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + version: [REL_16_STABLE] + runs-on: ${{ matrix.os }} + + steps: + - name: Test details + run: echo Build and test pg_quack on ${{ matrix.os }} with PostgreSQL ${{ matrix.version }} branch + + - name: Checkout and build PostgreSQL code + run: | + sudo apt-get update -qq + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 + sudo apt-get install -y build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config libc++-dev libc++abi-dev libglib2.0-dev libtinfo5 cmake libstdc++-12-dev + rm -rf postgres + git clone --branch ${{ matrix.version }} --single-branch --depth 1 https://github.com/postgres/postgres.git + pushd postgres + git branch + ./configure --prefix=$PWD/inst/ --enable-cassert --enable-debug --with-openssl + make -j4 install + + - name: Start Postgres + run: | + pushd postgres + cd inst/bin + ./initdb -D data + ./pg_ctl -D data -l logfile start + popd + + - name: Checkout pg_quack extension code + uses: actions/checkout@v4 + with: + path: quack + + - name: Build and test pg_quack extension + id: regression-tests + run: | + export PATH="${PWD}/postgres/inst/bin:$PATH" + pushd quack + git submodule update --init --recursive + make + make install + make installcheck + popd + + - name: Print regression.diffs if regression tests failed + if: failure() && steps.regression-tests.outcome != 'success' + run: | + cat quack/regression.diffs diff --git a/.gitignore b/.gitignore index 5ff4f63..127065e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,12 +5,12 @@ .ccls compile_commands.json -build/ - *.a 
*.so *.o *.bc *.dylib -results \ No newline at end of file +results +regression.diffs +regression.out \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt deleted file mode 100644 index a59a4ca..0000000 --- a/CMakeLists.txt +++ /dev/null @@ -1,197 +0,0 @@ -set(CMAKE_WARN_DEPRECATED OFF CACHE BOOL "" FORCE) -cmake_minimum_required(VERSION 3.2 FATAL_ERROR) - -if (${CMAKE_SOURCE_DIR} STREQUAL ${CMAKE_BINARY_DIR}) - message(FATAL_ERROR "In-source builds not allowed. - Please make a new directory and run CMake from there. - You may need to remove CMakeCache.txt." ) -endif() - -list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") - -# Build Type - set a default build type `Release` if none was specified -set(PROJECT_DEFAULT_BUILD_TYPE "Release") - -if (CMAKE_BUILD_TYPE AND - NOT CMAKE_BUILD_TYPE MATCHES "^(Debug|Release|RelWithDebInfo|MinSizeRel)$") - message(FATAL_ERROR "Invalid value for CMAKE_BUILD_TYPE: ${CMAKE_BUILD_TYPE} - valid values are Debug|Release|RelWithDebInfo|MinSizeRel") -endif() - -if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) - message(STATUS "Setting build type to '${PROJECT_DEFAULT_BUILD_TYPE}' as none was specified.") - set(CMAKE_BUILD_TYPE "${PROJECT_DEFAULT_BUILD_TYPE}" CACHE - STRING "Choose the type of build." 
FORCE) -endif() - -message(STATUS "CMAKE_BUILD_TYPE ${CMAKE_BUILD_TYPE}") - -project(quack VERSION 0.0.1 LANGUAGES C CXX) - -string(TOLOWER "${PROJECT_NAME}" PROJECT_NAME_LOWER) - -set(PROJECT_LIB_VERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}") -set(PROJECT_LIB_NAME "${PROJECT_NAME_LOWER}") - -string(TIMESTAMP COMPILATION_DATE "%Y/%m/%d" UTC) - -set(POSTGRESQL_MINIMUM_VERSION "16.0.0") -set(BOOST_MINIMUM_VERSION "1.81.0") - -message(STATUS "POSTGRESQL_MINIMUM_VERSION=${POSTGRESQL_MINIMUM_VERSION}") -message(STATUS "BOOST_MINIMUM_VERSION=${BOOST_MINIMUM_VERSION}") - -# PostgreSQL - -find_package(PostgreSQL) - -if(NOT PostgreSQL_FOUND OR NOT PostgreSQL_VERSION_STRING) - message(FATAL_ERROR "PostgreSQL not found - Please check your PostgreSQL installation.") -endif() - -# for XbetaY XalphaY XrcY -> X.Y -string(REGEX REPLACE "([0-9]+)[beta|alpha|rc|devel].*" "\\1.0" POSTGRESQL_VERSION_STRING ${PostgreSQL_VERSION_STRING}) -STRING(REGEX MATCH "([0-9]+)\.([0-9]+)" POSTGRESQL_VERSION "${PostgreSQL_VERSION_STRING}") - -#for X.Y.Z -> XY Y<10 -string(REGEX REPLACE "^([0-9]+)\\.([0-9]+).*" "\\1\\2" PGSQL_VERSION ${POSTGRESQL_VERSION}) - -if("${POSTGRESQL_VERSION}" VERSION_LESS "${POSTGRESQL_MINIMUM_VERSION}") - message(FATAL_ERROR " PostgreSQL ${POSTGRESQL_MINIMUM_VERSION} or greater is required.") -endif("${POSTGRESQL_VERSION}" VERSION_LESS "${POSTGRESQL_MINIMUM_VERSION}") - -include_directories(${PostgreSQL_SERVER_INCLUDE_DIRS}) - -# For Apple and Postgres 16 use .dylib instead of .so -if (APPLE AND POSTGRESQL_VERSION VERSION_GREATER_EQUAL "16") - set(CMAKE_SHARED_MODULE_SUFFIX ".dylib") -endif() - -# Boost - -find_package(Boost ${BOOST_MINIMUM_VERSION} REQUIRED) - -if (NOT Boost_VERSION_MACRO) - set(Boost_VERSION_MACRO ${Boost_VERSION}) -endif() - -set(BOOST_VERSION "${Boost_MAJOR_VERSION}.${Boost_MINOR_VERSION}.${Boost_SUBMINOR_VERSION}") - -include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) - -add_definitions(-DBOOST_ALLOW_DEPRECATED_HEADERS) - -# Third party 
libs - -include(third_party/third_party.cmake) - -# Compiler - -include(CheckCCompilerFlag) -include(CheckCXXCompilerFlag) -set(CMAKE_CXX_STANDARD 17) - -set(COMPILER_VERSION "${CMAKE_CXX_COMPILER_ID}-${CMAKE_CXX_COMPILER_VERSION}") - -#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# compiler directives -#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# https://www.postgresql.org/docs/10/xfunc-c.html - - -CHECK_C_COMPILER_FLAG("-fPIC" C_COMPILER_SUPPORTS_FPIC) -CHECK_CXX_COMPILER_FLAG("-fPIC" CXX_COMPILER_SUPPORTS_FPIC) - -if(C_COMPILER_SUPPORTS_FPIC) - set(CMAKE_C_FLAGS "-fPIC") -endif() -if(CXX_COMPILER_SUPPORTS_FPIC) - set(CMAKE_CXX_FLAGS "-fPIC") -endif() - -message(STATUS "COMPILER: ${CMAKE_CXX_COMPILER_ID}") - -if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") - # Append CFLAGS/CXXFLAGS and PostgreSQL CFLAGS/CXXFAGS - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PostgreSQL_CFLAGS} $ENV{CFLAGS}") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-register ${PostgreSQL_CXXFLAGS} $ENV{CXXFLAGS}") - - # Debug compiler flags - if(CMAKE_BUILD_TYPE MATCHES "Debug") - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0") - endif() - # Release compiler flags - if(CMAKE_BUILD_TYPE MATCHES "Release") - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3") - endif() -endif() - -message(STATUS "CMAKE_C_FLAGS: ${CMAKE_C_FLAGS}") -message(STATUS "CMAKE_CXX_FLAGS: ${CMAKE_CXX_FLAGS}") - -# Include directory - -include_directories(${PROJECT_SOURCE_DIR}/include) - -# Source directories - -add_subdirectory("src") - -# Library - -set(LIBRARY_OUTPUT_PATH lib) -add_library(${PROJECT_LIB_NAME} MODULE ${PROJECT_OBJECTS}) - -message(STATUS "PROJECT_LIB_NAME ${PROJECT_LIB_NAME}") - -set(LINK_FLAGS "${PostgreSQL_SHARED_LINK_OPTIONS}") - -foreach(_dir ${PostgreSQL_SERVER_LIBRARY_DIRS}) - set(LINK_FLAGS "${LINK_FLAGS} -L${_dir}") -endforeach() - -if(APPLE) - set(LINK_FLAGS 
"${LINK_FLAGS} -bundle_loader ${PG_BINARY} -undefined dynamic_lookup") -endif() - -foreach (_third_party_lib ${QUACK_THIRD_PARTY_LIBS}) - target_link_libraries(${PROJECT_LIB_NAME} PUBLIC ${_third_party_lib}) -endforeach () - -set_target_properties(${PROJECT_LIB_NAME} - PROPERTIES PREFIX "" - LINK_FLAGS "${LINK_FLAGS} $ENV{LDFLAGS}") - -# PostgreSQL extension control and installation sql files - -add_subdirectory(sql) - -# Installation - -install(TARGETS ${PROJECT_LIB_NAME} DESTINATION ${PostgreSQL_PACKAGE_LIBRARY_DIR}) -install(FILES ${PROJECT_FILES_TO_INSTALL} DESTINATION "${PostgreSQL_EXTENSION_DIR}") - -# Install third party libraries - -foreach (_third_party_lib ${QUACK_THIRD_PARTY_LIBS}) - install(TARGETS ${_third_party_lib} DESTINATION ${PostgreSQL_PACKAGE_LIBRARY_DIR}) -endforeach () - -# Add regress test - -if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) - include(CTest) -endif() - -add_subdirectory("regress") - -# Copy compile-commands.json to ROOT - -if (CMAKE_EXPORT_COMPILE_COMMANDS) - add_custom_target(copy-compile-commands ALL - ${CMAKE_COMMAND} -E copy_if_different - ${CMAKE_BINARY_DIR}/compile_commands.json - ${PROJECT_SOURCE_DIR}) -endif() diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0f8c09a --- /dev/null +++ b/Makefile @@ -0,0 +1,84 @@ +.PHONY: duckdb install_duckdb clean_duckdb lintcheck + +MODULE_big = quack +EXTENSION = quack +DATA = quack.control $(wildcard quack--*.sql) + +SRCS = src/quack_heap_seq_scan.cpp \ + src/quack_heap_scan.cpp \ + src/quack_hooks.cpp \ + src/quack_select.cpp \ + src/quack_types.cpp \ + src/quack_memory_allocator.cpp \ + src/quack.cpp + +OBJS = $(subst .cpp,.o, $(SRCS)) + +REGRESS = basic + +PG_CONFIG ?= pg_config + +PGXS := $(shell $(PG_CONFIG) --pgxs) +PG_LIB := $(shell $(PG_CONFIG) --pkglibdir) +INCLUDEDIR := ${shell $(PG_CONFIG) --includedir} +INCLUDEDIR_SERVER := ${shell $(PG_CONFIG) --includedir-server} + +QUACK_BUILD_CXX_FLAGS= +QUACK_BUILD_DUCKDB= + +ifeq ($(QUACK_BUILD), Debug) + 
QUACK_BUILD_CXX_FLAGS = -g -O0 + QUACK_BUILD_DUCKDB = debug +else + QUACK_BUILD_CXX_FLAGS = + QUACK_BUILD_DUCKDB = release +endif + +override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS} + +SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++ + +COMPILE.cc.bc = $(CXX) -Wno-ignored-attributes -Wno-register $(BITCODE_CXXFLAGS) $(CXXFLAGS) $(PG_CPPFLAGS) -I$(INCLUDEDIR_SERVER) -emit-llvm -c + +%.bc : %.cpp + $(COMPILE.cc.bc) $(SHLIB_LINK) $(PG_CPPFLAGS) -I$(INCLUDE_SERVER) -o $@ $< + +# determine the name of the duckdb library that is built +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Darwin) + DUCKDB_LIB = libduckdb.dylib +endif +ifeq ($(UNAME_S),Linux) + DUCKDB_LIB = libduckdb.so +endif + +all: duckdb $(OBJS) + +include $(PGXS) + +duckdb: third_party/duckdb third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) + +third_party/duckdb: + git submodule update --init --recursive + +third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB): + $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1 + +install_duckdb: + $(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB) + +clean_duckdb: + rm -rf third_party/duckdb/build + +install: install_duckdb + +clean: clean_duckdb + +lintcheck: + clang-tidy $(SRCS) -- -I$(INCLUDEDIR) -I$(INCLUDEDIR_SERVER) -Iinclude $(CPPFLAGS) -std=c++17 + +.depend: + $(RM) -f .depend + $(foreach SRC,$(SRCS),$(CXX) $(CPPFLAGS) -I$(INCLUDEDIR) -I$(INCLUDEDIR_SERVER) -MM -MT $(SRC:.cpp=.o) $(SRC) >> .depend;) + +include .depend diff --git a/cmake/FindPostgreSQL.cmake b/cmake/FindPostgreSQL.cmake deleted file mode 100644 index 1a9af2d..0000000 --- a/cmake/FindPostgreSQL.cmake +++ /dev/null @@ -1,189 +0,0 @@ -# Copyright 2020 Mats Kindahl -# -# Licensed under the Apache License, 
Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. -# -# .rst: FindPostgreSQL -# -------------------- -# -# Find the PostgreSQL installation. -# -# This module defines the following variables -# -# :: -# -# PostgreSQL_LIBRARIES - the PostgreSQL libraries needed for linking -# -# PostgreSQL_INCLUDE_DIRS - include directories -# -# PostgreSQL_SERVER_INCLUDE_DIRS - include directories for server programming -# -# PostgreSQL_LIBRARY_DIRS - link directories for PostgreSQL libraries -# -# PostgreSQL_EXTENSION_DIR - the directory for extensions -# -# PostgreSQL_SHARED_LINK_OPTIONS - options for shared libraries -# -# PostgreSQL_LINK_OPTIONS - options for static libraries and executables -# -# PostgreSQL_VERSION_STRING - the version of PostgreSQL found (since CMake -# 2.8.8) -# -# ---------------------------------------------------------------------------- -# History: This module is derived from the existing FindPostgreSQL.cmake and try -# to use most of the existing output variables of that module, but uses -# `pg_config` to extract the necessary information instead and add a macro for -# creating extensions. The use of `pg_config` is aligned with how the PGXS code -# distributed with PostgreSQL itself works. - -# Define additional search paths for root directories. 
-set(PostgreSQL_ROOT_DIRECTORIES ENV PGROOT ENV PGPATH ${PostgreSQL_ROOT}) - -if (DEFINED ENV{PG_CONFIG}) - set(PG_CONFIG "$ENV{PG_CONFIG}") -else() - find_program( - PG_CONFIG pg_config - PATHS ${PostgreSQL_ROOT_DIRECTORIES} - PATH_SUFFIXES bin) -endif() - -if(NOT PG_CONFIG) - message(FATAL_ERROR "Could not find pg_config") -else() - set(PostgreSQL_FOUND TRUE) -endif() - -message(STATUS "Found pg_config as ${PG_CONFIG}") - -if(PostgreSQL_FOUND) - macro(PG_CONFIG VAR OPT) - execute_process( - COMMAND ${PG_CONFIG} ${OPT} - OUTPUT_VARIABLE ${VAR} - OUTPUT_STRIP_TRAILING_WHITESPACE) - endmacro() - - pg_config(_pg_bindir --bindir) - pg_config(_pg_includedir --includedir) - pg_config(_pg_pkgincludedir --pkgincludedir) - pg_config(_pg_sharedir --sharedir) - pg_config(_pg_includedir_server --includedir-server) - pg_config(_pg_libs --libs) - pg_config(_pg_ldflags --ldflags) - pg_config(_pg_ldflags_sl --ldflags_sl) - pg_config(_pg_ldflags_ex --ldflags_ex) - pg_config(_pg_pkglibdir --pkglibdir) - pg_config(_pg_libdir --libdir) - pg_config(_pg_version --version) - pg_config(_pg_compileflags --configure) - pg_config(_pg_cflags --cflags) - pg_config(_pg_cflags_sl --cflags_sl) - pg_config(_pg_cxxflags --cppflags) - - separate_arguments(_pg_ldflags) - separate_arguments(_pg_ldflags_sl) - separate_arguments(_pg_ldflags_ex) - - set(_server_lib_dirs ${_pg_libdir} ${_pg_pkglibdir}) - set(_server_inc_dirs ${_pg_includedir_server} ${_pg_pkgincludedir}) - string(REPLACE ";" " " _shared_link_options - "${_pg_ldflags};${_pg_ldflags_sl}") - - - set(_link_options ${_pg_ldflags}) - if(_pg_ldflags_ex) - list(APPEND _link_options ${_pg_ldflags_ex}) - endif() - - string(FIND ${_pg_compileflags} "--with-llvm" _pg_with_llvm_idx) - - if(${_pg_with_llvm_idx} EQUAL -1) - set(_pg_with_llvm FALSE) - else() - set(_pg_with_llvm TRUE) - endif() - - set(PostgreSQL_INCLUDE_DIRS - "${_pg_includedir}" - CACHE PATH - "Top-level directory containing the PostgreSQL include directories." 
- ) - set(PostgreSQL_EXTENSION_DIR - "${_pg_sharedir}/extension" - CACHE PATH "Directory containing extension SQL and control files") - set(PostgreSQL_SERVER_INCLUDE_DIRS - "${_server_inc_dirs}" - CACHE PATH "PostgreSQL include directories for server include files.") - set(PostgreSQL_LIBRARY_DIRS - "${_pg_libdir}" - CACHE PATH "library directory for PostgreSQL") - set(PostgreSQL_LIBRARIES - "${_pg_libs}" - CACHE PATH "Libraries for PostgreSQL") - set(PostgreSQL_SHARED_LINK_OPTIONS - "${_shared_link_options}" - CACHE STRING "PostgreSQL linker options for shared libraries.") - set(PostgreSQL_LINK_OPTIONS - "${_pg_ldflags},${_pg_ldflags_ex}" - CACHE STRING "PostgreSQL linker options for executables.") - set(PostgreSQL_SERVER_LIBRARY_DIRS - "${_server_lib_dirs}" - CACHE PATH "PostgreSQL server library directories.") - set(PostgreSQL_VERSION_STRING - "${_pg_version}" - CACHE STRING "PostgreSQL version string") - set(PostgreSQL_PACKAGE_LIBRARY_DIR - "${_pg_pkglibdir}" - CACHE STRING "PostgreSQL package library directory") - set(PostgreSQL_WITH_LLVM - "${_pg_with_llvm}" - CACHE BOOL "PostgreSQL -with-llvm flag.") - set(PostgreSQL_CFLAGS - "${_pg_cflags}" - CACHE STRING "PostgreSQL CFLAGS") - set(PostgreSQL_CFLAGS_SL - "${_pg_cflags_sl}" - CACHE STRING "PostgreSQL CFLAGS_SL") - set(PostgreSQL_CXXFLAGS - "${_pg_cxxflags}" - CACHE STRING "PostgreSQL CXXFLAGS") - - find_program( - PG_BINARY postgres - PATHS ${PostgreSQL_ROOT_DIRECTORIES} - HINTS ${_pg_bindir} - PATH_SUFFIXES bin) - - if(NOT PG_BINARY) - message(FATAL_ERROR "Could not find postgres binary") - endif() - - message(STATUS "Found postgres binary at ${PG_BINARY}") - - find_program(PG_REGRESS pg_regress HINT - ${PostgreSQL_PACKAGE_LIBRARY_DIR}/pgxs/src/test/regress) - if(NOT PG_REGRESS) - message(STATUS "Could not find pg_regress, tests not executed") - endif() - - message(STATUS "PostgreSQL version: ${PostgreSQL_VERSION_STRING} found") - message( - STATUS - "PostgreSQL package library directory: 
${PostgreSQL_PACKAGE_LIBRARY_DIR}") - message(STATUS "PostgreSQL libraries: ${PostgreSQL_LIBRARIES}") - message(STATUS "PostgreSQL extension directory: ${PostgreSQL_EXTENSION_DIR}") - message(STATUS "PostgreSQL linker options: ${PostgreSQL_LINK_OPTIONS}") - message( - STATUS "PostgreSQL shared linker options: ${PostgreSQL_SHARED_LINK_OPTIONS}" - ) -endif() \ No newline at end of file diff --git a/expected/basic.out b/expected/basic.out new file mode 100644 index 0000000..9197a9e --- /dev/null +++ b/expected/basic.out @@ -0,0 +1,46 @@ +CREATE EXTENSION quack; +SET client_min_messages to 'DEBUG3'; +CREATE TABLE t(a INT); +INSERT INTO t SELECT g % 10 from generate_series(1,1000000) g; +SELECT COUNT(*) FROM t; +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- + count +--------- + 1000000 +(1 row) + +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- + a | count +---+-------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 +(4 rows) + +SET quack.max_threads_per_query to 4; +SELECT COUNT(*) FROM t; +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- + count +--------- + 1000000 +(1 row) + +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- + a | count +---+-------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 +(4 rows) + +SET quack.max_threads_per_query TO default; +SET client_min_messages TO default; +DROP TABLE t; diff --git a/include/quack/quack.h b/include/quack/quack.h index 8be0d64..647c249 100644 --- a/include/quack/quack.h +++ b/include/quack/quack.h @@ -1,6 +1,8 @@ #pragma once -void 
_PG_init(void); +// quack.c +extern int quack_max_threads_per_query; +extern "C" void _PG_init(void); -// quack_internal.cpp -const char * quack_duckdb_version(); \ No newline at end of file +// quack_hooks.c +extern void quack_init_hooks(void); \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp new file mode 100644 index 0000000..5aa4ff3 --- /dev/null +++ b/include/quack/quack_heap_scan.hpp @@ -0,0 +1,94 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "executor/executor.h" +#include "access/relscan.h" +} + +#include "quack/quack.h" +#include "quack/quack_heap_seq_scan.hpp" + +// Postgres Relation + + +namespace quack { + +struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { +public: + PostgresHeapScanLocalState(PostgresHeapSeqScan &relation); + ~PostgresHeapScanLocalState() override; + +public: + PostgresHeapSeqScan & m_rel; + PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info; + bool m_exhausted_scan = false; +}; + +// Global State + +struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { + explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation); + ~PostgresHeapScanGlobalState(); + idx_t + MaxThreads() const override { + return quack_max_threads_per_query; + } +}; + +struct PostgresHeapScanFunctionData : public duckdb::TableFunctionData { +public: + PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot Snapshot); + ~PostgresHeapScanFunctionData() override; + +public: + PostgresHeapSeqScan m_relation; +}; + +struct PostgresHeapScanFunction : public duckdb::TableFunction { +public: + PostgresHeapScanFunction(); + +public: + static duckdb::unique_ptr PostgresHeapBind(duckdb::ClientContext &context, + duckdb::TableFunctionBindInput &input, + duckdb::vector &return_types, + duckdb::vector &names); + static duckdb::unique_ptr + 
PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + static duckdb::unique_ptr + PostgresHeapInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate); + // static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p); + // static bool PostgresParallelStateNext(ClientContext &context, const FunctionData *bind_data_p, + // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext + // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); + static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, + duckdb::DataChunk &output); + // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); + // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, + // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void + // PostgresSerialize(Serializer &serializer, const optional_ptr bind_data, const TableFunction + // &function); +public: + static void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, TupleTableSlot *slot, idx_t offset); +}; + +struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { +public: + PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + } + ~PostgresHeapReplacementScanData() override {}; + +public: + QueryDesc *desc; +}; + +duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, + const duckdb::string &table_name, + duckdb::ReplacementScanData *data); + +} // namespace quack diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp new file mode 100644 index 0000000..4a30a9a --- /dev/null +++ b/include/quack/quack_heap_seq_scan.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include 
"duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "access/tableam.h" +#include "access/heapam.h" +} + +#include + +namespace quack { + +class PostgresHeapSeqScanThreadInfo { +public: + PostgresHeapSeqScanThreadInfo(); + ~PostgresHeapSeqScanThreadInfo(); + void EndScan(); + +public: + TupleDesc m_tuple_desc; + bool m_inited; + bool m_read_next_page; + bool m_page_tuples_all_visible; + int m_output_vector_size; + BlockNumber m_block_number; + Buffer m_buffer; + OffsetNumber m_current_tuple_index; + int m_page_tuples_left; + HeapTupleData m_tuple; +}; + +class PostgresHeapSeqScan { +private: + class ParallelScanState { + public: + ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) { + } + BlockNumber AssignNextBlockNumber(); + std::mutex m_lock; + BlockNumber m_nblocks; + BlockNumber m_last_assigned_block_number; + }; + +public: + PostgresHeapSeqScan(RangeTblEntry *table); + ~PostgresHeapSeqScan(); + PostgresHeapSeqScan(const PostgresHeapSeqScan &other) = delete; + PostgresHeapSeqScan &operator=(const PostgresHeapSeqScan &other) = delete; + PostgresHeapSeqScan &operator=(PostgresHeapSeqScan &&other) = delete; + PostgresHeapSeqScan(PostgresHeapSeqScan &&other); + +public: + void InitParallelScanState(); + void + SetSnapshot(Snapshot snapshot) { + m_snapshot = snapshot; + } + +public: + Relation GetRelation(); + TupleDesc GetTupleDesc(); + bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo); + bool IsValid() const; + +private: + Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo); + +private: + Relation m_rel = nullptr; + Snapshot m_snapshot = nullptr; + ParallelScanState m_parallel_scan_state; +}; + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_memory_allocator.hpp b/include/quack/quack_memory_allocator.hpp new file mode 100644 index 0000000..66a0a8c --- /dev/null +++ b/include/quack/quack_memory_allocator.hpp @@ 
-0,0 +1,17 @@ +#pragma once + +#include "duckdb/common/allocator.hpp" + +namespace quack { + +struct QuackAllocatorData : public duckdb::PrivateAllocatorData { + explicit QuackAllocatorData() { + } +}; + +duckdb::data_ptr_t QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size); +void QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t ptr, duckdb::idx_t idx); +duckdb::data_ptr_t QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, + duckdb::idx_t old_size, duckdb::idx_t size); + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h new file mode 100644 index 0000000..9ea8b57 --- /dev/null +++ b/include/quack/quack_select.h @@ -0,0 +1,9 @@ +#pragma once + +extern "C" { +#include "postgres.h" + +#include "executor/executor.h" +} + +extern "C" bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count); \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp new file mode 100644 index 0000000..05f3860 --- /dev/null +++ b/include/quack/quack_types.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "executor/tuptable.h" +} + +namespace quack { +duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); +void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); +void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); +void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset); +} // namespace quack \ No newline at end of file diff --git a/quack--0.0.1.sql b/quack--0.0.1.sql new file mode 100644 index 0000000..97a4d36 --- /dev/null +++ b/quack--0.0.1.sql @@ -0,0 +1 @@ +LOAD 'quack'; diff --git a/sql/quack.control b/quack.control similarity index 100% rename from sql/quack.control rename 
to quack.control diff --git a/regress/CMakeLists.txt b/regress/CMakeLists.txt deleted file mode 100644 index 82cfa5f..0000000 --- a/regress/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND} --verbose) - -if(PG_REGRESS) - -add_test( - NAME regress - COMMAND ${PG_REGRESS} - --temp-config=${PROJECT_SOURCE_DIR}/regress/regress.conf - --temp-instance=${CMAKE_BINARY_DIR}/tmp - --inputdir=${PROJECT_SOURCE_DIR}/regress - --schedule=${PROJECT_SOURCE_DIR}/regress/schedule.conf - --load-extension=quack -) - -endif() \ No newline at end of file diff --git a/regress/expected/basic.out b/regress/expected/basic.out deleted file mode 100644 index 3255711..0000000 --- a/regress/expected/basic.out +++ /dev/null @@ -1,2 +0,0 @@ -CREATE TABLE t(a INT); -DROP TABLE t; diff --git a/regress/regress.conf b/regress/regress.conf deleted file mode 100644 index f327aca..0000000 --- a/regress/regress.conf +++ /dev/null @@ -1,3 +0,0 @@ -# Configuration - -log_temp_files = -1 \ No newline at end of file diff --git a/regress/schedule.conf b/regress/schedule.conf deleted file mode 100644 index 43a5f3e..0000000 --- a/regress/schedule.conf +++ /dev/null @@ -1 +0,0 @@ -test: basic \ No newline at end of file diff --git a/regress/sql/basic.sql b/regress/sql/basic.sql deleted file mode 100644 index 27fc35c..0000000 --- a/regress/sql/basic.sql +++ /dev/null @@ -1,2 +0,0 @@ -CREATE TABLE t(a INT); -DROP TABLE t; \ No newline at end of file diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt deleted file mode 100644 index 627c87b..0000000 --- a/sql/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -file(GLOB SQL_FILES "${PROJECT_NAME_LOWER}--*") -SET(PROJECT_FILES_TO_INSTALL ${SQL_FILES} ${CMAKE_CURRENT_SOURCE_DIR}/${PROJECT_NAME_LOWER}.control PARENT_SCOPE) diff --git a/sql/basic.sql b/sql/basic.sql new file mode 100644 index 0000000..ddc1bd7 --- /dev/null +++ b/sql/basic.sql @@ -0,0 +1,20 @@ +CREATE EXTENSION quack; + +SET client_min_messages to 'DEBUG3'; 
+ +CREATE TABLE t(a INT); + +INSERT INTO t SELECT g % 10 from generate_series(1,1000000) g; + +SELECT COUNT(*) FROM t; +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; + +SET quack.max_threads_per_query to 4; + +SELECT COUNT(*) FROM t; +SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; + +SET quack.max_threads_per_query TO default; +SET client_min_messages TO default; + +DROP TABLE t; \ No newline at end of file diff --git a/sql/quack--0.0.1.sql b/sql/quack--0.0.1.sql deleted file mode 100644 index 87dee75..0000000 --- a/sql/quack--0.0.1.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE SCHEMA quack; \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt deleted file mode 100644 index 7d7c870..0000000 --- a/src/CMakeLists.txt +++ /dev/null @@ -1,4 +0,0 @@ -ADD_LIBRARY(extension OBJECT quack.c - quack_internal.cpp) - -set(PROJECT_OBJECTS ${PROJECT_OBJECTS} "$" PARENT_SCOPE) diff --git a/src/quack.c b/src/quack.c deleted file mode 100644 index cacbcea..0000000 --- a/src/quack.c +++ /dev/null @@ -1,21 +0,0 @@ -#include "postgres.h" - -#include "utils/guc.h" - -#include "quack/quack.h" - -PG_MODULE_MAGIC; - -static void quack_init_guc(void); - -void -_PG_init(void) { - quack_init_guc(); - elog(WARNING, "DuckDB version %s", quack_duckdb_version()); -} - -/* clang-format off */ -static void -quack_init_guc(void) { - -} diff --git a/src/quack.cpp b/src/quack.cpp new file mode 100644 index 0000000..dd50791 --- /dev/null +++ b/src/quack.cpp @@ -0,0 +1,38 @@ +extern "C" { +#include "postgres.h" + +#include "utils/guc.h" +} + +#include "quack/quack.h" + +static void quack_init_guc(void); + +int quack_max_threads_per_query = 1; + +extern "C" { +PG_MODULE_MAGIC; + +void +_PG_init(void) { + quack_init_guc(); + quack_init_hooks(); +} +} + +/* clang-format off */ +static void +quack_init_guc(void) { + DefineCustomIntVariable("quack.max_threads_per_query", + gettext_noop("DuckDB max no. 
threads per query."), + NULL, + &quack_max_threads_per_query, + 1, + 1, + 64, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); +} \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp new file mode 100644 index 0000000..09d0606 --- /dev/null +++ b/src/quack_heap_scan.cpp @@ -0,0 +1,201 @@ +#include "duckdb/main/client_context.hpp" +#include "duckdb/function/replacement_scan.hpp" +#include "duckdb/parser/tableref/table_function_ref.hpp" +#include "duckdb/parser/expression/function_expression.hpp" +#include "duckdb/parser/expression/constant_expression.hpp" +#include "duckdb/parser/expression/comparison_expression.hpp" +#include "duckdb/parser/expression/columnref_expression.hpp" +#include "duckdb/common/enums/expression_type.hpp" + +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + +namespace quack { + +// +// PostgresHeapScanFunctionData +// + +PostgresHeapScanFunctionData::PostgresHeapScanFunctionData(PostgresHeapSeqScan &&relation, Snapshot snapshot) + : m_relation(std::move(relation)) { + m_relation.SetSnapshot(snapshot); +} + +PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { +} + +// +// PostgresHeapScanGlobalState +// + +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) { + relation.InitParallelScanState(); + elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads()); +} + +PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { +} + +// +// PostgresHeapScanLocalState +// + +PostgresHeapScanLocalState::PostgresHeapScanLocalState(PostgresHeapSeqScan &relation) : m_rel(relation) { + m_thread_seq_scan_info.m_tuple.t_tableOid = RelationGetRelid(relation.GetRelation()); + m_thread_seq_scan_info.m_tuple_desc = RelationGetDescr(relation.GetRelation()); +} + +PostgresHeapScanLocalState::~PostgresHeapScanLocalState() { + m_thread_seq_scan_info.EndScan(); +} + +// +// PostgresHeapScanFunction +// + 
+PostgresHeapScanFunction::PostgresHeapScanFunction()
+    : TableFunction("postgres_heap_scan", {}, PostgresHeapScanFunc, PostgresHeapBind, PostgresHeapInitGlobal,
+                    PostgresHeapInitLocal) {
+	named_parameters["table"] = duckdb::LogicalType::POINTER;
+	named_parameters["snapshot"] = duckdb::LogicalType::POINTER;
+	// projection_pushdown = true;
+}
+
+duckdb::unique_ptr
+PostgresHeapScanFunction::PostgresHeapBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input,
+                                           duckdb::vector &return_types,
+                                           duckdb::vector &names) {
+	auto table = (reinterpret_cast(input.named_parameters["table"].GetPointer()));
+	auto snapshot = (reinterpret_cast(input.named_parameters["snapshot"].GetPointer()));
+
+	auto rel = PostgresHeapSeqScan(table);
+	auto tupleDesc = RelationGetDescr(rel.GetRelation());
+
+	if (!tupleDesc) {
+		elog(ERROR, "Failed to get tuple descriptor for relation with OID %u", table->relid);
+		return nullptr;
+	}
+
+	for (int i = 0; i < tupleDesc->natts; i++) {
+		Form_pg_attribute attr = &tupleDesc->attrs[i];
+		Oid type_oid = attr->atttypid;
+		auto col_name = duckdb::string(NameStr(attr->attname));
+		auto duck_type = ConvertPostgresToDuckColumnType(type_oid);
+		return_types.push_back(duck_type);
+		names.push_back(col_name);
+		/* Log column name and type */
+		elog(DEBUG3, "-- (DuckDB/PostgresHeapBind) Column name: %s, Type: %s --", col_name.c_str(),
+		     duck_type.ToString().c_str());
+	}
+	return duckdb::make_uniq(std::move(rel), snapshot);
+}
+
+duckdb::unique_ptr
+PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context,
+                                                 duckdb::TableFunctionInitInput &input) {
+	auto &bind_data = input.bind_data->CastNoConst();
+	return duckdb::make_uniq(bind_data.m_relation);
+}
+
+duckdb::unique_ptr
+PostgresHeapScanFunction::PostgresHeapInitLocal(duckdb::ExecutionContext &context,
+                                                duckdb::TableFunctionInitInput &input,
+                                                duckdb::GlobalTableFunctionState *gstate) {
+	auto &bind_data = input.bind_data->CastNoConst();
+	return duckdb::make_uniq(bind_data.m_relation);
+}
+
+void
+PostgresHeapScanFunction::PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
+                                               duckdb::DataChunk &output) {
+	auto &bind_data = data_p.bind_data->CastNoConst();
+	auto &l_data = data_p.local_state->Cast();
+
+	auto &relation = bind_data.m_relation;
+	auto &exhausted_scan = l_data.m_exhausted_scan;
+
+	l_data.m_thread_seq_scan_info.m_output_vector_size = 0;
+
+	/* We have exhausted seq scan of heap table so we can return */
+	if (exhausted_scan) {
+		output.SetCardinality(0);
+		return;
+	}
+
+	auto has_tuple = relation.ReadPageTuples(output, l_data.m_thread_seq_scan_info);
+
+	if (!has_tuple || l_data.m_thread_seq_scan_info.m_block_number == InvalidBlockNumber) {
+		exhausted_scan = true;
+	}
+}
+
+//
+// PostgresHeapReplacementScan
+//
+
+static RangeTblEntry *
+FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) {
+	ListCell *lc;
+	foreach (lc, tables) {
+		RangeTblEntry *table = (RangeTblEntry *)lfirst(lc);
+		if (table->relid) {
+			auto rel = RelationIdGetRelation(table->relid);
+
+			if (!RelationIsValid(rel)) {
+				elog(ERROR, "Relation with OID %u is not valid", table->relid);
+				return nullptr;
+			}
+
+			char *rel_name = RelationGetRelationName(rel);
+			auto table_name = std::string(rel_name);
+			if (duckdb::StringUtil::CIEquals(table_name, to_find)) {
+				/* Allow only heap tables */
+				if (!rel->rd_amhandler || (GetTableAmRoutine(rel->rd_amhandler) != GetHeapamTableAmRoutine())) {
+					/* No access method handler, or not the heap AM: we can't read from this */
+					RelationClose(rel);
+					return nullptr;
+				}
+				RelationClose(rel);
+				return table;
+			}
+			RelationClose(rel);
+		}
+	}
+	return nullptr;
+}
+
+static duckdb::vector>
+CreateFunctionArguments(RangeTblEntry *table, Snapshot snapshot) {
+	duckdb::vector> children;
+	children.push_back(duckdb::make_uniq(
+	    duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("table"),
+	    duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(table)))));
+
+	children.push_back(duckdb::make_uniq(
+	    duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("snapshot"),
+	    duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(snapshot)))));
+	return children;
+}
+
+duckdb::unique_ptr
+PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string &table_name,
+                            duckdb::ReplacementScanData *data) {
+
+	auto &scan_data = reinterpret_cast(*data);
+
+	/* Check name against query rtable list and verify that it is heap table */
+	auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name);
+
+	if (!table) {
+		return nullptr;
+	}
+
+	// Create POINTER values from the 'table' and 'snapshot' variables
+	auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot);
+	auto table_function = duckdb::make_uniq();
+	table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children));
+
+	return std::move(table_function);
+}
+
+} // namespace quack
diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp
new file mode 100644
index 0000000..3cf9087
--- /dev/null
+++ b/src/quack_heap_seq_scan.cpp
@@ -0,0 +1,189 @@
+#include "duckdb.hpp"
+
+extern "C" {
+#include "postgres.h"
+#include "pgstat.h"
+#include "access/valid.h"
+#include "access/heapam.h"
+#include "storage/bufmgr.h"
+}
+
+#include "quack/quack_heap_seq_scan.hpp"
+#include "quack/quack_types.hpp"
+
+#include
+
+namespace quack {
+
+PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table)
+    : m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) {
+}
+
+PostgresHeapSeqScan::~PostgresHeapSeqScan() {
+	if (IsValid()) {
+		RelationClose(m_rel);
+	}
+}
+
+/* Move constructor: takes over the relation reference and copies the snapshot instead of leaving it unset. */
+PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other)
+    : m_rel(other.m_rel), m_snapshot(other.m_snapshot) {
+	other.m_rel = nullptr;
+}
+
+Relation
+PostgresHeapSeqScan::GetRelation() {
+	return m_rel;
+}
+
+bool
+PostgresHeapSeqScan::IsValid() const {
+	return RelationIsValid(m_rel);
+}
+
+TupleDesc
+PostgresHeapSeqScan::GetTupleDesc() {
+	return RelationGetDescr(m_rel);
+}
+
+/* Resets the per-thread cursor (visibility flag, tuples left, first offset) for the page held in m_buffer. */
+Page
+PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo) {
+	Page page = BufferGetPage(threadScanInfo.m_buffer);
+	TestForOldSnapshot(m_snapshot, m_rel, page);
+	threadScanInfo.m_page_tuples_all_visible = PageIsAllVisible(page) && !m_snapshot->takenDuringRecovery;
+	threadScanInfo.m_page_tuples_left = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
+	threadScanInfo.m_current_tuple_index = FirstOffsetNumber;
+	return page;
+}
+
+void
+PostgresHeapSeqScan::InitParallelScanState() {
+	m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
+}
+
+/* Fills `output` with up to STANDARD_VECTOR_SIZE visible tuples, resuming from the position in threadScanInfo. */
+/* Returns true when a full vector was produced, false once no further block could be assigned. */
+bool
+PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo) {
+	BlockNumber block = InvalidBlockNumber;
+	Page page = nullptr;
+
+	if (!threadScanInfo.m_inited) {
+		block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
+		if (threadScanInfo.m_block_number == InvalidBlockNumber) {
+			return false;
+		}
+		threadScanInfo.m_inited = true;
+		threadScanInfo.m_read_next_page = true;
+	} else {
+		block = threadScanInfo.m_block_number;
+		page = BufferGetPage(threadScanInfo.m_buffer);
+	}
+
+	while (block != InvalidBlockNumber) {
+		if (threadScanInfo.m_read_next_page) {
+			CHECK_FOR_INTERRUPTS();
+			m_parallel_scan_state.m_lock.lock();
+			block = threadScanInfo.m_block_number;
+			threadScanInfo.m_buffer =
+			    ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, GetAccessStrategy(BAS_BULKREAD));
+			LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE);
+			m_parallel_scan_state.m_lock.unlock();
+
+			page = PreparePageRead(threadScanInfo);
+			threadScanInfo.m_read_next_page = false;
+		}
+
+		for (; threadScanInfo.m_page_tuples_left > 0 && threadScanInfo.m_output_vector_size < STANDARD_VECTOR_SIZE;
+		     threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++) {
+			bool visible = true;
+			ItemId lpp = PageGetItemId(page, threadScanInfo.m_current_tuple_index);
+
+			if (!ItemIdIsNormal(lpp))
+				continue;
+
+			threadScanInfo.m_tuple.t_data = (HeapTupleHeader)PageGetItem(page, lpp);
+			threadScanInfo.m_tuple.t_len = ItemIdGetLength(lpp);
+			ItemPointerSet(&(threadScanInfo.m_tuple.t_self), block, threadScanInfo.m_current_tuple_index);
+
+			if (!threadScanInfo.m_page_tuples_all_visible) {
+				visible = HeapTupleSatisfiesVisibility(&threadScanInfo.m_tuple, m_snapshot, threadScanInfo.m_buffer);
+				HeapCheckForSerializableConflictOut(visible, m_rel, &threadScanInfo.m_tuple, threadScanInfo.m_buffer,
+				                                    m_snapshot);
+				/* skip tuples not visible to this snapshot */
+				if (!visible)
+					continue;
+			}
+
+			pgstat_count_heap_getnext(m_rel);
+
+			InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
+			                     threadScanInfo.m_output_vector_size);
+			/* Count a row only once it is written; skipped line pointers must not leave gaps in the chunk. */
+			threadScanInfo.m_output_vector_size++;
+		}
+
+		/* No more items on current page */
+		if (!threadScanInfo.m_page_tuples_left) {
+			m_parallel_scan_state.m_lock.lock();
+			UnlockReleaseBuffer(threadScanInfo.m_buffer);
+			m_parallel_scan_state.m_lock.unlock();
+			threadScanInfo.m_read_next_page = true;
+			block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
+		}
+
+		/* We have collected STANDARD_VECTOR_SIZE */
+		if (threadScanInfo.m_output_vector_size == STANDARD_VECTOR_SIZE) {
+			output.SetCardinality(threadScanInfo.m_output_vector_size);
+			threadScanInfo.m_output_vector_size = 0;
+			return true;
+		}
+	}
+
+	/* Next assigned block number is InvalidBlockNumber so we check did we write any tuples in output vector */
+	if (threadScanInfo.m_output_vector_size) {
+		output.SetCardinality(threadScanInfo.m_output_vector_size);
+		threadScanInfo.m_output_vector_size = 0;
+	}
+
+	threadScanInfo.m_buffer = InvalidBuffer;
+	threadScanInfo.m_block_number = InvalidBlockNumber;
+	threadScanInfo.m_tuple.t_data = NULL;
+	threadScanInfo.m_read_next_page = false;
+
+	return false;
+}
+
+/* Hands out the next unread block under m_lock; InvalidBlockNumber once every block has been assigned. */
+BlockNumber
+PostgresHeapSeqScan::ParallelScanState::AssignNextBlockNumber() {
+	m_lock.lock();
+	BlockNumber block_number = InvalidBlockNumber;
+	/* An empty relation has no block 0 to hand out, so leave block_number invalid. */
+	if (m_nblocks > 0) {
+		if (m_last_assigned_block_number == InvalidBlockNumber) {
+			block_number = m_last_assigned_block_number = 0;
+		} else if (m_last_assigned_block_number < m_nblocks - 1) {
+			block_number = ++m_last_assigned_block_number;
+		}
+	}
+	m_lock.unlock();
+	return block_number;
+}
+
+PostgresHeapSeqScanThreadInfo::PostgresHeapSeqScanThreadInfo()
+    : m_tuple_desc(NULL), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber),
+      m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) {
+	m_tuple.t_data = NULL;
+	ItemPointerSetInvalid(&m_tuple.t_self);
+}
+
+PostgresHeapSeqScanThreadInfo::~PostgresHeapSeqScanThreadInfo() {
+}
+
+void
+PostgresHeapSeqScanThreadInfo::EndScan() {
+}
+
+} // namespace quack
\ No newline at end of file
diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp
new file mode 100644
index 0000000..93706a5
--- /dev/null
+++ b/src/quack_hooks.cpp
@@ -0,0 +1,36 @@
+extern "C" {
+#include "postgres.h"
+#include "commands/extension.h"
+}
+
+#include "quack/quack.h"
+#include "quack/quack_select.h"
+
+static ExecutorRun_hook_type PrevExecutorRunHook = NULL;
+
+/* True when an extension named "quack" exists in the current database. */
+static bool
+is_quack_extension_registered() {
+	return get_extension_oid("quack", true) != InvalidOid;
+}
+
+/* ExecutorRun hook: tries the DuckDB path for SELECT and falls back to the previous hook if that declines. */
+static void
+quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) {
+	if (is_quack_extension_registered() && queryDesc->operation == CMD_SELECT) {
+		if (quack_execute_select(queryDesc, direction, count)) {
+			return;
+		}
+	}
+
+	if (PrevExecutorRunHook) {
+		PrevExecutorRunHook(queryDesc, direction, count, execute_once);
+	}
+}
+
+/* Chains quack_executor_run in front of the existing hook (or standard_ExecutorRun when none is set). */
+void
+quack_init_hooks(void) {
+	PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun;
+	ExecutorRun_hook = quack_executor_run;
+}
\ No newline at end of file
diff --git a/src/quack_internal.cpp b/src/quack_internal.cpp
deleted file mode 100644
index 5ff0a6a..0000000
--- a/src/quack_internal.cpp
+++ /dev/null
@@ -1,6 +0,0 @@
-#include "duckdb.hpp"
-
-extern "C" const char *
-quack_duckdb_version() {
-	return duckdb::DuckDB::LibraryVersion();
-}
\ No newline at end of file
diff --git a/src/quack_memory_allocator.cpp b/src/quack_memory_allocator.cpp
new file mode 100644
index 0000000..9fde45b
--- /dev/null
+++ b/src/quack_memory_allocator.cpp
@@ -0,0 +1,30 @@
+#include "duckdb/common/allocator.hpp"
+
+extern "C" {
+#include "postgres.h"
+}
+
+#include "quack/quack_memory_allocator.hpp"
+
+namespace quack {
+
+/* Allocation callback backed by palloc(); private_data is unused. */
+duckdb::data_ptr_t
+QuackAllocate(duckdb::PrivateAllocatorData *private_data, duckdb::idx_t size) {
+	return reinterpret_cast(palloc(size));
+}
+
+/* Free callback backed by pfree(); the size argument is unused. */
+void
+QuackFree(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t idx) {
+	return pfree(pointer);
+}
+
+/* Reallocation callback backed by repalloc(); old_size is unused. */
+duckdb::data_ptr_t
+QuackReallocate(duckdb::PrivateAllocatorData *private_data, duckdb::data_ptr_t pointer, duckdb::idx_t old_size,
+                duckdb::idx_t size) {
+	return reinterpret_cast(repalloc(pointer, size));
+}
+
+} // namespace quack
\ No newline at end of file
diff --git a/src/quack_select.cpp b/src/quack_select.cpp
new file mode 100644
index 0000000..53b2c4d
--- /dev/null
+++ b/src/quack_select.cpp
@@ -0,0 +1,109 @@
+#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
+#include "duckdb.hpp"
+
+extern "C" {
+#include "postgres.h"
+#include "fmgr.h"
+
+#include "access/genam.h"
+#include "access/table.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_proc.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+#include "quack/quack_select.h"
+}
+
+#include "quack/quack_heap_scan.hpp"
+#include "quack/quack_types.hpp"
+#include "quack/quack_memory_allocator.hpp"
+
+namespace quack {
+
+static duckdb::unique_ptr
+quack_open_database() {
+	duckdb::DBConfig config;
+	//config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr);
+	return duckdb::make_uniq(nullptr, &config);
+}
+
+} // namespace quack
+
+extern "C" bool
+quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) {
+	auto db = quack::quack_open_database();
+
+	/* Add heap tables */
+	db->instance->config.replacement_scans.emplace_back(
+	    quack::PostgresHeapReplacementScan,
+	    duckdb::make_uniq_base(query_desc));
+	auto connection = duckdb::make_uniq(*db);
+
+	// Add the postgres_scan inserted by the replacement scan
+	auto &context = *connection->context;
+	quack::PostgresHeapScanFunction heap_scan_fun;
+	duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun);
+
+	auto &catalog = duckdb::Catalog::GetSystemCatalog(context);
+	context.transaction.BeginTransaction();
+	catalog.CreateTableFunction(context, &heap_scan_info);
+	context.transaction.Commit();
+
+	idx_t column_count;
+
+	CmdType operation;
+	DestReceiver *dest;
+
+	TupleTableSlot *slot = NULL;
+
+	// FIXME: try-catch ?
+	auto res = connection->Query(query_desc->sourceText);
+	if (res->HasError()) {
+		return false;
+	}
+
+	operation = query_desc->operation;
+	dest = query_desc->dest;
+
+	dest->rStartup(dest, operation, query_desc->tupDesc);
+
+	slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple);
+	column_count = res->ColumnCount();
+
+	while (true) {
+
+		auto chunk = res->Fetch();
+
+		if (!chunk || chunk->size() == 0) {
+			break;
+		}
+
+		for (idx_t row = 0; row < chunk->size(); row++) {
+			ExecClearTuple(slot);
+
+			for (idx_t col = 0; col < column_count; col++) {
+				auto value = chunk->GetValue(col, row);
+				if (value.IsNull()) {
+					slot->tts_isnull[col] = true;
+				} else {
+					slot->tts_isnull[col] = false;
+					quack::ConvertDuckToPostgresValue(slot, value, col);
+				}
+			}
+
+			ExecStoreVirtualTuple(slot);
+
+			dest->receiveSlot(slot, dest);
+			/* Free only what this row allocated: a NULL column still holds the previous row's freed pointer. */
+			for (idx_t i = 0; i < column_count; i++) {
+				if (!slot->tts_isnull[i] && slot->tts_tupleDescriptor->attrs[i].attbyval == false) {
+					pfree(DatumGetPointer(slot->tts_values[i]));
+				}
+			}
+		}
+	}
+	dest->rShutdown(dest);
+	return true;
+}
\ No newline at end of file
diff --git a/src/quack_types.cpp b/src/quack_types.cpp
new file mode 100644
index 0000000..11c9ac3
--- /dev/null
+++ b/src/quack_types.cpp
@@ -0,0 +1,258 @@
+#include "duckdb.hpp"
+
+extern "C" {
+#include "postgres.h"
+#include "miscadmin.h"
+#include "catalog/pg_type.h"
+#include "executor/tuptable.h"
+}
+
+#include "quack/quack.h"
+
+namespace quack {
+
+// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
+constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957;
+constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY;
+
+/* Stores `value` into slot->tts_values[col], converting according to the column's type OID. */
+/* Text-like results are palloc'ed; FLOAT8/NUMERIC columns are retyped to by-value FLOAT8 in the descriptor. */
+void
+ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col) {
+	Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid;
+
+	switch (oid) {
+	case BOOLOID:
+		slot->tts_values[col] = value.GetValue();
+		break;
+	case CHAROID:
+		slot->tts_values[col] = value.GetValue();
+		break;
+	case INT2OID:
+		slot->tts_values[col] = value.GetValue();
+		break;
+	case INT4OID:
+		slot->tts_values[col] = value.GetValue();
+		break;
+	case INT8OID:
+		slot->tts_values[col] = value.GetValue();
+		break;
+	case BPCHAROID:
+	case TEXTOID:
+	case VARCHAROID: {
+		auto str = value.GetValue();
+		auto varchar = str.c_str();
+		auto varchar_len = str.size();
+
+		text *result = (text *)palloc0(varchar_len + VARHDRSZ);
+		SET_VARSIZE(result, varchar_len + VARHDRSZ);
+		memcpy(VARDATA(result), varchar, varchar_len);
+		slot->tts_values[col] = PointerGetDatum(result);
+		break;
+	}
+	case DATEOID: {
+		duckdb::date_t date = value.GetValue();
+		slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET;
+		break;
+	}
+	case TIMESTAMPOID: {
+		duckdb::dtime_t timestamp = value.GetValue();
+		slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET;
+		break;
+	}
+	case FLOAT8OID:
+	case NUMERICOID: {
+		double result_double = value.GetValue();
+		slot->tts_tupleDescriptor->attrs[col].atttypid = FLOAT8OID;
+		slot->tts_tupleDescriptor->attrs[col].attbyval = true;
+		memcpy(&slot->tts_values[col], (char *)&result_double, sizeof(double));
+		break;
+	}
+	default:
+		elog(ERROR, "Unsupported quack type: %d", oid);
+	}
+}
+
+/* Maps a PostgreSQL type OID to the DuckDB logical type used for scanning; raises ERROR for unmapped OIDs. */
+duckdb::LogicalType
+ConvertPostgresToDuckColumnType(Oid type) {
+	switch (type) {
+	case BOOLOID:
+		return duckdb::LogicalTypeId::BOOLEAN;
+	case CHAROID:
+		return duckdb::LogicalTypeId::TINYINT;
+	case INT2OID:
+		return duckdb::LogicalTypeId::SMALLINT;
+	case INT4OID:
+		return duckdb::LogicalTypeId::INTEGER;
+	case INT8OID:
+		return duckdb::LogicalTypeId::BIGINT;
+	case BPCHAROID:
+	case TEXTOID:
+	case VARCHAROID:
+		return duckdb::LogicalTypeId::VARCHAR;
+	case DATEOID:
+		return duckdb::LogicalTypeId::DATE;
+	case TIMESTAMPOID:
+		return duckdb::LogicalTypeId::TIMESTAMP;
+	default:
+		elog(ERROR, "Unsupported quack type: %d", type);
+	}
+}
+
+/* Writes `value` at row `offset` of the flat vector `result`. */
+template
+static void
+Append(duckdb::Vector &result, T value, idx_t offset) {
+	auto data = duckdb::FlatVector::GetData(result);
+	data[offset] = value;
+}
+
+static void
+AppendString(duckdb::Vector &result, Datum value, idx_t offset) {
+	const char *text = VARDATA_ANY(value);
+	int len = VARSIZE_ANY_EXHDR(value);
+	duckdb::string_t str(text, len);
+
+	auto data = duckdb::FlatVector::GetData(result);
+	data[offset] = duckdb::StringVector::AddString(result, str);
+}
+
+void
+ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
+	/* Dispatch on the vector's DuckDB type and write the converted datum at row `offset` */
+	switch (result.GetType().id()) {
+	case duckdb::LogicalTypeId::BOOLEAN:
+		Append(result, DatumGetBool(value), offset);
+		break;
+	case duckdb::LogicalTypeId::TINYINT:
+		Append(result, DatumGetChar(value), offset);
+		break;
+	case duckdb::LogicalTypeId::SMALLINT:
+		Append(result, DatumGetInt16(value), offset);
+		break;
+	case duckdb::LogicalTypeId::INTEGER:
+		Append(result, DatumGetInt32(value), offset);
+		break;
+	case duckdb::LogicalTypeId::BIGINT:
+		Append(result, DatumGetInt64(value), offset);
+		break;
+	case duckdb::LogicalTypeId::VARCHAR:
+		AppendString(result, value, offset);
+		break;
+	case duckdb::LogicalTypeId::DATE:
+		Append(result, duckdb::date_t(static_cast(value + QUACK_DUCK_DATE_OFFSET)), offset);
+		break;
+	case duckdb::LogicalTypeId::TIMESTAMP:
+		Append(result, duckdb::dtime_t(static_cast(value + QUACK_DUCK_TIMESTAMP_OFFSET)),
+		       offset);
+		break;
+	default:
+		elog(ERROR, "Unsupported quack type: %d", static_cast(result.GetType().id()));
+		break;
+	}
+}
+
+typedef struct HeapTuplePageReadState {
+	bool m_slow = 0;
+	int m_nvalid = 0;
+	uint32 m_offset = 0;
+} HeapTuplePageReadState;
+
+static Datum
+HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState,
+                             int natts, bool *isNull) {
+	/* Resumes decoding at m_nvalid/m_offset and returns the datum of the last attribute below natts */
+	HeapTupleHeader tup = tuple->t_data;
+	bool hasnulls = HeapTupleHasNulls(tuple);
+	int attnum;
+	char *tp;
+	uint32 off;
+	bits8 *bp = tup->t_bits;
+	bool slow = false;
+	Datum value = (Datum)0;
+
+	/* We can only fetch as many attributes as the tuple has. */
+	natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts);
+
+	attnum = heapTupleReadState.m_nvalid;
+	if (attnum == 0) {
+		/* Start from the first attribute */
+		off = 0;
+		heapTupleReadState.m_slow = false;
+	} else {
+		/* Restore state from previous execution */
+		off = heapTupleReadState.m_offset;
+		slow = heapTupleReadState.m_slow;
+	}
+
+	tp = (char *)tup + tup->t_hoff;
+
+	for (; attnum < natts; attnum++) {
+		Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum);
+
+		if (hasnulls && att_isnull(attnum, bp)) {
+			value = (Datum)0;
+			*isNull = true;
+			slow = true; /* can't use attcacheoff anymore */
+			continue;
+		}
+
+		*isNull = false;
+
+		if (!slow && thisatt->attcacheoff >= 0) {
+			off = thisatt->attcacheoff;
+		} else if (thisatt->attlen == -1) {
+
+			if (!slow && off == att_align_nominal(off, thisatt->attalign)) {
+				thisatt->attcacheoff = off;
+			} else {
+				off = att_align_pointer(off, thisatt->attalign, -1, tp + off);
+				slow = true;
+			}
+		} else {
+			off = att_align_nominal(off, thisatt->attalign);
+
+			if (!slow) {
+				thisatt->attcacheoff = off;
+			}
+		}
+
+		value = fetchatt(thisatt, tp + off);
+
+		off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+		if (thisatt->attlen <= 0) {
+			slow = true;
+		}
+	}
+
+	/* Nothing fetched: tuple has fewer attributes than asked for; report NULL (TODO: honour stored defaults) */
+	if (attnum == heapTupleReadState.m_nvalid) {
+		*isNull = true;
+	}
+
+	heapTupleReadState.m_nvalid = attnum;
+	heapTupleReadState.m_offset = off;
+	heapTupleReadState.m_slow = slow;
+
+	return value;
+}
+
+void
+InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) {
+	HeapTuplePageReadState heapTupleReadState = {}; /* decode cursor shared by the per-column fetches below */
+	for (int i = 0; i < tupleDesc->natts; i++) {
+		auto &result = output.data[i];
+		bool isNull = false;
+		Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull);
+		if (isNull) {
+			auto &array_mask = duckdb::FlatVector::Validity(result);
+			array_mask.SetInvalid(offset);
+		} else {
+			ConvertPostgresToDuckValue(value, result, offset);
+		}
+	}
+}
+
+ +} // namespace quack diff --git a/third_party/third_party.cmake b/third_party/third_party.cmake deleted file mode 100644 index 36ef483..0000000 --- a/third_party/third_party.cmake +++ /dev/null @@ -1,25 +0,0 @@ -if(POLICY CMP0077) - set(CMAKE_POLICY_DEFAULT_CMP0077 NEW) -endif() - -# -# duckdb -# -execute_process(COMMAND git submodule update --init -- third_party/duckdb - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) - -# Disable ASAN -SET(ENABLE_SANITIZER OFF) -SET(ENABLE_UBSAN OFF) - -# No DuckDB cli -SET(BUILD_SHELL OFF) - -# Disable unitest -SET(BUILD_UNITTESTS OFF) - -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/third_party/duckdb EXCLUDE_FROM_ALL) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/duckdb/src/include) - -SET(QUACK_THIRD_PARTY_LIBS duckdb) \ No newline at end of file