From 002b4c8fb598d4dc0f42adce58a67dc424382fb7 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 30 Sep 2024 17:23:02 +0200 Subject: [PATCH] Add support for CREATE TABLE AS --- include/pgduckdb/pgduckdb_duckdb.hpp | 4 ++ include/pgduckdb/pgduckdb_planner.hpp | 2 + src/pgduckdb_ddl.cpp | 54 +++++++++++++++++-- src/pgduckdb_duckdb.cpp | 12 +++++ src/pgduckdb_planner.cpp | 28 ++++++++++ test/pycheck/explain_test.py | 31 +++++++++++ test/pycheck/prepared_test.py | 26 +++++++++ test/regression/expected/temporary_tables.out | 41 +++++++++++++- test/regression/sql/temporary_tables.sql | 18 +++++++ 9 files changed, 210 insertions(+), 6 deletions(-) diff --git a/include/pgduckdb/pgduckdb_duckdb.hpp b/include/pgduckdb/pgduckdb_duckdb.hpp index 08605ea8..848a7a60 100644 --- a/include/pgduckdb/pgduckdb_duckdb.hpp +++ b/include/pgduckdb/pgduckdb_duckdb.hpp @@ -37,4 +37,8 @@ class DuckDBManager { duckdb::unique_ptr<duckdb::Connection> DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_columns, const char *query); +void DuckdbSetPostgresState(duckdb::ClientContext &context, List *rtables, PlannerInfo *planner_info, + List *needed_columns, const char *query); +void DuckdbResetPostgresState(duckdb::ClientContext &context); + } // namespace pgduckdb diff --git a/include/pgduckdb/pgduckdb_planner.hpp b/include/pgduckdb/pgduckdb_planner.hpp index 1784a46f..77ff2e45 100644 --- a/include/pgduckdb/pgduckdb_planner.hpp +++ b/include/pgduckdb/pgduckdb_planner.hpp @@ -14,3 +14,5 @@ extern bool duckdb_explain_analyze; PlannedStmt *DuckdbPlanNode(Query *parse, int cursor_options, ParamListInfo bound_params); std::tuple<duckdb::unique_ptr<duckdb::PreparedStatement>, duckdb::unique_ptr<duckdb::Connection>> DuckdbPrepare(const Query *query, ParamListInfo bound_params); +duckdb::unique_ptr<duckdb::QueryResult> DuckdbInsertIntoSelect(duckdb::ClientContext &context, + const Query *select_query, Oid target_table); diff --git a/src/pgduckdb_ddl.cpp b/src/pgduckdb_ddl.cpp index a7d69793..fb34dc75 100644 --- a/src/pgduckdb_ddl.cpp +++ b/src/pgduckdb_ddl.cpp @@ -18,13 
+18,22 @@ extern "C" { #include "executor/spi.h" #include "miscadmin.h" +#include "pgduckdb/vendor/pg_ruleutils.h" #include "pgduckdb/pgduckdb_ruleutils.h" } #include "pgduckdb/pgduckdb_duckdb.hpp" +#include "pgduckdb/pgduckdb_planner.hpp" #include "pgduckdb/vendor/pg_list.hpp" #include +/* + * ctas_skip_data stores the original value of the skipData field of the + * CreateTableAsStmt of the query that's currently being executed. For duckdb + * tables we force this value to true. + */ +static bool ctas_skip_data = false; + /* * Truncates the given table in DuckDB. */ @@ -50,7 +59,14 @@ DuckdbHandleDDL(Node *parsetree, const char *queryString) { return; } - elog(ERROR, "DuckDB does not support CREATE TABLE AS yet"); + /* + * Force skipData to true for duckdb tables, so that Postgres does + * not execute the query, and save the original value in ctas_skip_data + * so we can use it later in duckdb_create_table_trigger to choose + * whether to execute the query in DuckDB or not. + */ + ctas_skip_data = stmt->into->skipData; + stmt->into->skipData = true; } } @@ -84,8 +100,8 @@ duckdb_create_table_trigger(PG_FUNCTION_ARGS) { if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) /* internal error */ elog(ERROR, "not fired by event trigger manager"); - EventTriggerData *trigdata = (EventTriggerData *)fcinfo->context; - Node *parsetree = trigdata->parsetree; + EventTriggerData *trigger_data = (EventTriggerData *)fcinfo->context; + Node *parsetree = trigger_data->parsetree; SPI_connect(); @@ -161,15 +177,43 @@ duckdb_create_table_trigger(PG_FUNCTION_ARGS) { elog(ERROR, "Unexpected parsetree type: %d", nodeTag(parsetree)); } - std::string query_string(pgduckdb_get_tabledef(relid)); + std::string create_table_string(pgduckdb_get_tabledef(relid)); auto db = pgduckdb::DuckDBManager::Get().GetDatabase(); auto connection = duckdb::make_uniq<duckdb::Connection>(db); auto &context = *connection->context; - auto result = context.Query(query_string, false); + Query *ctas_query = nullptr; + + if (IsA(parsetree, 
CreateTableAsStmt) && !ctas_skip_data) { + auto stmt = castNode(CreateTableAsStmt, parsetree); + ctas_query = (Query *)stmt->query; + } + + if (ctas_query) { + auto result = context.Query("BEGIN TRANSACTION", false); + if (result->HasError()) { + elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not start transaction: %s", + result->GetError().c_str()); + } + } + + auto result = context.Query(create_table_string, false); if (result->HasError()) { elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not create table: %s", result->GetError().c_str()); } + if (ctas_query) { + auto result = DuckdbInsertIntoSelect(context, ctas_query, relid); + + if (result->HasError()) { + elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not insert data: %s", result->GetError().c_str()); + } + + result = context.Query("COMMIT", false); + if (result->HasError()) { + elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not commit transaction: %s", + result->GetError().c_str()); + } + } PG_RETURN_NULL(); } diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index 58cb72bb..875121e3 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -169,6 +169,18 @@ DuckDBManager::LoadExtensions(duckdb::ClientContext &context) { } } +void +DuckdbSetPostgresState(duckdb::ClientContext &context, List *rtables, PlannerInfo *planner_info, List *needed_columns, + const char *query) { + context.registered_state->Insert( + "postgres_state", duckdb::make_shared_ptr<PostgresContextState>(rtables, planner_info, needed_columns, query)); +} + +void +DuckdbResetPostgresState(duckdb::ClientContext &context) { + context.registered_state->Remove("postgres_state"); +} + duckdb::unique_ptr<duckdb::Connection> DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_columns, const char *query) { auto &db = DuckDBManager::Get().GetDatabase(); diff --git a/src/pgduckdb_planner.cpp b/src/pgduckdb_planner.cpp index 031464d9..affb5599 100644 --- a/src/pgduckdb_planner.cpp +++ 
b/src/pgduckdb_planner.cpp @@ -13,6 +13,7 @@ extern "C" { #include "utils/guc.h" #include "pgduckdb/vendor/pg_ruleutils.h" +#include "pgduckdb/pgduckdb_ruleutils.h" } #include "pgduckdb/pgduckdb_duckdb.hpp" @@ -95,6 +96,33 @@ DuckdbPrepare(const Query *query, ParamListInfo bound_params) { return {std::move(prepared_query), std::move(duckdb_connection)}; } +duckdb::unique_ptr<duckdb::QueryResult> +DuckdbInsertIntoSelect(duckdb::ClientContext &context, const Query *select_query, Oid target_table) { + + /* + * Copy the query, so the original one is not modified by the + * subquery_planner call that PlanQuery does. + */ + Query *copied_query = (Query *)copyObjectImpl(select_query); + const char *query_string = pgduckdb_pg_get_querydef(copied_query, false); + + // XXX: Should we do something for EXPLAIN here? + + List *rtables = copied_query->rtable; + /* Extract required vars for table */ + int flags = PVC_RECURSE_AGGREGATES | PVC_RECURSE_WINDOWFUNCS | PVC_RECURSE_PLACEHOLDERS; + List *vars = list_concat(pull_var_clause((Node *)copied_query->targetList, flags), + pull_var_clause((Node *)copied_query->jointree->quals, flags)); + PlannerInfo *query_planner_info = PlanQuery(copied_query, NULL); + + pgduckdb::DuckdbSetPostgresState(context, rtables, query_planner_info, vars, query_string); + + std::string insert_string = std::string("INSERT INTO ") + pgduckdb_relation_name(target_table) + " " + query_string; + auto result = context.Query(insert_string, false); + pgduckdb::DuckdbResetPostgresState(context); + return result; +} + static Plan * CreatePlan(Query *query, ParamListInfo bound_params) { /* diff --git a/test/pycheck/explain_test.py b/test/pycheck/explain_test.py index 3569ba11..bf2f9d42 100644 --- a/test/pycheck/explain_test.py +++ b/test/pycheck/explain_test.py @@ -1,5 +1,8 @@ from .utils import Cursor +import pytest +import psycopg.errors + def test_explain(cur: Cursor): cur.sql("CREATE TABLE test_table (id int, name text)") @@ -27,3 +30,31 @@ def test_explain(cur: Cursor): assert 
"UNGROUPED_AGGREGATE" in plan assert "id=1 AND id IS NOT NULL" in plan assert "Total Time:" in plan + + +def test_explain_ctas(cur: Cursor): + cur.sql("CREATE TEMP TABLE heap1(id) AS SELECT 1") + result = cur.sql("EXPLAIN CREATE TEMP TABLE heap2(id) AS SELECT * from heap1") + plan = "\n".join(result) + assert "POSTGRES_SEQ_SCAN" in plan + assert "Total Time:" not in plan + + result = cur.sql( + "EXPLAIN ANALYZE CREATE TEMP TABLE heap2(id) AS SELECT * from heap1" + ) + plan = "\n".join(result) + assert "POSTGRES_SEQ_SCAN" in plan + assert "Total Time:" in plan + + result = cur.sql( + "EXPLAIN CREATE TEMP TABLE duckdb1(id) USING duckdb AS SELECT * from heap1" + ) + plan = "\n".join(result) + assert "POSTGRES_SEQ_SCAN" in plan + assert "Total Time:" not in plan + + # EXPLAIN ANALYZE is not supported for DuckDB CTAS (yet) + with pytest.raises(psycopg.errors.FeatureNotSupported): + cur.sql( + "EXPLAIN ANALYZE CREATE TEMP TABLE duckdb2(id) USING duckdb AS SELECT * from heap1" + ) diff --git a/test/pycheck/prepared_test.py b/test/pycheck/prepared_test.py index 4f7b8389..a7152434 100644 --- a/test/pycheck/prepared_test.py +++ b/test/pycheck/prepared_test.py @@ -130,3 +130,29 @@ def test_prepared_pipeline(conn: Connection): p.sync() assert cur.sql("SELECT * FROM duckt ORDER BY id") == [1, 3, 4] assert cur.sql("SELECT * FROM heapt ORDER BY id") == [] + + +def test_prepared_ctas(cur: Cursor): + cur.sql("CREATE TABLE heapt (id int, number int)") + cur.sql("INSERT INTO heapt VALUES (1, 2), (2, 4), (3, 6)") + cur.sql("CREATE TEMP TABLE t USING duckdb AS SELECT * FROM heapt") + assert cur.sql("SELECT * FROM t ORDER BY id") == [(1, 2), (2, 4), (3, 6)] + + # We don't support CTAS with parameters yet. The error message and code + # could be better, but this is what we have right now. At least we don't + # crash. 
+ with pytest.raises( + psycopg.errors.InternalError, + match="Expected 1 parameters, but none were supplied", + ): + cur.sql( + "CREATE TEMP TABLE t2 USING duckdb AS SELECT * FROM heapt where id = %s", + (2,), + ) + + prepared_query = "CREATE TEMP TABLE t3 USING duckdb AS SELECT * FROM heapt" + cur.sql(prepared_query, prepare=True) + assert cur.sql("SELECT count(*) FROM t3") == 3 + cur.sql("DROP TABLE t3") + cur.sql(prepared_query) + assert cur.sql("SELECT count(*) FROM t3") == 3 diff --git a/test/regression/expected/temporary_tables.out b/test/regression/expected/temporary_tables.out index 044e2a69..6311dd19 100644 --- a/test/regression/expected/temporary_tables.out +++ b/test/regression/expected/temporary_tables.out @@ -180,8 +180,47 @@ DROP TABLE t; -- unsupported CREATE TEMP TABLE t(a int) ON COMMIT DROP; ERROR: DuckDB does not support ON COMMIT DROP +-- CTAS fully in Duckdb CREATE TEMP TABLE webpages USING duckdb AS SELECT * FROM read_csv('../../data/web_page.csv') as (column00 int, column01 text, column02 date); -ERROR: DuckDB does not support CREATE TABLE AS yet +SELECT * FROM webpages ORDER BY column00 LIMIT 2; + column00 | column01 | column02 +----------+------------------+------------ + 1 | AAAAAAAABAAAAAAA | 1997-09-03 + 2 | AAAAAAAACAAAAAAA | 1997-09-03 +(2 rows) + +CREATE TEMP TABLE t_heap(a int) USING heap; +INSERT INTO t_heap VALUES (1); +-- CTAS from postgres table to duckdb table +CREATE TEMP TABLE t(b) USING duckdb AS SELECT * FROM t_heap; +SELECT * FROM t; + b +--- + 1 +(1 row) + +-- CTAS from DuckDB table to postgres table +CREATE TEMP TABLE t_heap2(c) USING heap AS SELECT * FROM t_heap; +SELECT * FROM t_heap2; + c +--- + 1 +(1 row) + +SELECT duckdb.raw_query($$ SELECT database_name, schema_name, sql FROM duckdb_tables() $$); +NOTICE: result: database_name schema_name sql +VARCHAR VARCHAR VARCHAR +[ Rows: 2] +memory pg_temp CREATE TABLE pg_temp.t(b INTEGER); +memory pg_temp CREATE TABLE pg_temp.webpages(column00 INTEGER, column01 VARCHAR, 
column02 DATE); + + + raw_query +----------- + +(1 row) + +DROP TABLE webpages, t, t_heap, t_heap2; CREATE TEMP TABLE t(a int); ALTER TABLE t ADD COLUMN b int; ERROR: DuckDB does not support ALTER TABLE yet diff --git a/test/regression/sql/temporary_tables.sql b/test/regression/sql/temporary_tables.sql index 9fb27827..dd9ab48f 100644 --- a/test/regression/sql/temporary_tables.sql +++ b/test/regression/sql/temporary_tables.sql @@ -117,7 +117,25 @@ DROP TABLE t; -- unsupported CREATE TEMP TABLE t(a int) ON COMMIT DROP; +-- CTAS fully in Duckdb CREATE TEMP TABLE webpages USING duckdb AS SELECT * FROM read_csv('../../data/web_page.csv') as (column00 int, column01 text, column02 date); +SELECT * FROM webpages ORDER BY column00 LIMIT 2; + +CREATE TEMP TABLE t_heap(a int) USING heap; +INSERT INTO t_heap VALUES (1); + +-- CTAS from postgres table to duckdb table +CREATE TEMP TABLE t(b) USING duckdb AS SELECT * FROM t_heap; +SELECT * FROM t; + +-- CTAS from DuckDB table to postgres table +CREATE TEMP TABLE t_heap2(c) USING heap AS SELECT * FROM t_heap; +SELECT * FROM t_heap2; + +SELECT duckdb.raw_query($$ SELECT database_name, schema_name, sql FROM duckdb_tables() $$); + +DROP TABLE webpages, t, t_heap, t_heap2; + CREATE TEMP TABLE t(a int); ALTER TABLE t ADD COLUMN b int;