Skip to content

Commit

Permalink
Add support for CREATE TABLE AS
Browse files Browse the repository at this point in the history
  • Loading branch information
JelteF committed Oct 4, 2024
1 parent 24ebd67 commit 002b4c8
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 6 deletions.
4 changes: 4 additions & 0 deletions include/pgduckdb/pgduckdb_duckdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ class DuckDBManager {
duckdb::unique_ptr<duckdb::Connection> DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info,
List *needed_columns, const char *query);

void DuckdbSetPostgresState(duckdb::ClientContext &context, List *rtables, PlannerInfo *planner_info,
List *needed_columns, const char *query);
void DuckdbResetPostgresState(duckdb::ClientContext &context);

} // namespace pgduckdb
2 changes: 2 additions & 0 deletions include/pgduckdb/pgduckdb_planner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ extern bool duckdb_explain_analyze;
PlannedStmt *DuckdbPlanNode(Query *parse, int cursor_options, ParamListInfo bound_params);
std::tuple<duckdb::unique_ptr<duckdb::PreparedStatement>, duckdb::unique_ptr<duckdb::Connection>>
DuckdbPrepare(const Query *query, ParamListInfo bound_params);
duckdb::unique_ptr<duckdb::QueryResult> DuckdbInsertIntoSelect(duckdb::ClientContext &context,
const Query *select_query, Oid target_table);
54 changes: 49 additions & 5 deletions src/pgduckdb_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ extern "C" {
#include "executor/spi.h"
#include "miscadmin.h"

#include "pgduckdb/vendor/pg_ruleutils.h"
#include "pgduckdb/pgduckdb_ruleutils.h"
}

#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "pgduckdb/pgduckdb_planner.hpp"
#include "pgduckdb/vendor/pg_list.hpp"
#include <inttypes.h>

/*
* ctas_skip_data stores the original value of the skipData field of the
* CreateTableAsStmt of the query that's currently being executed. For duckdb
* tables we force this value to false.
*/
static bool ctas_skip_data = false;

/*
* Truncates the given table in DuckDB.
*/
Expand All @@ -50,7 +59,14 @@ DuckdbHandleDDL(Node *parsetree, const char *queryString) {
return;
}

elog(ERROR, "DuckDB does not support CREATE TABLE AS yet");
/*
* Force skipData to false for duckdb tables, so that Postgres does
* not execute the query, and save the original value in ctas_skip_data
* so we can use it later in duckdb_create_table_trigger to choose
* whether to execute the query in DuckDB or not.
*/
ctas_skip_data = stmt->into->skipData;
stmt->into->skipData = true;
}
}

Expand Down Expand Up @@ -84,8 +100,8 @@ duckdb_create_table_trigger(PG_FUNCTION_ARGS) {
if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) /* internal error */
elog(ERROR, "not fired by event trigger manager");

EventTriggerData *trigdata = (EventTriggerData *)fcinfo->context;
Node *parsetree = trigdata->parsetree;
EventTriggerData *trigger_data = (EventTriggerData *)fcinfo->context;
Node *parsetree = trigger_data->parsetree;

SPI_connect();

Expand Down Expand Up @@ -161,15 +177,43 @@ duckdb_create_table_trigger(PG_FUNCTION_ARGS) {
elog(ERROR, "Unexpected parsetree type: %d", nodeTag(parsetree));
}

std::string query_string(pgduckdb_get_tabledef(relid));
std::string create_table_string(pgduckdb_get_tabledef(relid));

auto db = pgduckdb::DuckDBManager::Get().GetDatabase();
auto connection = duckdb::make_uniq<duckdb::Connection>(db);
auto &context = *connection->context;
auto result = context.Query(query_string, false);
Query *ctas_query = nullptr;

if (IsA(parsetree, CreateTableAsStmt) && !ctas_skip_data) {
auto stmt = castNode(CreateTableAsStmt, parsetree);
ctas_query = (Query *)stmt->query;
}

if (ctas_query) {
auto result = context.Query("BEGIN TRANSACTION", false);
if (result->HasError()) {
elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not start transaction: %s",
result->GetError().c_str());
}
}

auto result = context.Query(create_table_string, false);
if (result->HasError()) {
elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not create table: %s", result->GetError().c_str());
}
if (ctas_query) {
auto result = DuckdbInsertIntoSelect(context, ctas_query, relid);

if (result->HasError()) {
elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not insert data: %s", result->GetError().c_str());
}

result = context.Query("COMMIT", false);
if (result->HasError()) {
elog(ERROR, "(PGDuckDB/duckdb_create_table_trigger) Could not commit transaction: %s",
result->GetError().c_str());
}
}

PG_RETURN_NULL();
}
Expand Down
12 changes: 12 additions & 0 deletions src/pgduckdb_duckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ DuckDBManager::LoadExtensions(duckdb::ClientContext &context) {
}
}

void
DuckdbSetPostgresState(duckdb::ClientContext &context, List *rtables, PlannerInfo *planner_info, List *needed_columns,
const char *query) {
context.registered_state->Insert(
"postgres_state", duckdb::make_shared_ptr<PostgresContextState>(rtables, planner_info, needed_columns, query));
}

void
DuckdbResetPostgresState(duckdb::ClientContext &context) {
context.registered_state->Remove("postgres_state");
}

duckdb::unique_ptr<duckdb::Connection>
DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_columns, const char *query) {
auto &db = DuckDBManager::Get().GetDatabase();
Expand Down
28 changes: 28 additions & 0 deletions src/pgduckdb_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "utils/guc.h"

#include "pgduckdb/vendor/pg_ruleutils.h"
#include "pgduckdb/pgduckdb_ruleutils.h"
}

#include "pgduckdb/pgduckdb_duckdb.hpp"
Expand Down Expand Up @@ -95,6 +96,33 @@ DuckdbPrepare(const Query *query, ParamListInfo bound_params) {
return {std::move(prepared_query), std::move(duckdb_connection)};
}

duckdb::unique_ptr<duckdb::QueryResult>
DuckdbInsertIntoSelect(duckdb::ClientContext &context, const Query *select_query, Oid target_table) {

/*
* Copy the query, so the original one is not modified by the
* subquery_planner call that PlanQuery does.
*/
Query *copied_query = (Query *)copyObjectImpl(select_query);
const char *query_string = pgduckdb_pg_get_querydef(copied_query, false);

// XXX: Should we do something for EXPLAIN here?

List *rtables = copied_query->rtable;
/* Extract required vars for table */
int flags = PVC_RECURSE_AGGREGATES | PVC_RECURSE_WINDOWFUNCS | PVC_RECURSE_PLACEHOLDERS;
List *vars = list_concat(pull_var_clause((Node *)copied_query->targetList, flags),
pull_var_clause((Node *)copied_query->jointree->quals, flags));
PlannerInfo *query_planner_info = PlanQuery(copied_query, NULL);

pgduckdb::DuckdbSetPostgresState(context, rtables, query_planner_info, vars, query_string);

std::string insert_string = std::string("INSERT INTO ") + pgduckdb_relation_name(target_table) + " " + query_string;
auto result = context.Query(insert_string, false);
pgduckdb::DuckdbResetPostgresState(context);
return result;
}

static Plan *
CreatePlan(Query *query, ParamListInfo bound_params) {
/*
Expand Down
31 changes: 31 additions & 0 deletions test/pycheck/explain_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from .utils import Cursor

import pytest
import psycopg.errors


def test_explain(cur: Cursor):
cur.sql("CREATE TABLE test_table (id int, name text)")
Expand Down Expand Up @@ -27,3 +30,31 @@ def test_explain(cur: Cursor):
assert "UNGROUPED_AGGREGATE" in plan
assert "id=1 AND id IS NOT NULL" in plan
assert "Total Time:" in plan


def test_explain_ctas(cur: Cursor):
cur.sql("CREATE TEMP TABLE heap1(id) AS SELECT 1")
result = cur.sql("EXPLAIN CREATE TEMP TABLE heap2(id) AS SELECT * from heap1")
plan = "\n".join(result)
assert "POSTGRES_SEQ_SCAN" in plan
assert "Total Time:" not in plan

result = cur.sql(
"EXPLAIN ANALYZE CREATE TEMP TABLE heap2(id) AS SELECT * from heap1"
)
plan = "\n".join(result)
assert "POSTGRES_SEQ_SCAN" in plan
assert "Total Time:" in plan

result = cur.sql(
"EXPLAIN CREATE TEMP TABLE duckdb1(id) USING duckdb AS SELECT * from heap1"
)
plan = "\n".join(result)
assert "POSTGRES_SEQ_SCAN" in plan
assert "Total Time:" not in plan

# EXPLAIN ANALYZE is not supported for DuckDB CTAS (yet)
with pytest.raises(psycopg.errors.FeatureNotSupported):
cur.sql(
"EXPLAIN ANALYZE CREATE TEMP TABLE duckdb2(id) USING duckdb AS SELECT * from heap1"
)
26 changes: 26 additions & 0 deletions test/pycheck/prepared_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,29 @@ def test_prepared_pipeline(conn: Connection):
p.sync()
assert cur.sql("SELECT * FROM duckt ORDER BY id") == [1, 3, 4]
assert cur.sql("SELECT * FROM heapt ORDER BY id") == []


def test_prepared_ctas(cur: Cursor):
cur.sql("CREATE TABLE heapt (id int, number int)")
cur.sql("INSERT INTO heapt VALUES (1, 2), (2, 4), (3, 6)")
cur.sql("CREATE TEMP TABLE t USING duckdb AS SELECT * FROM heapt")
assert cur.sql("SELECT * FROM t ORDER BY id") == [(1, 2), (2, 4), (3, 6)]

# We don't support CTAS with parameters yet. The error message and code
# could be better, but this is what we have right now. At least we don't
# crash.
with pytest.raises(
psycopg.errors.InternalError,
match="Expected 1 parameters, but none were supplied",
):
cur.sql(
"CREATE TEMP TABLE t2 USING duckdb AS SELECT * FROM heapt where id = %s",
(2,),
)

prepared_query = "CREATE TEMP TABLE t3 USING duckdb AS SELECT * FROM heapt"
cur.sql(prepared_query, prepare=True)
assert cur.sql("SELECT count(*) FROM t3") == 3
cur.sql("DROP TABLE t3")
cur.sql(prepared_query)
assert cur.sql("SELECT count(*) FROM t3") == 3
41 changes: 40 additions & 1 deletion test/regression/expected/temporary_tables.out
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,47 @@ DROP TABLE t;
-- unsupported
CREATE TEMP TABLE t(a int) ON COMMIT DROP;
ERROR: DuckDB does not support ON COMMIT DROP
-- CTAS fully in Duckdb
CREATE TEMP TABLE webpages USING duckdb AS SELECT * FROM read_csv('../../data/web_page.csv') as (column00 int, column01 text, column02 date);
ERROR: DuckDB does not support CREATE TABLE AS yet
SELECT * FROM webpages ORDER BY column00 LIMIT 2;
column00 | column01 | column02
----------+------------------+------------
1 | AAAAAAAABAAAAAAA | 1997-09-03
2 | AAAAAAAACAAAAAAA | 1997-09-03
(2 rows)

CREATE TEMP TABLE t_heap(a int) USING heap;
INSERT INTO t_heap VALUES (1);
-- CTAS from postgres table to duckdb table
CREATE TEMP TABLE t(b) USING duckdb AS SELECT * FROM t_heap;
SELECT * FROM t;
b
---
1
(1 row)

-- CTAS from DuckDB table to postgres table
CREATE TEMP TABLE t_heap2(c) USING heap AS SELECT * FROM t_heap;
SELECT * FROM t_heap2;
c
---
1
(1 row)

SELECT duckdb.raw_query($$ SELECT database_name, schema_name, sql FROM duckdb_tables() $$);
NOTICE: result: database_name schema_name sql
VARCHAR VARCHAR VARCHAR
[ Rows: 2]
memory pg_temp CREATE TABLE pg_temp.t(b INTEGER);
memory pg_temp CREATE TABLE pg_temp.webpages(column00 INTEGER, column01 VARCHAR, column02 DATE);


raw_query
-----------

(1 row)

DROP TABLE webpages, t, t_heap, t_heap2;
CREATE TEMP TABLE t(a int);
ALTER TABLE t ADD COLUMN b int;
ERROR: DuckDB does not support ALTER TABLE yet
Expand Down
18 changes: 18 additions & 0 deletions test/regression/sql/temporary_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,25 @@ DROP TABLE t;
-- unsupported
CREATE TEMP TABLE t(a int) ON COMMIT DROP;

-- CTAS fully in Duckdb
CREATE TEMP TABLE webpages USING duckdb AS SELECT * FROM read_csv('../../data/web_page.csv') as (column00 int, column01 text, column02 date);
SELECT * FROM webpages ORDER BY column00 LIMIT 2;

CREATE TEMP TABLE t_heap(a int) USING heap;
INSERT INTO t_heap VALUES (1);

-- CTAS from postgres table to duckdb table
CREATE TEMP TABLE t(b) USING duckdb AS SELECT * FROM t_heap;
SELECT * FROM t;

-- CTAS from DuckDB table to postgres table
CREATE TEMP TABLE t_heap2(c) USING heap AS SELECT * FROM t_heap;
SELECT * FROM t_heap2;

SELECT duckdb.raw_query($$ SELECT database_name, schema_name, sql FROM duckdb_tables() $$);

DROP TABLE webpages, t, t_heap, t_heap2;


CREATE TEMP TABLE t(a int);
ALTER TABLE t ADD COLUMN b int;
Expand Down

0 comments on commit 002b4c8

Please sign in to comment.