Skip to content

Commit

Permalink
Fix memory leaks in custom executor (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- authored Oct 10, 2024
1 parent 37e8c40 commit 1adc730
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
27 changes: 27 additions & 0 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern "C" {
}

#include "duckdb/common/exception.hpp"
#include "duckdb/common/error_data.hpp"

#include <vector>
#include <string>
Expand Down Expand Up @@ -74,4 +75,30 @@ PostgresFunctionGuard(FuncType postgres_function, FuncArgs... args) {
}
}

template <typename FuncRetT, typename FuncType, typename... FuncArgs>
FuncRetT
DuckDBFunctionGuard(FuncType duckdb_function, const char* function_name, FuncArgs... args) {
const char *error_message = nullptr;
try {
return duckdb_function(args...);
} catch (duckdb::Exception &ex) {
duckdb::ErrorData edata(ex.what());
error_message = pstrdup(edata.Message().c_str());
} catch (std::exception &ex) {
const auto msg = ex.what();
if (msg[0] == '{') {
duckdb::ErrorData edata(ex.what());
error_message = pstrdup(edata.Message().c_str());
} else {
error_message = pstrdup(ex.what());
}
}

if (error_message) {
elog(ERROR, "(PGDuckDB/%s) %s", function_name, error_message);
}

std::abort(); // Cannot reach.
}

} // namespace pgduckdb
32 changes: 22 additions & 10 deletions src/pgduckdb_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_types.hpp"
#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "pgduckdb/pgduckdb_planner.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

/* global variables */
CustomScanMethods duckdb_scan_scan_methods;
Expand All @@ -36,9 +37,21 @@ typedef struct DuckdbScanState {

static void
CleanupDuckdbScanState(DuckdbScanState *state) {
MemoryContextReset(state->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
ExecClearTuple(state->css.ss.ss_ScanTupleSlot);

state->query_results.reset();
delete state->prepared_statement;
delete state->duckdb_connection;
state->current_data_chunk.reset();

if (state->prepared_statement) {
delete state->prepared_statement;
state->prepared_statement = nullptr;
}

if (state->duckdb_connection) {
delete state->duckdb_connection;
state->duckdb_connection = nullptr;
}
}

/* static callbacks */
Expand Down Expand Up @@ -105,13 +118,13 @@ ExecuteQuery(DuckdbScanState *state) {
} else {
std::ostringstream oss;
oss << "parameter '" << i << "' has an invalid type (" << pg_param->ptype << ") during query execution";
elog(ERROR, oss.str().c_str());
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, oss.str().c_str());
}
}

auto pending = prepared.PendingQuery(duckdb_params, true);
if (pending->HasError()) {
elog(ERROR, "DuckDB execute returned an error: %s", pending->GetError().c_str());
return pending->ThrowError();
}

duckdb::PendingExecutionResult execution_result;
Expand All @@ -124,16 +137,14 @@ ExecuteQuery(DuckdbScanState *state) {
// Wait for all tasks to terminate
executor.CancelTasks();
// Delete the scan state
CleanupDuckdbScanState(state);
// Process the interrupt on the Postgres side
ProcessInterrupts();
elog(ERROR, "Query cancelled");
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, "Query cancelled");
}
} while (!duckdb::PendingQueryResult::IsResultReady(execution_result));

if (execution_result == duckdb::PendingExecutionResult::EXECUTION_ERROR) {
CleanupDuckdbScanState(state);
elog(ERROR, "(PGDuckDB/ExecuteQuery) %s", pending->GetError().c_str());
return pending->ThrowError();
}

query_results = pending->Execute();
Expand All @@ -149,7 +160,7 @@ Duckdb_ExecCustomScan(CustomScanState *node) {

bool already_executed = duckdb_scan_state->is_executed;
if (!already_executed) {
ExecuteQuery(duckdb_scan_state);
pgduckdb::DuckDBFunctionGuard<void>(ExecuteQuery, "ExecuteQuery", duckdb_scan_state);
}

if (duckdb_scan_state->fetch_next) {
Expand Down Expand Up @@ -209,7 +220,8 @@ Duckdb_ReScanCustomScan(CustomScanState *node) {
void
Duckdb_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) {
DuckdbScanState *duckdb_scan_state = (DuckdbScanState *)node;
ExecuteQuery(duckdb_scan_state);
pgduckdb::DuckDBFunctionGuard<void>(ExecuteQuery, "ExecuteQuery", duckdb_scan_state);

auto chunk = duckdb_scan_state->query_results->Fetch();
if (!chunk || chunk->size() == 0) {
return;
Expand Down
3 changes: 2 additions & 1 deletion src/pgduckdb_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ CreatePlan(Query *query) {
PlannedStmt *
DuckdbPlanNode(Query *parse, int cursor_options) {
/* We need to check can we DuckDB create plan */
Plan *duckdb_plan = (Plan *)castNode(CustomScan, CreatePlan(parse));
Plan *plan = pgduckdb::DuckDBFunctionGuard<Plan*>(CreatePlan, "CreatePlan", parse);
Plan *duckdb_plan = (Plan *)castNode(CustomScan, plan);

if (!duckdb_plan) {
return nullptr;
Expand Down

0 comments on commit 1adc730

Please sign in to comment.