
Commit

Merge branch 'main' into pg_duckdb_15
Leo-XM-Zeng authored Sep 19, 2024
2 parents 00eb688 + 51d3a30 commit 0130159
Showing 25 changed files with 189 additions and 67 deletions.
1 change: 0 additions & 1 deletion .gitmodules
@@ -1,4 +1,3 @@
[submodule "third_party/duckdb"]
path = third_party/duckdb
url = https://github.com/duckdb/duckdb.git
hash = 17d598fc4472c64969f47f86c30fce75c4d64ed4
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
@@ -128,6 +128,11 @@ coding styles.
## Error Handling

* Use exceptions **only** when an error is encountered that terminates a query (e.g. parser error, table not found). Exceptions should only be used for **exceptional** situations. For regular errors that do not break the execution flow (e.g. errors you **expect** might occur) use a return value instead.
* There are two distinct parts of the code where error handling is done very differently: the code that executes before we enter the DuckDB execution engine (e.g. the initial part of the planner hook) and the code that executes inside the DuckDB execution engine. Below are the rules for handling errors in both parts of the code; not following these guidelines can cause crashes, memory leaks and other unexpected problems.
* Before we enter the DuckDB execution engine, no exceptions should ever be thrown. In cases where you would want to throw an exception, use `elog(ERROR, ...)` instead. Any C++ code that might throw an exception is also problematic; since C++ throws exceptions on allocation failures, this covers many C++ APIs. So try to use Postgres data structures instead of C++ ones whenever possible (e.g. use `List` instead of `Vec`).
* Inside the DuckDB execution engine the opposite is true: `elog(ERROR, ...)` should never be used there; use exceptions instead. A minimal sketch of both regimes follows this list.
* The PostgreSQL *elog* API can be used to report non-fatal messages back to the user. Using *ERROR* is strictly forbidden in code that is executed inside the DuckDB engine.
* Calling native PostgreSQL functions from within DuckDB execution needs **extreme care**. Pretty much none of these functions are thread-safe, and they might throw errors using `elog(ERROR, ...)`. If you've solved the thread-safety issue by taking a lock (or by carefully asserting that the actual code is thread-safe), then you can use *PostgresFunctionGuard* to solve the `elog(ERROR, ...)` problem. *PostgresFunctionGuard* will correctly handle *ERROR* log messages that could be emitted from these functions.
* Try to add test cases that trigger exceptions. If an exception cannot be easily triggered using a test case then it should probably be an assertion. This is not always true (e.g. out of memory errors are exceptions, but are very hard to trigger).
* Use `D_ASSERT` to assert. Use **assert** only when failing the assert means a programmer error. Assert should never be triggered by user input. Avoid code like `D_ASSERT(a > b + 3);` without comments or context.
* Assert liberally, but make it clear with comments next to the assert what went wrong when the assert is triggered.
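
A minimal sketch of the two regimes described above (the helper functions and their names are hypothetical; only the error-reporting calls illustrate the rules):

```cpp
extern "C" {
#include "postgres.h"
#include "nodes/parsenodes.h" // Query, CMD_SELECT
}

#include "duckdb/common/exception.hpp"
#include "duckdb/common/types/data_chunk.hpp"

// Runs *before* the DuckDB execution engine (e.g. planner-hook code):
// report errors with elog(ERROR, ...), never by throwing C++ exceptions.
static void
CheckStatementIsSupported(Query *query) {
	if (query->commandType != CMD_SELECT) {
		elog(ERROR, "(PGDuckDB) only SELECT statements are supported");
	}
}

// Runs *inside* the DuckDB execution engine: never call elog(ERROR, ...);
// throw a DuckDB exception instead.
static void
CheckChunkShape(duckdb::DataChunk &chunk, idx_t expected_columns) {
	if (chunk.ColumnCount() != expected_columns) {
		throw duckdb::InternalException("(PGDuckDB) unexpected column count in DuckDB chunk");
	}
}
```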
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_types.hpp
@@ -20,7 +20,7 @@ constexpr int64_t PGDUCKDB_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER
duckdb::LogicalType ConvertPostgresToDuckColumnType(Form_pg_attribute &attribute);
Oid GetPostgresDuckDBType(duckdb::LogicalType type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, HeapTupleData *tuple);

63 changes: 61 additions & 2 deletions include/pgduckdb/pgduckdb_utils.hpp
@@ -1,11 +1,15 @@
#pragma once

extern "C" {
#include "postgres.h"
}

#include "duckdb/common/exception.hpp"

#include <vector>
#include <string>
#include <sstream>

#include <cstdio>

namespace pgduckdb {

inline std::vector<std::string>
@@ -19,4 +23,59 @@ TokenizeString(char *str, const char delimiter) {
return v;
};

/*
* DuckdbGlobalLock should be held before calling.
*/
template <typename T, typename FuncType, typename... FuncArgs>
T
PostgresFunctionGuard(FuncType postgres_function, FuncArgs... args) {
T return_value;
bool error = false;
MemoryContext ctx = CurrentMemoryContext;
ErrorData *edata = nullptr;
// clang-format off
PG_TRY();
{
return_value = postgres_function(args...);
}
PG_CATCH();
{
MemoryContextSwitchTo(ctx);
edata = CopyErrorData();
FlushErrorState();
error = true;
}
PG_END_TRY();
// clang-format on
if (error) {
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, edata->message);
}
return return_value;
}

template <typename FuncType, typename... FuncArgs>
void
PostgresFunctionGuard(FuncType postgres_function, FuncArgs... args) {
bool error = false;
MemoryContext ctx = CurrentMemoryContext;
ErrorData *edata = nullptr;
// clang-format off
PG_TRY();
{
postgres_function(args...);
}
PG_CATCH();
{
MemoryContextSwitchTo(ctx);
edata = CopyErrorData();
FlushErrorState();
error = true;
}
PG_END_TRY();
// clang-format on
if (error) {
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, edata->message);
}
}

} // namespace pgduckdb
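
A usage sketch for the guard added above (not part of this commit; the helper is hypothetical, and it assumes `DuckdbProcessLock` from `pgduckdb_process_lock.hpp` is the global lock the comment refers to). The guard catches an `elog(ERROR, ...)` raised by the wrapped Postgres function, copies the error data, and re-throws it as a `duckdb::Exception`, so the error never unwinds through DuckDB stack frames. The same pattern appears in `ToastFetchDatum` in `src/pgduckdb_detoast.cpp` below.

```cpp
#include <mutex>

extern "C" {
#include "postgres.h"
#include "access/table.h" // try_table_open, table_close
}

#include "pgduckdb/pgduckdb_process_lock.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

// Hypothetical helper showing how code running inside the DuckDB engine is
// expected to call Postgres functions through the guard.
static void
OpenAndCloseTableGuarded(Oid relid) {
	// Hold the global process lock before calling, as the comment requires.
	std::lock_guard<std::mutex> lock(pgduckdb::DuckdbProcessLock::GetLock());

	// Non-void Postgres functions need the return type as the explicit
	// template argument; it cannot be deduced from the arguments.
	Relation rel = pgduckdb::PostgresFunctionGuard<Relation>(try_table_open, relid, AccessShareLock);
	if (rel == NULL) {
		throw duckdb::InternalException("(PGDuckDB) could not open relation");
	}

	// ... guarded work on the relation would go here ...

	// Void functions go through the second overload.
	pgduckdb::PostgresFunctionGuard(table_close, rel, AccessShareLock);
}
```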
7 changes: 5 additions & 2 deletions include/pgduckdb/types/decimal.hpp
@@ -199,8 +199,11 @@ CreateNumeric(const NumericVar &var, bool *have_error) {
* but it seems worthwhile to expend a few cycles to ensure that we
* never write any nonzero reserved bits to disk.
*/
if (!(sign == NUMERIC_NAN || sign == NUMERIC_PINF || sign == NUMERIC_NINF))
elog(ERROR, "invalid numeric sign value 0x%x", sign);
if (!(sign == NUMERIC_NAN || sign == NUMERIC_PINF || sign == NUMERIC_NINF)) {
elog(WARNING, "(PGDuckdDB/CreateNumeric) Invalid numeric sign value 0x%x", sign);
*have_error = true;
return NULL;
}

result = (Numeric)palloc(NUMERIC_HDRSZ_SHORT);

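A hypothetical caller sketch for the new `have_error` contract (the conversion function and its headers are assumptions; only `CreateNumeric` and its out-parameter come from the hunk above). Instead of relying on `elog(ERROR, ...)` to unwind, the conversion site checks the flag and reports the failure as a DuckDB exception:

```cpp
extern "C" {
#include "postgres.h"
#include "utils/numeric.h" // Numeric, NumericGetDatum
}

#include "duckdb/common/exception.hpp"
#include "pgduckdb/types/decimal.hpp" // CreateNumeric, NumericVar (assumed visible here)

// Sketch only; ConvertDuckDecimalToNumeric is a made-up conversion site.
static Datum
ConvertDuckDecimalToNumeric(const NumericVar &var) {
	bool have_error = false;
	Numeric result = CreateNumeric(var, &have_error);
	if (have_error || result == NULL) {
		throw duckdb::ConversionException("(PGDuckDB) could not convert DuckDB decimal to Postgres numeric");
	}
	return NumericGetDatum(result);
}
```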
1 change: 0 additions & 1 deletion src/pgduckdb.cpp
@@ -5,7 +5,6 @@ extern "C" {

#include "pgduckdb/pgduckdb.h"
#include "pgduckdb/pgduckdb_node.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

static void DuckdbInitGUC(void);

41 changes: 27 additions & 14 deletions src/pgduckdb_detoast.cpp
@@ -22,6 +22,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_process_lock.hpp"
#include "pgduckdb/pgduckdb_types.hpp"
#include "pgduckdb/pgduckdb_detoast.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

/*
* Following functions are direct logic found in postgres code but for duckdb execution they are needed to be thread
@@ -41,8 +42,9 @@ PglzDecompressDatum(const struct varlena *value) {

raw_size = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED,
VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true);
if (raw_size < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt")));
if (raw_size < 0) {
throw duckdb::InvalidInputException("(PGDuckDB/PglzDecompressDatum) Compressed pglz data is corrupt");
}

SET_VARSIZE(result, raw_size + VARHDRSZ);

@@ -61,8 +63,9 @@ Lz4DecompresDatum(const struct varlena *value) {

raw_size = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result),
VARSIZE(value) - VARHDRSZ_COMPRESSED, VARDATA_COMPRESSED_GET_EXTSIZE(value));
if (raw_size < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt")));
if (raw_size < 0) {
throw duckdb::InvalidInputException("(PGDuckDB/Lz4DecompresDatum) Compressed lz4 data is corrupt");
}

SET_VARSIZE(result, raw_size + VARHDRSZ);

@@ -80,20 +83,22 @@ ToastDecompressDatum(struct varlena *attr) {
case TOAST_LZ4_COMPRESSION_ID:
return Lz4DecompresDatum(attr);
default:
elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
throw duckdb::InvalidInputException("(PGDuckDB/ToastDecompressDatum) Invalid compression method id %d",
TOAST_COMPRESS_METHOD(attr));
return NULL; /* keep compiler quiet */
}
}

static struct varlena *
ToastFetchDatum(struct varlena *attr) {
Relation toastrel;
Relation toast_rel;
struct varlena *result;
struct varatt_external toast_pointer;
int32 attrsize;

if (!VARATT_IS_EXTERNAL_ONDISK(attr))
elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
if (!VARATT_IS_EXTERNAL_ONDISK(attr)) {
throw duckdb::InvalidInputException("(PGDuckDB/ToastFetchDatum) Shouldn't be called for non-ondisk datums");
}

/* Must copy to access aligned fields */
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
@@ -108,14 +113,22 @@ ToastFetchDatum(struct varlena *attr) {
SET_VARSIZE(result, attrsize + VARHDRSZ);
}

if (attrsize == 0)
if (attrsize == 0) {
return result;
}

std::lock_guard<std::mutex> lock(DuckdbProcessLock::GetLock());

toast_rel = PostgresFunctionGuard<Relation>(try_table_open, toast_pointer.va_toastrelid, AccessShareLock);

if (toast_rel == NULL) {
throw duckdb::InternalException("(PGDuckDB/ToastFetchDatum) Error toast relation is NULL");
}

PostgresFunctionGuard(table_relation_fetch_toast_slice, toast_rel, toast_pointer.va_valueid, attrsize, 0, attrsize,
result);

DuckdbProcessLock::GetLock().lock();
toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result);
table_close(toastrel, AccessShareLock);
DuckdbProcessLock::GetLock().unlock();
PostgresFunctionGuard(table_close, toast_rel, AccessShareLock);

return result;
}
21 changes: 12 additions & 9 deletions src/pgduckdb_duckdb.cpp
@@ -24,21 +24,21 @@ CheckDataDirectory(const char *data_directory) {

if (lstat(data_directory, &info) != 0) {
if (errno == ENOENT) {
elog(DEBUG2, "Directory `%s` doesn't exists.", data_directory);
elog(DEBUG2, "(PGDuckDB/CheckDataDirectory) Directory `%s` doesn't exists", data_directory);
return false;
} else if (errno == EACCES) {
elog(ERROR, "Can't access `%s` directory.", data_directory);
elog(ERROR, "(PGDuckDB/CheckDataDirectory) Can't access `%s` directory", data_directory);
} else {
elog(ERROR, "Other error when reading `%s`.", data_directory);
elog(ERROR, "(PGDuckDB/CheckDataDirectory) Other error when reading `%s`", data_directory);
}
}

if (!S_ISDIR(info.st_mode)) {
elog(WARNING, "`%s` is not directory.", data_directory);
elog(WARNING, "(PGDuckDB/CheckDataDirectory) `%s` is not directory", data_directory);
}

if (access(data_directory, R_OK | W_OK)) {
elog(ERROR, "Directory `%s` permission problem.", data_directory);
elog(ERROR, "(PGDuckDB/CheckDataDirectory) Directory `%s` permission problem", data_directory);
}

return true;
@@ -53,9 +53,12 @@ GetExtensionDirectory() {
if (mkdir(duckdb_extension_data_directory->data, S_IRWXU | S_IRWXG | S_IRWXO) == -1) {
int error = errno;
pfree(duckdb_extension_data_directory->data);
elog(ERROR, "Creating duckdb extensions directory failed with reason `%s`\n", strerror(error));
elog(ERROR,
"(PGDuckDB/GetExtensionDirectory) Creating duckdb extensions directory failed with reason `%s`\n",
strerror(error));
}
elog(DEBUG2, "Created %s as `duckdb.data_dir`", duckdb_extension_data_directory->data);
elog(DEBUG2, "(PGDuckDB/GetExtensionDirectory) Created %s as `duckdb.data_dir`",
duckdb_extension_data_directory->data);
};

std::string duckdb_extension_directory(duckdb_extension_data_directory->data);
@@ -64,7 +67,7 @@ GetExtensionDirectory() {
}

DuckDBManager::DuckDBManager() {
elog(DEBUG2, "Creating DuckDB instance");
elog(DEBUG2, "(PGDuckDB/DuckDBManager) Creating DuckDB instance");

duckdb::DBConfig config;
config.SetOptionByName("extension_directory", GetExtensionDirectory());
@@ -139,7 +142,7 @@ DuckDBManager::LoadExtensions(duckdb::ClientContext &context) {
appendStringInfo(duckdb_extension, "LOAD %s;", extension.name.c_str());
auto res = context.Query(duckdb_extension->data, false);
if (res->HasError()) {
elog(ERROR, "Extension `%s` could not be loaded with DuckDB", extension.name.c_str());
elog(ERROR, "(PGDuckDB/LoadExtensions) `%s` could not be loaded with DuckDB", extension.name.c_str());
}
}
pfree(duckdb_extension->data);
3 changes: 2 additions & 1 deletion src/pgduckdb_filter.cpp
@@ -70,7 +70,8 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid type_oid) {
case VARCHAROID:
return StringFilterOperation<OP>(value, constant);
default:
elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported duckdb type: %d", type_oid);
throw duckdb::InvalidTypeException(
duckdb::string("(DuckDB/FilterOperationSwitch) Unsupported duckdb type: %d", type_oid));
}
}

2 changes: 1 addition & 1 deletion src/pgduckdb_hooks.cpp
@@ -127,7 +127,7 @@ DuckdbPlannerHook(Query *parse, const char *query_string, int cursor_options, Pa

if (NeedsDuckdbExecution(parse)) {
if (!IsAllowedStatement(parse)) {
elog(ERROR, "only SELECT statements involving DuckDB are supported");
elog(ERROR, "(PGDuckDB/DuckdbPlannerHook) Only SELECT statements involving DuckDB are supported.");
}
PlannedStmt *duckdbPlan = DuckdbPlanNode(parse, cursor_options, bound_params);
if (duckdbPlan) {
5 changes: 4 additions & 1 deletion src/pgduckdb_metadata_cache.cpp
@@ -62,7 +62,10 @@ InvalidateCaches(Datum arg, int cache_id, uint32 hash_value) {
return;
}
cache.valid = false;
list_free(cache.duckdb_only_functions);
if (cache.installed) {
list_free(cache.duckdb_only_functions);
cache.duckdb_only_functions = NIL;
}
}

/*
9 changes: 6 additions & 3 deletions src/pgduckdb_node.cpp
@@ -77,7 +77,6 @@ ExecuteQuery(DuckdbScanState *state) {
auto &executor = duckdb::Executor::Get(*connection->context);
// Wait for all tasks to terminate
executor.CancelTasks();

// Delete the scan state
CleanupDuckdbScanState(state);
// Process the interrupt on the Postgres side
Expand All @@ -86,7 +85,8 @@ ExecuteQuery(DuckdbScanState *state) {
}
} while (!duckdb::PendingQueryResult::IsResultReady(execution_result));
if (execution_result == duckdb::PendingExecutionResult::EXECUTION_ERROR) {
elog(ERROR, "Duckdb execute returned an error: %s", pending->GetError().c_str());
CleanupDuckdbScanState(state);
elog(ERROR, "(PGDuckDB/ExecuteQuery) %s", pending->GetError().c_str());
}
query_results = pending->Execute();
state->column_count = query_results->ColumnCount();
@@ -127,7 +127,10 @@ Duckdb_ExecCustomScan(CustomScanState *node) {
slot->tts_isnull[col] = true;
} else {
slot->tts_isnull[col] = false;
pgduckdb::ConvertDuckToPostgresValue(slot, value, col);
if (!pgduckdb::ConvertDuckToPostgresValue(slot, value, col)) {
CleanupDuckdbScanState(duckdb_scan_state);
elog(ERROR, "(PGDuckDB/Duckdb_ExecCustomScan) Value conversion failed");
}
}
}

2 changes: 1 addition & 1 deletion src/pgduckdb_options.cpp
@@ -128,7 +128,7 @@ DuckdbInstallExtension(Datum name) {
pfree(install_extension_command->data);

if (res->HasError()) {
elog(WARNING, "(duckdb_install_extension) %s", res->GetError().c_str());
elog(WARNING, "(PGDuckDB/DuckdbInstallExtension) %s", res->GetError().c_str());
return false;
}

14 changes: 11 additions & 3 deletions src/pgduckdb_planner.cpp
@@ -70,7 +70,8 @@ CreatePlan(Query *query, const char *query_string, ParamListInfo bound_params) {
auto prepared_query = context->Prepare(query_string);

if (prepared_query->HasError()) {
elog(WARNING, "(DuckDB) %s", prepared_query->GetError().c_str());
elog(WARNING, "(PGDuckDB/CreatePlan) Prepared query returned an error: '%s",
prepared_query->GetError().c_str());
return nullptr;
}

Expand All @@ -82,12 +83,19 @@ CreatePlan(Query *query, const char *query_string, ParamListInfo bound_params) {
auto &column = prepared_result_types[i];
Oid postgresColumnOid = pgduckdb::GetPostgresDuckDBType(column);

if (!OidIsValid(postgresColumnOid)) {
elog(WARNING, "(PGDuckDB/CreatePlan) Cache lookup failed for type %u", postgresColumnOid);
return nullptr;
}

HeapTuple tp;
Form_pg_type typtup;

tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(postgresColumnOid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for type %u", postgresColumnOid);
if (!HeapTupleIsValid(tp)) {
elog(WARNING, "(PGDuckDB/CreatePlan) Cache lookup failed for type %u", postgresColumnOid);
return nullptr;
}

typtup = (Form_pg_type)GETSTRUCT(tp);
