Skip to content

Commit

Permalink
Local object cache for cached_httpfs extension
Browse files Browse the repository at this point in the history
* To cache remote object, user need to explicilty use
  `duckdb.cache(path, type)` function. Path is remote HTTPFS/S3/GCS/R2
  object path and type is either `parquet` or `csv` indicating remote
  object type. Caching will not be triggered on normal `SELECT` queries.

* When file is currently being cached and if we run in parallel another
  query targeting same remote object - second query will fail because it
  will not be able to take read lock.

* PG_DATADIR/duckdb_cache will be used as cache directory
  • Loading branch information
mkaruza committed Aug 9, 2024
1 parent 73d1d64 commit 8a4c63e
Show file tree
Hide file tree
Showing 19 changed files with 486 additions and 284 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ SRCS = src/scan/heap_reader.cpp \
src/pgduckdb_options.cpp \
src/pgduckdb_planner.cpp \
src/pgduckdb_types.cpp \
src/pgduckdb_utils.cpp \
src/pgduckdb.cpp

OBJS = $(subst .cpp,.o, $(SRCS))
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ CREATE EXTENSION pg_duckdb;
- Install DuckDB extensions using `SELECT duckdb.install_extension('extension_name');`
- Toggle DuckDB execution on/off with a setting:
- `SET duckdb.execution = true|false`
- Cache remote object localy for faster execution using `SELECT duckdb.cache('path', 'type');` where
- 'path' is HTTPFS/S3/GCS/R2 remote object
- 'type' specify remote object type: 'parquet' or 'csv'

## Getting Started

Expand Down
33 changes: 6 additions & 27 deletions include/pgduckdb/pgduckdb_options.hpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,19 @@
#pragma once

#include "duckdb.hpp"

#include <string>
#include <vector>

namespace pgduckdb {

/* constants for duckdb.secrets */
#define Natts_duckdb_secret 6
#define Anum_duckdb_secret_type 1
#define Anum_duckdb_secret_id 2
#define Anum_duckdb_secret_secret 3
#define Anum_duckdb_secret_region 4
#define Anum_duckdb_secret_endpoint 5
#define Anum_duckdb_secret_r2_account_id 6

typedef struct DuckdbSecret {
std::string type;
std::string id;
std::string secret;
std::string region;
std::string endpoint;
std::string r2_account_id;
} DuckdbSecret;

extern std::vector<DuckdbSecret> ReadDuckdbSecrets();

/* constants for duckdb.extensions */
#define Natts_duckdb_extension 2
#define Anum_duckdb_extension_name 1
#define Anum_duckdb_extension_enable 2
void AddSecretsToDuckdbContext(duckdb::ClientContext &context);

typedef struct DuckdbExension {
typedef struct DuckdbExtension {
std::string name;
bool enabled;
} DuckdbExension;
} DuckdbExtension;

extern std::vector<DuckdbExension> ReadDuckdbExtensions();
extern std::vector<DuckdbExtension> ReadDuckdbExtensions();

} // namespace pgduckdb
4 changes: 4 additions & 0 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ TokenizeString(char *str, const char delimiter) {
return v;
};

std::string CreateOrGetDirectoryPath(std::string directoryName);

} // namespace pgduckdb

void DuckdbCreateCacheDirectory(void);
3 changes: 3 additions & 0 deletions sql/pg_duckdb--0.0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ CREATE TABLE extensions (
CREATE OR REPLACE FUNCTION install_extension(extension_name TEXT) RETURNS bool
LANGUAGE C AS 'MODULE_PATHNAME', 'install_extension';

CREATE OR REPLACE FUNCTION cache(object_path TEXT, type TEXT) RETURNS bool
LANGUAGE C AS 'MODULE_PATHNAME', 'cache';

DO $$
BEGIN
RAISE WARNING 'To actually execute queries using DuckDB you need to run "SET duckdb.execution TO true;"';
Expand Down
1 change: 1 addition & 0 deletions src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PG_MODULE_MAGIC;
void
_PG_init(void) {
DuckdbInitGUC();
DuckdbCreateCacheDirectory();
DuckdbInitHooks();
DuckdbInitNode();
}
Expand Down
83 changes: 6 additions & 77 deletions src/pgduckdb_duckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,64 +9,11 @@
#include "pgduckdb/scan/postgres_seq_scan.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

#include <string>

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>

namespace pgduckdb {

static bool
CheckDataDirectory(const char *data_directory) {
struct stat info;

if (lstat(data_directory, &info) != 0) {
if (errno == ENOENT) {
elog(DEBUG2, "Directory `%s` doesn't exists.", data_directory);
return false;
} else if (errno == EACCES) {
elog(ERROR, "Can't access `%s` directory.", data_directory);
} else {
elog(ERROR, "Other error when reading `%s`.", data_directory);
}
}

if (!S_ISDIR(info.st_mode)) {
elog(WARNING, "`%s` is not directory.", data_directory);
}

if (access(data_directory, R_OK | W_OK)) {
elog(ERROR, "Directory `%s` permission problem.", data_directory);
}

return true;
}

static std::string
GetExtensionDirectory() {
StringInfo duckdb_extension_data_directory = makeStringInfo();
appendStringInfo(duckdb_extension_data_directory, "%s/duckdb_extensions", DataDir);

if (!CheckDataDirectory(duckdb_extension_data_directory->data)) {
if (mkdir(duckdb_extension_data_directory->data, S_IRWXU | S_IRWXG | S_IRWXO) == -1) {
int error = errno;
pfree(duckdb_extension_data_directory->data);
elog(ERROR, "Creating duckdb extensions directory failed with reason `%s`\n", strerror(error));
}
elog(DEBUG2, "Created %s as `duckdb.data_dir`", duckdb_extension_data_directory->data);
};

std::string duckdb_extension_directory(duckdb_extension_data_directory->data);
pfree(duckdb_extension_data_directory->data);
return duckdb_extension_directory;
}

duckdb::unique_ptr<duckdb::DuckDB>
DuckdbOpenDatabase() {
duckdb::DBConfig config;
config.SetOptionByName("extension_directory", GetExtensionDirectory());
config.SetOptionByName("extension_directory", CreateOrGetDirectoryPath("duckdb_extensions"));
return duckdb::make_uniq<duckdb::DuckDB>(nullptr, &config);
}

Expand Down Expand Up @@ -99,29 +46,7 @@ DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_co
catalog.CreateTableFunction(context, &index_scan_info);
context.transaction.Commit();

auto duckdb_secrets = ReadDuckdbSecrets();

int secret_id = 0;
for (auto &secret : duckdb_secrets) {
StringInfo secret_key = makeStringInfo();
bool is_r2_cloud_secret = (secret.type.rfind("R2", 0) == 0);
appendStringInfo(secret_key, "CREATE SECRET duckdbSecret_%d ", secret_id);
appendStringInfo(secret_key, "(TYPE %s, KEY_ID '%s', SECRET '%s'", secret.type.c_str(), secret.id.c_str(),
secret.secret.c_str());
if (secret.region.length() && !is_r2_cloud_secret) {
appendStringInfo(secret_key, ", REGION '%s'", secret.region.c_str());
}
if (secret.endpoint.length() && !is_r2_cloud_secret) {
appendStringInfo(secret_key, ", ENDPOINT '%s'", secret.endpoint.c_str());
}
if (is_r2_cloud_secret) {
appendStringInfo(secret_key, ", ACCOUNT_ID '%s'", secret.endpoint.c_str());
}
appendStringInfo(secret_key, ");");
context.Query(secret_key->data, false);
pfree(secret_key->data);
secret_id++;
}
AddSecretsToDuckdbContext(context);

auto duckdb_extensions = ReadDuckdbExtensions();

Expand All @@ -137,6 +62,10 @@ DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_co
pfree(duckdb_extension->data);
}

auto http_file_cache_set_dir_query =
duckdb::StringUtil::Format("SET http_file_cache_dir TO '%s';", CreateOrGetDirectoryPath("duckdb_cache"));
context.Query(http_file_cache_set_dir_query, false);

return connection;
}

Expand Down
129 changes: 116 additions & 13 deletions src/pgduckdb_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,33 @@ extern "C" {

#include "pgduckdb/pgduckdb_options.hpp"
#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

namespace pgduckdb {

/* constants for duckdb.secrets */
#define Natts_duckdb_secret 6
#define Anum_duckdb_secret_type 1
#define Anum_duckdb_secret_id 2
#define Anum_duckdb_secret_secret 3
#define Anum_duckdb_secret_region 4
#define Anum_duckdb_secret_endpoint 5
#define Anum_duckdb_secret_r2_account_id 6

typedef struct DuckdbSecret {
std::string type;
std::string id;
std::string secret;
std::string region;
std::string endpoint;
std::string r2_account_id;
} DuckdbSecret;

/* constants for duckdb.extensions */
#define Natts_duckdb_extension 2
#define Anum_duckdb_extension_name 1
#define Anum_duckdb_extension_enable 2

static Oid
GetDuckdbNamespace(void) {
return get_namespace_oid("duckdb", false);
Expand All @@ -43,8 +67,8 @@ DatumToString(Datum datum) {
return column_value;
}

std::vector<DuckdbSecret>
ReadDuckdbSecrets() {
static std::vector<DuckdbSecret>
DuckdbReadSecrets() {
HeapTuple tuple = NULL;
Oid duckdb_secret_table_relation_id = SecretsTableRelationId();
Relation duckdb_secret_relation = table_open(duckdb_secret_table_relation_id, AccessShareLock);
Expand Down Expand Up @@ -79,21 +103,47 @@ ReadDuckdbSecrets() {
return duckdb_secrets;
}

std::vector<DuckdbExension>
void
AddSecretsToDuckdbContext(duckdb::ClientContext &context) {
auto duckdb_secrets = DuckdbReadSecrets();
int secret_id = 0;
for (auto &secret : duckdb_secrets) {
StringInfo secret_key = makeStringInfo();
bool is_r2_cloud_secret = (secret.type.rfind("R2", 0) == 0);
appendStringInfo(secret_key, "CREATE SECRET pg_duckdb_secret_%d ", secret_id);
appendStringInfo(secret_key, "(TYPE %s, KEY_ID '%s', SECRET '%s'", secret.type.c_str(), secret.id.c_str(),
secret.secret.c_str());
if (secret.region.length() && !is_r2_cloud_secret) {
appendStringInfo(secret_key, ", REGION '%s'", secret.region.c_str());
}
if (secret.endpoint.length() && !is_r2_cloud_secret) {
appendStringInfo(secret_key, ", ENDPOINT '%s'", secret.endpoint.c_str());
}
if (is_r2_cloud_secret) {
appendStringInfo(secret_key, ", ACCOUNT_ID '%s'", secret.endpoint.c_str());
}
appendStringInfo(secret_key, ");");
context.Query(secret_key->data, false);
pfree(secret_key->data);
secret_id++;
}
}

std::vector<DuckdbExtension>
ReadDuckdbExtensions() {
HeapTuple tuple = NULL;
Oid duckdb_extension_table_relation_id = ExtensionsTableRelationId();
Relation duckdb_extension_relation = table_open(duckdb_extension_table_relation_id, AccessShareLock);
SysScanDescData *scan =
systable_beginscan(duckdb_extension_relation, InvalidOid, false, GetActiveSnapshot(), 0, NULL);
std::vector<DuckdbExension> duckdb_extensions;
std::vector<DuckdbExtension> duckdb_extensions;

while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
Datum datum_array[Natts_duckdb_secret];
bool is_null_array[Natts_duckdb_secret];

heap_deform_tuple(tuple, RelationGetDescr(duckdb_extension_relation), datum_array, is_null_array);
DuckdbExension secret;
DuckdbExtension secret;

secret.name = DatumToString(datum_array[Anum_duckdb_extension_name - 1]);
secret.enabled = DatumGetBool(datum_array[Anum_duckdb_extension_enable - 1]);
Expand All @@ -112,16 +162,11 @@ DuckdbInstallExtension(Datum name) {
auto &context = *connection->context;

auto extension_name = DatumToString(name);

StringInfo install_extension_command = makeStringInfo();
appendStringInfo(install_extension_command, "INSTALL %s;", extension_name.c_str());

auto res = context.Query(install_extension_command->data, false);

pfree(install_extension_command->data);
auto install_extension_command = duckdb::StringUtil::Format("INSTALL %s;", extension_name.c_str());
auto res = context.Query(install_extension_command, false);

if (res->HasError()) {
elog(WARNING, "(duckdb_install_extension) %s", res->GetError().c_str());
elog(WARNING, "(DuckdbInstallExtension) %s", res->GetError().c_str());
return false;
}

Expand All @@ -145,6 +190,55 @@ DuckdbInstallExtension(Datum name) {
return true;
}

static bool
CanCacheRemoteObject(std::string remote_object) {
return remote_object.rfind("https://", 0) * remote_object.rfind("http://", 0) * remote_object.rfind("s3://", 0) *
remote_object.rfind("s3a://", 0) * remote_object.rfind("s3n://", 0) * remote_object.rfind("gcs://", 0) *
remote_object.rfind("gs://", 0) * remote_object.rfind("r2://", 0) ==
0;
}

static bool
DuckdbCacheObject(Datum object, Datum type) {
auto db = DuckdbOpenDatabase();
auto connection = duckdb::make_uniq<duckdb::Connection>(*db);
auto &context = *connection->context;

auto object_type = DatumToString(type);

if (object_type != "parquet" && object_type != "csv") {
elog(WARNING, "(DuckdbCacheObject) Cache object type should be 'parquet' or 'csv'.");
return false;
}

auto object_type_fun = object_type == "parquet" ? "read_parquet" : "read_csv";

context.Query("SET enable_http_file_cache TO true;", false);
auto http_file_cache_set_dir_query =
duckdb::StringUtil::Format("SET http_file_cache_dir TO '%s';", CreateOrGetDirectoryPath("duckdb_cache"));
context.Query(http_file_cache_set_dir_query, false);

AddSecretsToDuckdbContext(context);

auto object_path = DatumToString(object);

if (!CanCacheRemoteObject(object_path)) {
elog(WARNING, "(DuckdbCacheObject) Object path '%s' can't be cached.", object_path.c_str());
return false;
}

auto cache_object_query =
duckdb::StringUtil::Format("SELECT 1 FROM %s('%s');", object_type_fun, object_path.c_str());
auto res = context.Query(cache_object_query, false);

if (res->HasError()) {
elog(WARNING, "(DuckdbCacheObject) %s", res->GetError().c_str());
return false;
}

return true;
}

} // namespace pgduckdb

extern "C" {
Expand All @@ -157,4 +251,13 @@ install_extension(PG_FUNCTION_ARGS) {
PG_RETURN_BOOL(result);
}

PG_FUNCTION_INFO_V1(cache);
Datum
cache(PG_FUNCTION_ARGS) {
Datum object = PG_GETARG_DATUM(0);
Datum type = PG_GETARG_DATUM(1);
bool result = pgduckdb::DuckdbCacheObject(object, type);
PG_RETURN_BOOL(result);
}

} // extern "C"
Loading

0 comments on commit 8a4c63e

Please sign in to comment.