diff --git a/Makefile b/Makefile index 2bbb45b0..2d1177d3 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ SRCS = src/scan/heap_reader.cpp \ src/pgduckdb_options.cpp \ src/pgduckdb_planner.cpp \ src/pgduckdb_types.cpp \ + src/pgduckdb_utils.cpp \ src/pgduckdb.cpp OBJS = $(subst .cpp,.o, $(SRCS)) diff --git a/include/pgduckdb/pgduckdb_options.hpp b/include/pgduckdb/pgduckdb_options.hpp index a20b071a..25239de9 100644 --- a/include/pgduckdb/pgduckdb_options.hpp +++ b/include/pgduckdb/pgduckdb_options.hpp @@ -1,40 +1,19 @@ #pragma once +#include "duckdb.hpp" + #include #include namespace pgduckdb { -/* constants for duckdb.secrets */ -#define Natts_duckdb_secret 6 -#define Anum_duckdb_secret_type 1 -#define Anum_duckdb_secret_id 2 -#define Anum_duckdb_secret_secret 3 -#define Anum_duckdb_secret_region 4 -#define Anum_duckdb_secret_endpoint 5 -#define Anum_duckdb_secret_r2_account_id 6 - -typedef struct DuckdbSecret { - std::string type; - std::string id; - std::string secret; - std::string region; - std::string endpoint; - std::string r2_account_id; -} DuckdbSecret; - -extern std::vector ReadDuckdbSecrets(); - -/* constants for duckdb.extensions */ -#define Natts_duckdb_extension 2 -#define Anum_duckdb_extension_name 1 -#define Anum_duckdb_extension_enable 2 +void AddSecretsToDuckdbContext(duckdb::ClientContext &context); -typedef struct DuckdbExension { +typedef struct DuckdbExtension { std::string name; bool enabled; -} DuckdbExension; +} DuckdbExtension; -extern std::vector ReadDuckdbExtensions(); +extern std::vector ReadDuckdbExtensions(); } // namespace pgduckdb diff --git a/include/pgduckdb/pgduckdb_utils.hpp b/include/pgduckdb/pgduckdb_utils.hpp index 3e16c6cb..569f86bb 100644 --- a/include/pgduckdb/pgduckdb_utils.hpp +++ b/include/pgduckdb/pgduckdb_utils.hpp @@ -19,4 +19,8 @@ TokenizeString(char *str, const char delimiter) { return v; }; +std::string CreateOrGetDirectoryPath(std::string directoryName); + } // namespace pgduckdb + +void 
DuckdbCreateCacheDirectory(void); diff --git a/sql/pg_duckdb--0.0.1.sql b/sql/pg_duckdb--0.0.1.sql index f261bf63..4b75ac67 100644 --- a/sql/pg_duckdb--0.0.1.sql +++ b/sql/pg_duckdb--0.0.1.sql @@ -75,6 +75,9 @@ CREATE TABLE extensions ( CREATE OR REPLACE FUNCTION install_extension(extension_name TEXT) RETURNS bool LANGUAGE C AS 'MODULE_PATHNAME', 'install_extension'; +CREATE OR REPLACE FUNCTION cache(path TEXT) RETURNS bool + LANGUAGE C AS 'MODULE_PATHNAME', 'cache'; + DO $$ BEGIN RAISE WARNING 'To actually execute queries using DuckDB you need to run "SET duckdb.execution TO true;"'; diff --git a/src/pgduckdb.cpp b/src/pgduckdb.cpp index f7a829ab..9791c86b 100644 --- a/src/pgduckdb.cpp +++ b/src/pgduckdb.cpp @@ -18,6 +18,7 @@ PG_MODULE_MAGIC; void _PG_init(void) { DuckdbInitGUC(); + DuckdbCreateCacheDirectory(); DuckdbInitHooks(); DuckdbInitNode(); } diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index 95252090..08c065df 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -9,64 +9,11 @@ #include "pgduckdb/scan/postgres_seq_scan.hpp" #include "pgduckdb/pgduckdb_utils.hpp" -#include - -#include -#include -#include -#include - namespace pgduckdb { - -static bool -CheckDataDirectory(const char *data_directory) { - struct stat info; - - if (lstat(data_directory, &info) != 0) { - if (errno == ENOENT) { - elog(DEBUG2, "Directory `%s` doesn't exists.", data_directory); - return false; - } else if (errno == EACCES) { - elog(ERROR, "Can't access `%s` directory.", data_directory); - } else { - elog(ERROR, "Other error when reading `%s`.", data_directory); - } - } - - if (!S_ISDIR(info.st_mode)) { - elog(WARNING, "`%s` is not directory.", data_directory); - } - - if (access(data_directory, R_OK | W_OK)) { - elog(ERROR, "Directory `%s` permission problem.", data_directory); - } - - return true; -} - -static std::string -GetExtensionDirectory() { - StringInfo duckdb_extension_data_directory = makeStringInfo(); - 
appendStringInfo(duckdb_extension_data_directory, "%s/duckdb_extensions", DataDir); - - if (!CheckDataDirectory(duckdb_extension_data_directory->data)) { - if (mkdir(duckdb_extension_data_directory->data, S_IRWXU | S_IRWXG | S_IRWXO) == -1) { - int error = errno; - pfree(duckdb_extension_data_directory->data); - elog(ERROR, "Creating duckdb extensions directory failed with reason `%s`\n", strerror(error)); - } - elog(DEBUG2, "Created %s as `duckdb.data_dir`", duckdb_extension_data_directory->data); - }; - - std::string duckdb_extension_directory(duckdb_extension_data_directory->data); - pfree(duckdb_extension_data_directory->data); - return duckdb_extension_directory; -} - duckdb::unique_ptr DuckdbOpenDatabase() { duckdb::DBConfig config; - config.SetOptionByName("extension_directory", GetExtensionDirectory()); + config.SetOptionByName("extension_directory", CreateOrGetDirectoryPath("duckdb_extensions")); return duckdb::make_uniq(nullptr, &config); } @@ -99,29 +46,7 @@ DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_co catalog.CreateTableFunction(context, &index_scan_info); context.transaction.Commit(); - auto duckdb_secrets = ReadDuckdbSecrets(); - - int secret_id = 0; - for (auto &secret : duckdb_secrets) { - StringInfo secret_key = makeStringInfo(); - bool is_r2_cloud_secret = (secret.type.rfind("R2", 0) == 0); - appendStringInfo(secret_key, "CREATE SECRET duckdbSecret_%d ", secret_id); - appendStringInfo(secret_key, "(TYPE %s, KEY_ID '%s', SECRET '%s'", secret.type.c_str(), secret.id.c_str(), - secret.secret.c_str()); - if (secret.region.length() && !is_r2_cloud_secret) { - appendStringInfo(secret_key, ", REGION '%s'", secret.region.c_str()); - } - if (secret.endpoint.length() && !is_r2_cloud_secret) { - appendStringInfo(secret_key, ", ENDPOINT '%s'", secret.endpoint.c_str()); - } - if (is_r2_cloud_secret) { - appendStringInfo(secret_key, ", ACCOUNT_ID '%s'", secret.endpoint.c_str()); - } - appendStringInfo(secret_key, ");"); - 
context.Query(secret_key->data, false); - pfree(secret_key->data); - secret_id++; - } + AddSecretsToDuckdbContext(context); auto duckdb_extensions = ReadDuckdbExtensions(); @@ -137,6 +62,10 @@ DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_co pfree(duckdb_extension->data); } + auto http_file_cache_set_dir_query = + duckdb::StringUtil::Format("SET http_file_cache_dir TO '%s';", CreateOrGetDirectoryPath("duckdb_cache")); + context.Query(http_file_cache_set_dir_query, false); + return connection; } diff --git a/src/pgduckdb_options.cpp b/src/pgduckdb_options.cpp index 2ae61348..ea340b89 100644 --- a/src/pgduckdb_options.cpp +++ b/src/pgduckdb_options.cpp @@ -16,9 +16,33 @@ extern "C" { #include "pgduckdb/pgduckdb_options.hpp" #include "pgduckdb/pgduckdb_duckdb.hpp" +#include "pgduckdb/pgduckdb_utils.hpp" namespace pgduckdb { +/* constants for duckdb.secrets */ +#define Natts_duckdb_secret 6 +#define Anum_duckdb_secret_type 1 +#define Anum_duckdb_secret_id 2 +#define Anum_duckdb_secret_secret 3 +#define Anum_duckdb_secret_region 4 +#define Anum_duckdb_secret_endpoint 5 +#define Anum_duckdb_secret_r2_account_id 6 + +typedef struct DuckdbSecret { + std::string type; + std::string id; + std::string secret; + std::string region; + std::string endpoint; + std::string r2_account_id; +} DuckdbSecret; + +/* constants for duckdb.extensions */ +#define Natts_duckdb_extension 2 +#define Anum_duckdb_extension_name 1 +#define Anum_duckdb_extension_enable 2 + static Oid GetDuckdbNamespace(void) { return get_namespace_oid("duckdb", false); @@ -43,8 +67,8 @@ DatumToString(Datum datum) { return column_value; } -std::vector -ReadDuckdbSecrets() { +static std::vector +DuckdbReadSecrets() { HeapTuple tuple = NULL; Oid duckdb_secret_table_relation_id = SecretsTableRelationId(); Relation duckdb_secret_relation = table_open(duckdb_secret_table_relation_id, AccessShareLock); @@ -79,21 +103,47 @@ ReadDuckdbSecrets() { return duckdb_secrets; } -std::vector +void 
+AddSecretsToDuckdbContext(duckdb::ClientContext &context) { + auto duckdb_secrets = DuckdbReadSecrets(); + int secret_id = 0; + for (auto &secret : duckdb_secrets) { + StringInfo secret_key = makeStringInfo(); + bool is_r2_cloud_secret = (secret.type.rfind("R2", 0) == 0); + appendStringInfo(secret_key, "CREATE SECRET pg_duckdb_secret_%d ", secret_id); + appendStringInfo(secret_key, "(TYPE %s, KEY_ID '%s', SECRET '%s'", secret.type.c_str(), secret.id.c_str(), + secret.secret.c_str()); + if (secret.region.length() && !is_r2_cloud_secret) { + appendStringInfo(secret_key, ", REGION '%s'", secret.region.c_str()); + } + if (secret.endpoint.length() && !is_r2_cloud_secret) { + appendStringInfo(secret_key, ", ENDPOINT '%s'", secret.endpoint.c_str()); + } + if (is_r2_cloud_secret) { + appendStringInfo(secret_key, ", ACCOUNT_ID '%s'", secret.r2_account_id.c_str()); + } + appendStringInfo(secret_key, ");"); + context.Query(secret_key->data, false); + pfree(secret_key->data); + secret_id++; + } +} + +std::vector ReadDuckdbExtensions() { HeapTuple tuple = NULL; Oid duckdb_extension_table_relation_id = ExtensionsTableRelationId(); Relation duckdb_extension_relation = table_open(duckdb_extension_table_relation_id, AccessShareLock); SysScanDescData *scan = systable_beginscan(duckdb_extension_relation, InvalidOid, false, GetActiveSnapshot(), 0, NULL); - std::vector duckdb_extensions; + std::vector duckdb_extensions; while (HeapTupleIsValid(tuple = systable_getnext(scan))) { Datum datum_array[Natts_duckdb_secret]; bool is_null_array[Natts_duckdb_secret]; heap_deform_tuple(tuple, RelationGetDescr(duckdb_extension_relation), datum_array, is_null_array); - DuckdbExension secret; + DuckdbExtension secret; secret.name = DatumToString(datum_array[Anum_duckdb_extension_name - 1]); secret.enabled = DatumGetBool(datum_array[Anum_duckdb_extension_enable - 1]); @@ -112,16 +162,11 @@ DuckdbInstallExtension(Datum name) { auto &context = *connection->context; auto extension_name = 
DatumToString(name); - - StringInfo install_extension_command = makeStringInfo(); - appendStringInfo(install_extension_command, "INSTALL %s;", extension_name.c_str()); - - auto res = context.Query(install_extension_command->data, false); - - pfree(install_extension_command->data); + auto install_extension_command = duckdb::StringUtil::Format("INSTALL %s;", extension_name.c_str()); + auto res = context.Query(install_extension_command, false); if (res->HasError()) { - elog(WARNING, "(duckdb_install_extension) %s", res->GetError().c_str()); + elog(WARNING, "(DuckdbInstallExtension) %s", res->GetError().c_str()); return false; } @@ -145,6 +190,31 @@ DuckdbInstallExtension(Datum name) { return true; } +static bool +DuckdbCacheObject(Datum object) { + auto db = DuckdbOpenDatabase(); + auto connection = duckdb::make_uniq(*db); + auto &context = *connection->context; + + context.Query("SET enable_http_file_cache TO true;", false); + auto http_file_cache_set_dir_query = + duckdb::StringUtil::Format("SET http_file_cache_dir TO '%s';", CreateOrGetDirectoryPath("duckdb_cache")); + context.Query(http_file_cache_set_dir_query, false); + + AddSecretsToDuckdbContext(context); + + auto object_name = DatumToString(object); + auto cache_object_query = duckdb::StringUtil::Format("SELECT 1 FROM '%s';", object_name.c_str()); + auto res = context.Query(cache_object_query, false); + + if (res->HasError()) { + elog(WARNING, "(DuckdbCacheObject) %s", res->GetError().c_str()); + return false; + } + + return true; +} + } // namespace pgduckdb extern "C" { @@ -157,4 +227,12 @@ install_extension(PG_FUNCTION_ARGS) { PG_RETURN_BOOL(result); } +PG_FUNCTION_INFO_V1(cache); +Datum +cache(PG_FUNCTION_ARGS) { + Datum object = PG_GETARG_DATUM(0); + bool result = pgduckdb::DuckdbCacheObject(object); + PG_RETURN_BOOL(result); +} + } // extern "C" diff --git a/src/pgduckdb_utils.cpp b/src/pgduckdb_utils.cpp new file mode 100644 index 00000000..174e0259 --- /dev/null +++ b/src/pgduckdb_utils.cpp @@ -0,0 
+1,67 @@ + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +} + +#include + +#include +#include +#include +#include + +namespace pgduckdb { + +static bool +CheckDirectory(const char *directory) { + struct stat info; + + if (lstat(directory, &info) != 0) { + if (errno == ENOENT) { + elog(DEBUG2, "Directory `%s` doesn't exist.", directory); + return false; + } else if (errno == EACCES) { + elog(ERROR, "Can't access `%s` directory.", directory); + } else { + elog(ERROR, "Other error when reading `%s`.", directory); + } + } + + if (!S_ISDIR(info.st_mode)) { + elog(WARNING, "`%s` is not a directory.", directory); + } + + if (access(directory, R_OK | W_OK)) { + elog(ERROR, "Directory `%s` permission problem.", directory); + } + + return true; +} + +std::string +CreateOrGetDirectoryPath(std::string directory_name) { + StringInfo duckdb_data_directory = makeStringInfo(); + appendStringInfo(duckdb_data_directory, "%s/%s", DataDir, directory_name.c_str()); + + if (!CheckDirectory(duckdb_data_directory->data)) { + if (mkdir(duckdb_data_directory->data, S_IRWXU | S_IRWXG | S_IRWXO) == -1) { + int error = errno; + elog(ERROR, "Creating %s directory failed with reason `%s`\n", duckdb_data_directory->data, + strerror(error)); + /* unreachable: elog(ERROR) performs a non-local exit */ + } + elog(DEBUG2, "Created %s directory", duckdb_data_directory->data); + }; + + std::string directory(duckdb_data_directory->data); + pfree(duckdb_data_directory->data); + return directory; +} + +} // namespace pgduckdb + +void +DuckdbCreateCacheDirectory() { + pgduckdb::CreateOrGetDirectoryPath("duckdb_cache"); +} diff --git a/third_party/cached_httpfs/CMakeLists.txt b/third_party/cached_httpfs/CMakeLists.txt index df912394..543359f9 100644 --- a/third_party/cached_httpfs/CMakeLists.txt +++ b/third_party/cached_httpfs/CMakeLists.txt @@ -11,6 +11,7 @@ build_static_extension( hffs.cpp s3fs.cpp httpfs.cpp + http_file_cache.cpp http_state.cpp crypto.cpp create_secret_functions.cpp @@ -23,6 +24,7 @@ 
build_loadable_extension( hffs.cpp s3fs.cpp httpfs.cpp + http_file_cache.cpp http_state.cpp crypto.cpp create_secret_functions.cpp diff --git a/third_party/cached_httpfs/README.md b/third_party/cached_httpfs/README.md deleted file mode 100644 index 9b214b17..00000000 --- a/third_party/cached_httpfs/README.md +++ /dev/null @@ -1 +0,0 @@ -Documentation on S3 tests setup can be found [here](../../test/sql/copy/s3/README.md) \ No newline at end of file diff --git a/third_party/cached_httpfs/cached_httpfs_extension.cpp b/third_party/cached_httpfs/cached_httpfs_extension.cpp index e856e622..47edaf81 100644 --- a/third_party/cached_httpfs/cached_httpfs_extension.cpp +++ b/third_party/cached_httpfs/cached_httpfs_extension.cpp @@ -62,6 +62,10 @@ static void LoadInternal(DatabaseInstance &instance) { config.AddExtensionOption("hf_max_per_page", "Debug option to limit number of items returned in list requests", LogicalType::UBIGINT, Value::UBIGINT(0)); + // HTTP file cache options + config.AddExtensionOption("enable_http_file_cache", "Enable http file cache", LogicalType::BOOLEAN, Value(false)); + config.AddExtensionOption("http_file_cache_dir", "HTTP file cache directory", LogicalType::VARCHAR); + auto provider = make_uniq(config); provider->SetAll(); diff --git a/third_party/cached_httpfs/http_file_cache.cpp b/third_party/cached_httpfs/http_file_cache.cpp new file mode 100644 index 00000000..56b7daa1 --- /dev/null +++ b/third_party/cached_httpfs/http_file_cache.cpp @@ -0,0 +1,113 @@ +#include "http_file_cache.hpp" + +namespace duckdb { + +CachedFile::CachedFile(const string &cache_dir, FileSystem &fs, const std::string &key, bool cache_file) : fs(fs) { + file_name = cache_dir + "/" + key; + + GetDirectoryCacheLock(cache_dir); + + FileOpenFlags flags = + FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_NULL_IF_NOT_EXISTS | FileLockType::READ_LOCK; + handle = fs.OpenFile(file_name, flags); + if (handle) { + initialized = true; + size = handle->GetFileSize(); + } else if 
(handle == nullptr && cache_file) { + flags = FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE | FileLockType::WRITE_LOCK; + handle = fs.OpenFile(file_name, flags); + } + + ReleaseDirectoryCacheLock(); +} + +CachedFile::~CachedFile() { + if (!initialized && handle) { + fs.RemoveFile(file_name); + } +} + +void CachedFile::GetDirectoryCacheLock(const string &cache_dir) { + std::string lock_file = cache_dir + "/.lock"; + FileOpenFlags flags = FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE | + FileFlags::FILE_FLAGS_EXCLUSIVE_CREATE | FileFlags::FILE_FLAGS_NULL_IF_EXISTS | + FileLockType::WRITE_LOCK; + directory_lock_handle = fs.OpenFile(lock_file, flags); + if (directory_lock_handle == nullptr) { + flags = FileFlags::FILE_FLAGS_WRITE | FileLockType::WRITE_LOCK; + directory_lock_handle = fs.OpenFile(lock_file, flags); + } +} + + +void CachedFile::ReleaseDirectoryCacheLock() { + directory_lock_handle->Close(); + directory_lock_handle.reset(); +} + + +CachedFileHandle::CachedFileHandle(shared_ptr &file_p) { + // If the file was not yet initialized, we need to grab a lock. 
+ if (!file_p->initialized) { + lock = make_uniq>(file_p->lock); + } + file = file_p; +} + +void CachedFileHandle::SetInitialized(idx_t total_size) { + if (file->initialized) { + throw InternalException("Cannot set initialized on cached file that was already initialized"); + } + if (!lock) { + throw InternalException("Cannot set initialized on cached file without lock"); + } + file->size = total_size; + file->initialized = true; + file->handle->Close(); + lock = nullptr; + + FileOpenFlags flags = FileFlags::FILE_FLAGS_READ | FileLockType::READ_LOCK; + file->handle = file->fs.OpenFile(file->file_name, flags); +} + +void CachedFileHandle::Allocate(idx_t size) { + if (file->initialized) { + throw InternalException("Cannot allocate cached file that was already initialized"); + } + file->handle->Trim(0, size); + file->capacity = size; +} + +void CachedFileHandle::GrowFile(idx_t new_capacity, idx_t bytes_to_copy) { + file->handle->Trim(bytes_to_copy, new_capacity); +} + +void CachedFileHandle::Write(const char *buffer, idx_t length, idx_t offset) { + //! Only write to non-initialized files with a lock; + D_ASSERT(!file->initialized && lock); + file->handle->Write((void *)buffer, length, offset); +} + +void CachedFileHandle::Read(void *buffer, idx_t length, idx_t offset) { + //! Only read to initialized files without a lock; + D_ASSERT(file->initialized && !lock); + file->handle->Read((void *)buffer, length, offset); +} + +//! 
Get cache entry, create if not exists only if caching is enabled +shared_ptr HTTPFileCache::GetCachedFile(const string &cache_dir, const string &key, bool cache_file) { + lock_guard lock(cached_files_mutex); + auto it = cached_files.find(key); + if (it != cached_files.end()) { + return it->second; + } + auto cache_entry = make_shared_ptr(cache_dir, db->GetFileSystem(), key, cache_file); + if (cache_entry->Initialized() || cache_file) { + cached_files[key] = cache_entry; + return cache_entry; + } else { + return nullptr; + } +} + +} // namespace duckdb diff --git a/third_party/cached_httpfs/http_state.cpp b/third_party/cached_httpfs/http_state.cpp index fd0a729a..28626ce4 100644 --- a/third_party/cached_httpfs/http_state.cpp +++ b/third_party/cached_httpfs/http_state.cpp @@ -3,48 +3,6 @@ namespace duckdb { -CachedFileHandle::CachedFileHandle(shared_ptr &file_p) { - // If the file was not yet initialized, we need to grab a lock. - if (!file_p->initialized) { - lock = make_uniq>(file_p->lock); - } - file = file_p; -} - -void CachedFileHandle::SetInitialized(idx_t total_size) { - if (file->initialized) { - throw InternalException("Cannot set initialized on cached file that was already initialized"); - } - if (!lock) { - throw InternalException("Cannot set initialized on cached file without lock"); - } - file->size = total_size; - file->initialized = true; - lock = nullptr; -} - -void CachedFileHandle::AllocateBuffer(idx_t size) { - if (file->initialized) { - throw InternalException("Cannot allocate a buffer for a cached file that was already initialized"); - } - file->data = shared_ptr(new char[size], std::default_delete()); - file->capacity = size; -} - -void CachedFileHandle::GrowBuffer(idx_t new_capacity, idx_t bytes_to_copy) { - // copy shared ptr to old data - auto old_data = file->data; - // allocate new buffer that can hold the new capacity - AllocateBuffer(new_capacity); - // copy the old data - Write(old_data.get(), bytes_to_copy); -} - -void 
CachedFileHandle::Write(const char *buffer, idx_t length, idx_t offset) { - //! Only write to non-initialized files with a lock; - D_ASSERT(!file->initialized && lock); - memcpy(file->data.get() + offset, buffer, length); -} void HTTPState::Reset() { // Reset Counters @@ -54,9 +12,6 @@ void HTTPState::Reset() { post_count = 0; total_bytes_received = 0; total_bytes_sent = 0; - - // Reset cached files - cached_files.clear(); } shared_ptr HTTPState::TryGetState(ClientContext &context) { @@ -94,14 +49,4 @@ void HTTPState::WriteProfilingInformation(std::ostream &ss) { ss << "└─────────────────────────────────────┘\n"; } -//! Get cache entry, create if not exists -shared_ptr &HTTPState::GetCachedFile(const string &path) { - lock_guard lock(cached_files_mutex); - auto &cache_entry_ref = cached_files[path]; - if (!cache_entry_ref) { - cache_entry_ref = make_shared_ptr(); - } - return cache_entry_ref; -} - } // namespace duckdb diff --git a/third_party/cached_httpfs/httpfs.cpp b/third_party/cached_httpfs/httpfs.cpp index 2851bf41..85c19e8f 100644 --- a/third_party/cached_httpfs/httpfs.cpp +++ b/third_party/cached_httpfs/httpfs.cpp @@ -1,6 +1,7 @@ #include "httpfs.hpp" #include "duckdb/common/atomic.hpp" +#include "duckdb/common/crypto/md5.hpp" #include "duckdb/common/exception/http_exception.hpp" #include "duckdb/common/file_opener.hpp" #include "http_state.hpp" @@ -42,6 +43,8 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener) { bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; std::string ca_cert_file; uint64_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; + bool enable_http_file_cache = DEFAULT_HTTP_CACHE; + std::string http_file_cache_dir = DEFAULT_HTTP_FILE_CACHE_DIR; Value value; if (FileOpener::TryGetCurrentSetting(opener, "http_timeout", value)) { @@ -71,6 +74,12 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener) { if (FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", value)) { hf_max_per_page = value.GetValue(); } + if 
(FileOpener::TryGetCurrentSetting(opener, "enable_http_file_cache", value)) { + enable_http_file_cache = value.GetValue(); + } + if (FileOpener::TryGetCurrentSetting(opener, "http_file_cache_dir", value)) { + http_file_cache_dir = value.ToString(); + } return {timeout, retries, @@ -81,7 +90,9 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener) { enable_server_cert_verification, ca_cert_file, "", - hf_max_per_page}; + hf_max_per_page, + enable_http_file_cache, + http_file_cache_dir}; } unique_ptr HTTPClientCache::GetClient() { @@ -323,7 +334,7 @@ unique_ptr HTTPFileSystem::GetRequest(FileHandle &handle, strin hfh.state->total_bytes_received += data_length; } if (!hfh.cached_file_handle->GetCapacity()) { - hfh.cached_file_handle->AllocateBuffer(data_length); + hfh.cached_file_handle->Allocate(data_length); hfh.length = data_length; hfh.cached_file_handle->Write(data, data_length); } else { @@ -331,9 +342,9 @@ unique_ptr HTTPFileSystem::GetRequest(FileHandle &handle, strin while (new_capacity < hfh.length + data_length) { new_capacity *= 2; } - // Grow buffer when running out of space + // Grow file when running out of space if (new_capacity != hfh.cached_file_handle->GetCapacity()) { - hfh.cached_file_handle->GrowBuffer(new_capacity, hfh.length); + hfh.cached_file_handle->GrowFile(new_capacity, hfh.length); } // We can just copy stuff hfh.cached_file_handle->Write(data, data_length, hfh.length); @@ -476,7 +487,7 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id if (!hfh.cached_file_handle->Initialized()) { throw InternalException("Cached file not initialized properly"); } - memcpy(buffer, hfh.cached_file_handle->GetData() + location, nr_bytes); + hfh.cached_file_handle->Read(buffer, nr_bytes, location); hfh.file_offset = location + nr_bytes; return; } @@ -607,6 +618,14 @@ optional_ptr HTTPFileSystem::GetGlobalCache() { return global_metadata_cache.get(); } +optional_ptr HTTPFileSystem::GetGlobalFileCache(ClientContext &context) { 
+ lock_guard lock(global_cache_lock); + if (!glob_file_cache) { + glob_file_cache = make_uniq(context); + } + return glob_file_cache.get(); +} + // Get either the local, global, or no cache depending on settings static optional_ptr TryGetMetadataCache(optional_ptr opener, HTTPFileSystem &httpfs) { auto db = FileOpener::TryGetDatabase(opener); @@ -624,6 +643,18 @@ static optional_ptr TryGetMetadataCache(optional_ptr TryGetFileCache(optional_ptr opener, HTTPFileSystem &httpfs) { + auto db = FileOpener::TryGetDatabase(opener); + auto client_context = FileOpener::TryGetClientContext(opener); + if (!db) { + return nullptr; + } + if (client_context) { + return httpfs.GetGlobalFileCache(*client_context); + } + return nullptr; +} + void HTTPFileHandle::Initialize(optional_ptr opener) { auto &hfs = file_system.Cast(); state = HTTPState::TryGetState(opener); @@ -632,6 +663,7 @@ void HTTPFileHandle::Initialize(optional_ptr opener) { } auto current_cache = TryGetMetadataCache(opener, hfs); + auto current_file_cache = TryGetFileCache(opener, hfs); bool should_write_cache = false; if (!http_params.force_download && current_cache && !flags.OpenForWriting()) { @@ -716,25 +748,37 @@ void HTTPFileHandle::Initialize(optional_ptr opener) { throw IOException("Invalid Content-Length header received: %s", res->headers["Content-Length"]); } } - if (state && (length == 0 || http_params.force_download)) { - auto &cache_entry = state->GetCachedFile(path); - cached_file_handle = cache_entry->GetHandle(); - if (!cached_file_handle->Initialized()) { - // Try to fully download the file first - auto full_download_result = hfs.GetRequest(*this, path, {}); - if (full_download_result->code != 200) { - throw HTTPException(*res, "Full download failed to to URL \"%s\": %s (%s)", - full_download_result->http_url, to_string(full_download_result->code), - full_download_result->error); - } - - // Mark the file as initialized, set its final length, and unlock it to allowing parallel reads - 
cached_file_handle->SetInitialized(length); - // We shouldn't write these to cache - should_write_cache = false; + if (current_file_cache) { + MD5Context md5_context; + // NO ETag header + if (res->headers.find("ETag") == res->headers.end() || res->headers["ETag"].empty()) { + md5_context.Add(path); } else { - length = cached_file_handle->GetSize(); + md5_context.Add(res->headers["ETag"]); + } + auto cache_entry = current_file_cache->GetCachedFile(http_params.http_file_cache_dir, md5_context.FinishHex(), + http_params.enable_http_file_cache); + if (cache_entry) { + //! Cache found or created + cached_file_handle = cache_entry->GetHandle(); + if (!cached_file_handle->Initialized()) { + // Try to fully download the file first + auto full_download_result = hfs.GetRequest(*this, path, {}); + if (full_download_result->code != 200) { + throw HTTPException(*res, "Full download failed to URL \"%s\": %s (%s)", + full_download_result->http_url, to_string(full_download_result->code), + full_download_result->error); + } + + // Mark the file as initialized, set its final length, and unlock it to allow parallel reads + cached_file_handle->SetInitialized(length); + + // We shouldn't write these to cache + should_write_cache = false; + } else { + length = cached_file_handle->GetSize(); + } } } diff --git a/third_party/cached_httpfs/httpfs_config.py b/third_party/cached_httpfs/httpfs_config.py deleted file mode 100644 index 791dcb1c..00000000 --- a/third_party/cached_httpfs/httpfs_config.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -# list all include directories -include_directories = [ - os.path.sep.join(x.split('/')) - for x in ['extension/httpfs/include', 'third_party/httplib', 'extension/parquet/include'] -] -# source files -source_files = [ - os.path.sep.join(x.split('/')) - for x in [ - 'extension/httpfs/' + s - for s in [ - 'create_secret_functions.cpp', - 'crypto.cpp', - 'hffs.cpp', - 'httpfs.cpp', - 'httpfs_extension.cpp', - 's3fs.cpp', - ] - ] -] diff --git 
a/third_party/cached_httpfs/include/http_file_cache.hpp b/third_party/cached_httpfs/include/http_file_cache.hpp new file mode 100644 index 00000000..5374f74f --- /dev/null +++ b/third_party/cached_httpfs/include/http_file_cache.hpp @@ -0,0 +1,99 @@ +#pragma once + +#include "duckdb/main/client_data.hpp" + +namespace duckdb { + +class CachedFileHandle; + +//! Represents a file that is intended to be fully downloaded, then used in parallel by multiple threads +class CachedFile : public enable_shared_from_this { + friend class CachedFileHandle; + +public: + CachedFile(const string &cache_dir, FileSystem &fs, const std::string &key, bool cache_file); + ~CachedFile(); + + unique_ptr GetHandle() { + auto this_ptr = shared_from_this(); + return make_uniq(this_ptr); + } + + bool Initialized() { + return initialized; + } + +private: + void GetDirectoryCacheLock(const string &cache_dir); + void ReleaseDirectoryCacheLock(); + +private: + // FileSystem + FileSystem &fs; + // File name + std::string file_name; + // Cache file FileDescriptor + unique_ptr handle; + // Lock file + unique_ptr directory_lock_handle; + //! Data capacity + uint64_t capacity = 0; + //! Size of file + idx_t size; + //! Lock for initializing the file + mutex lock; + //! When initialized is set to true, the file is safe for parallel reading without holding the lock + atomic initialized = {false}; +}; + +//! Handle to a CachedFile +class CachedFileHandle { +public: + explicit CachedFileHandle(shared_ptr &file_p); + //! Allocate file size + void Allocate(idx_t size); + //! Grow file to new size, copying over `bytes_to_copy` to the new buffer + void GrowFile(idx_t new_capacity, idx_t bytes_to_copy); + //! Indicate the file is fully downloaded and safe for parallel reading without lock + void SetInitialized(idx_t total_size); + //! Write to the buffer + void Write(const char *buffer, idx_t length, idx_t offset = 0); + //! 
Read data to buffer + void Read(void *buffer, idx_t length, idx_t offset); + + bool Initialized() { + return file->initialized; + } + uint64_t GetCapacity() { + return file->capacity; + } + //! Return the size of the initialized file + idx_t GetSize() { + D_ASSERT(file->initialized); + return file->size; + } + +private: + unique_ptr> lock; + shared_ptr file; +}; + +class HTTPFileCache : public ClientContextState { +public: + explicit HTTPFileCache(ClientContext &context) { + db = context.db; + } + + //! Get cache, create if not exists only if caching is enabled + shared_ptr GetCachedFile(const string &cache_dir, const string &key, bool create_cache); + +private: + //! Database Instance + shared_ptr db; + //! Mutex to lock when getting the cached file (Parallel Only) + mutex cached_files_mutex; + //! In case of fully downloading the file, the cached files of this query + unordered_map> cached_files; +}; + +} // namespace duckdb diff --git a/third_party/cached_httpfs/include/http_state.hpp b/third_party/cached_httpfs/include/http_state.hpp index 93f4abfc..8828f620 100644 --- a/third_party/cached_httpfs/include/http_state.hpp +++ b/third_party/cached_httpfs/include/http_state.hpp @@ -9,71 +9,10 @@ namespace duckdb { -class CachedFileHandle; - -//! Represents a file that is intended to be fully downloaded, then used in parallel by multiple threads -class CachedFile : public enable_shared_from_this { - friend class CachedFileHandle; - -public: - unique_ptr GetHandle() { - auto this_ptr = shared_from_this(); - return make_uniq(this_ptr); - } - -private: - //! Cached Data - shared_ptr data; - //! Data capacity - uint64_t capacity = 0; - //! Size of file - idx_t size; - //! Lock for initializing the file - mutex lock; - //! When initialized is set to true, the file is safe for parallel reading without holding the lock - atomic initialized = {false}; -}; - -//! Handle to a CachedFile -class CachedFileHandle { -public: - explicit CachedFileHandle(shared_ptr &file_p); - - //! 
allocate a buffer for the file - void AllocateBuffer(idx_t size); - //! Indicate the file is fully downloaded and safe for parallel reading without lock - void SetInitialized(idx_t total_size); - //! Grow buffer to new size, copying over `bytes_to_copy` to the new buffer - void GrowBuffer(idx_t new_capacity, idx_t bytes_to_copy); - //! Write to the buffer - void Write(const char *buffer, idx_t length, idx_t offset = 0); - - bool Initialized() { - return file->initialized; - } - const char *GetData() { - return file->data.get(); - } - uint64_t GetCapacity() { - return file->capacity; - } - //! Return the size of the initialized file - idx_t GetSize() { - D_ASSERT(file->initialized); - return file->size; - } - -private: - unique_ptr> lock; - shared_ptr file; -}; - class HTTPState : public ClientContextState { public: //! Reset all counters and cached files void Reset(); - //! Get cache entry, create if not exists - shared_ptr &GetCachedFile(const string &path); //! Helper functions to get the HTTP state static shared_ptr TryGetState(ClientContext &context); static shared_ptr TryGetState(optional_ptr opener); @@ -95,12 +34,6 @@ class HTTPState : public ClientContextState { Reset(); } void WriteProfilingInformation(std::ostream &ss) override; - -private: - //! Mutex to lock when getting the cached file(Parallel Only) - mutex cached_files_mutex; - //! 
In case of fully downloading the file, the cached files of this query - unordered_map> cached_files; }; } // namespace duckdb diff --git a/third_party/cached_httpfs/include/httpfs.hpp b/third_party/cached_httpfs/include/httpfs.hpp index 6e94c2f0..102d96af 100644 --- a/third_party/cached_httpfs/include/httpfs.hpp +++ b/third_party/cached_httpfs/include/httpfs.hpp @@ -2,6 +2,7 @@ #include "duckdb/common/case_insensitive_map.hpp" #include "duckdb/common/file_system.hpp" +#include "http_file_cache.hpp" #include "http_state.hpp" #include "duckdb/common/pair.hpp" #include "duckdb/common/unordered_map.hpp" @@ -41,6 +42,8 @@ struct HTTPParams { static constexpr bool DEFAULT_KEEP_ALIVE = true; static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; + static constexpr const char *DEFAULT_HTTP_FILE_CACHE_DIR = "/tmp"; + static constexpr bool DEFAULT_HTTP_CACHE = false; uint64_t timeout; uint64_t retries; @@ -55,6 +58,9 @@ struct HTTPParams { idx_t hf_max_per_page; + bool enable_http_file_cache; + std::string http_file_cache_dir; + static HTTPParams ReadFrom(optional_ptr opener); }; @@ -181,6 +187,7 @@ class HTTPFileSystem : public FileSystem { static void Verify(); optional_ptr GetGlobalCache(); + optional_ptr GetGlobalFileCache(ClientContext &context); protected: virtual duckdb::unique_ptr CreateHandle(const string &path, FileOpenFlags flags, @@ -194,6 +201,7 @@ class HTTPFileSystem : public FileSystem { // Global cache mutex global_cache_lock; duckdb::unique_ptr global_metadata_cache; + duckdb::unique_ptr glob_file_cache; }; } // namespace duckdb