Skip to content

Commit

Permalink
apacheGH-38309: [C++] build filesystems as separate modules
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Dec 4, 2023
1 parent 84c15da commit 2cca700
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 98 deletions.
26 changes: 15 additions & 11 deletions cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
#include "arrow/util/uri.h"

namespace Azure {
namespace Core {
namespace Credentials {
namespace Core::Credentials {

class TokenCredential;

} // namespace Credentials
} // namespace Core
namespace Storage {

Expand All @@ -40,8 +38,7 @@ class StorageSharedKeyCredential;
} // namespace Storage
} // namespace Azure

namespace arrow {
namespace fs {
namespace arrow::fs {

enum class AzureCredentialsKind : int8_t {
/// Anonymous access (no credentials used), public
Expand Down Expand Up @@ -116,15 +113,23 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {

bool Equals(const FileSystem& other) const override;

/// \cond FALSE
using FileSystem::CreateDir;
using FileSystem::DeleteDirContents;
using FileSystem::GetFileInfo;
using FileSystem::OpenAppendStream;
using FileSystem::OpenOutputStream;
/// \endcond

Result<FileInfo> GetFileInfo(const std::string& path) override;

Result<FileInfoVector> GetFileInfo(const FileSelector& select) override;

Status CreateDir(const std::string& path, bool recursive = true) override;
Status CreateDir(const std::string& path, bool recursive) override;

Status DeleteDir(const std::string& path) override;

Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override;
Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;

Status DeleteRootDirContents() override;

Expand All @@ -147,11 +152,11 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
const std::shared_ptr<const KeyValueMetadata>& metadata) override;

Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override;
const std::shared_ptr<const KeyValueMetadata>& metadata) override;

static Result<std::shared_ptr<AzureFileSystem>> Make(
const AzureOptions& options, const io::IOContext& = io::default_io_context());
Expand All @@ -163,5 +168,4 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
std::unique_ptr<Impl> impl_;
};

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
130 changes: 128 additions & 2 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <dlfcn.h>

#include <cstdint>
#include <exception>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>

#include "arrow/type_fwd.h"
#include "arrow/util/config.h"

#include "arrow/filesystem/filesystem.h"
Expand Down Expand Up @@ -128,7 +131,7 @@ std::string FileInfo::extension() const {
//////////////////////////////////////////////////////////////////////////
// FileSystem default method implementations

FileSystem::~FileSystem() {}
FileSystem::~FileSystem() = default;

/// Default path normalization: the path is handed back exactly as received.
Result<std::string> FileSystem::NormalizePath(std::string path) {
  return path;
}

Expand Down Expand Up @@ -203,6 +206,10 @@ Future<> FileSystem::DeleteDirContentsAsync(const std::string& path,
});
}

/// Convenience overload: forwards to the two-argument DeleteDirContentsAsync
/// with `false` as the second argument.
Future<> FileSystem::DeleteDirContentsAsync(const std::string& path) {
  constexpr bool kSecondArgument = false;
  return DeleteDirContentsAsync(path, kSecondArgument);
}

Result<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStream(
const FileInfo& info) {
RETURN_NOT_OK(ValidateInputFileInfo(info));
Expand Down Expand Up @@ -279,7 +286,7 @@ SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()),
base_fs_(base_fs) {}

SubTreeFileSystem::~SubTreeFileSystem() {}
SubTreeFileSystem::~SubTreeFileSystem() = default;

Result<std::string> SubTreeFileSystem::NormalizeBasePath(
std::string base_path, const std::shared_ptr<FileSystem>& base_fs) {
Expand Down Expand Up @@ -677,12 +684,28 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,

namespace {

/// Accessor for the process-wide table mapping URI schemes to filesystem factories.
///
/// The returned object exposes exactly two members, in this order, so callers can
/// unpack it with structured bindings:
///   - `mutex`: locked by callers before they touch the map
///   - `scheme_to_factory`: scheme string -> factory pointer
///
/// NOTE(review): the mutex is recursive presumably because a factory invoked under
/// the lock may itself resolve another URI and re-enter the registry — confirm.
auto& FileSystemFactoryRegistry() {
  struct Registry {
    std::recursive_mutex mutex;
    std::unordered_map<std::string, FileSystem::Factory*> scheme_to_factory;
  };
  // Function-local static: constructed on first use.
  static Registry instance;
  return instance;
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
const std::string& uri_string,
const io::IOContext& io_context,
std::string* out_path) {
const auto scheme = uri.scheme();

{
auto& [mutex, scheme_to_factory] = FileSystemFactoryRegistry();
std::unique_lock lock{mutex};
if (auto it = scheme_to_factory.find(scheme); it != scheme_to_factory.end()) {
return it->second(uri, io_context, out_path);
}
}

if (scheme == "file") {
std::string path;
ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path));
Expand Down Expand Up @@ -738,6 +761,109 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,

} // namespace

/// Register `factory` as the handler for every URI scheme listed in `schemes`.
///
/// Registration is all-or-nothing: if any scheme already has a factory (or a
/// scheme is repeated within `schemes`), every entry added by this call is removed
/// again before the error is returned, so the registry is left as it was found.
///
/// \param schemes URI schemes (without the "://" suffix) to associate with `factory`
/// \param factory callable stored for each scheme; must not be null
/// \return Invalid if `factory` is null, KeyError if a scheme is already registered,
///         OK otherwise
Status RegisterFileSystemFactory(std::vector<std::string> schemes,
                                 FileSystem::Factory factory) {
  // A null entry would later be invoked by the scheme lookup, so refuse it up front.
  if (factory == nullptr) {
    return Status::Invalid("Tried to register a null filesystem factory");
  }

  auto& [mutex, scheme_to_factory] = FileSystemFactoryRegistry();
  std::unique_lock lock{mutex};

  for (auto it = schemes.begin(); it != schemes.end(); ++it) {
    // try_emplace leaves an existing entry untouched and reports whether it inserted.
    const bool inserted = scheme_to_factory.try_emplace(*it, factory).second;
    if (inserted) {
      continue;
    }
    // Roll back the schemes added earlier in this call; the colliding entry
    // belongs to a previous registration and is kept.
    for (auto undo = schemes.begin(); undo != it; ++undo) {
      scheme_to_factory.erase(*undo);
    }
    return Status::KeyError(
        "Tried to add a factory for ", *it,
        ":// uris, but a factory was already registered for that scheme");
  }
  return Status::OK();
}

// XXX alternative approach:
// If we distrust global constructors more than we distrust
// symbol visibility, we could export specially named functions
// and retrieve them via dlsym.
//
// Looks up the symbol "ArrowFileSystemModule_<module_name>" through the handle
// obtained from dlopen(nullptr, ...), calls it with a pointer to a local Status,
// and returns whatever Status the module function wrote.
//
// \param module_name suffix appended to "ArrowFileSystemModule_" to form the symbol
// \return Invalid if the handle or the symbol cannot be obtained, otherwise the
//         Status produced by the module function
Status RegisterFileSystemFactoryModule(std::string module_name) {
  // dlerror() returns a null pointer when no error is pending (for example when
  // an earlier call already consumed it). Streaming a null `const char*` into the
  // Status message would be undefined behavior, so substitute a placeholder.
  const auto last_dl_error = []() -> const char* {
    const char* message = dlerror();
    return message != nullptr ? message : "unknown error";
  };

  // Resolved once and cached in a function-local static for later calls.
  static auto* handle = dlopen(nullptr, RTLD_NOW | RTLD_LOCAL);
  if (!handle) {
    return Status::Invalid("dlopen failed: ", last_dl_error());
  }

  // Discard any stale error so the next dlerror() describes the dlsym call below.
  dlerror();

  module_name = "ArrowFileSystemModule_" + module_name;
  auto* module = reinterpret_cast<void (*)(Status*)>(dlsym(handle, module_name.c_str()));
  if (!module) {
    return Status::Invalid("dlsym failed: ", last_dl_error());
  }

  Status status;
  module(&status);
  return status;
}

// Does not result in a visible symbol on my machine:
extern "C" {
/// Module entry point which registers a factory for "slow+file" URIs and writes
/// the outcome of the registration to `*status`.
///
/// The registered factory strips the leading "slow+" from the URI, resolves the
/// remainder with FileSystemFromUri and wraps the result in a SlowFileSystem.
/// Recognized query parameters:
///   - average_latency: parsed as a floating point number (default 1)
///   - seed: parsed as a hexadecimal integer (default 0xDEADBEEF)
/// Malformed or out-of-range parameter values produce an Invalid status instead
/// of letting a standard library exception escape the factory.
ARROW_EXPORT void ArrowFileSystemModule_slow(Status* status) {
  *status = RegisterFileSystemFactory(
      {"slow+file"},
      [](const Uri& uri, const io::IOContext& io_context,
         std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
        auto local_uri = uri.ToString().substr(sizeof("slow+") - 1);
        ARROW_ASSIGN_OR_RAISE(auto base_fs,
                              FileSystemFromUri(local_uri, io_context, out_path));
        double average_latency = 1;
        int32_t seed = 0xDEADBEEF;
        ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items());
        for (const auto& [key, value] : params) {
          try {
            if (key == "average_latency") {
              average_latency = std::stod(value);
            }
            if (key == "seed") {
              // Parse through a wider type: std::stoi rejects anything above
              // INT32_MAX, which excludes 32-bit patterns such as the default
              // 0xDEADBEEF. Accept the whole signed and unsigned 32-bit range
              // and keep the bit pattern.
              const long long parsed = std::stoll(value, nullptr, /*base=*/16);
              if (parsed < INT32_MIN || parsed > UINT32_MAX) {
                return Status::Invalid("Value out of range for URI parameter '", key,
                                       "': '", value, "'");
              }
              seed = static_cast<int32_t>(static_cast<uint32_t>(parsed));
            }
          } catch (const std::exception&) {
            // std::stod / std::stoll signal unparsable or out-of-range input by
            // throwing; report it through the Result instead.
            return Status::Invalid("Invalid value for URI parameter '", key, "': '",
                                   value, "'");
          }
        }
        return std::make_shared<SlowFileSystem>(base_fs, average_latency, seed);
      });
}
}

struct Registrar {
Registrar(std::vector<std::string> schemes, FileSystem::Factory factory) {
Status st = RegisterFileSystemFactory(std::move(schemes), std::move(factory));
if (!st.ok()) {
st.Abort();
}
}
};

// Global constructors will finish execution before main() starts or before
// dlopen()/LoadLibrary() returns if dynamically loaded.
//
// Registers a factory for "slow+file" URIs. The factory strips the leading
// "slow+" from the URI, resolves the remainder with FileSystemFromUri and wraps
// the result in a SlowFileSystem. Recognized query parameters:
//   - average_latency: parsed as a floating point number (default 1)
//   - seed: parsed as a hexadecimal integer (default 0xDEADBEEF)
// Malformed or out-of-range parameter values produce an Invalid status instead
// of letting a standard library exception escape the factory.
Registrar kSlowFileSystemModule{
    // XXX probably overengineering, but we could allow registering factories for
    // wildcard/composable schemes like "slow+" which handle any URI whose scheme begins
    // with that prefix. Downsides: possibly narrow utility, potential collision of
    // param names.
    {"slow+file"},
    [](const Uri& uri, const io::IOContext& io_context,
       std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
      auto local_uri = uri.ToString().substr(sizeof("slow+") - 1);
      ARROW_ASSIGN_OR_RAISE(auto base_fs,
                            FileSystemFromUri(local_uri, io_context, out_path));
      double average_latency = 1;
      int32_t seed = 0xDEADBEEF;
      ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items());
      for (const auto& [key, value] : params) {
        try {
          if (key == "average_latency") {
            average_latency = std::stod(value);
          }
          if (key == "seed") {
            // Parse through a wider type: std::stoi rejects anything above
            // INT32_MAX, which excludes 32-bit patterns such as the default
            // 0xDEADBEEF. Accept the whole signed and unsigned 32-bit range and
            // keep the bit pattern.
            const long long parsed = std::stoll(value, nullptr, /*base=*/16);
            if (parsed < INT32_MIN || parsed > UINT32_MAX) {
              return Status::Invalid("Value out of range for URI parameter '", key,
                                     "': '", value, "'");
            }
            seed = static_cast<int32_t>(static_cast<uint32_t>(parsed));
          }
        } catch (const std::exception&) {
          // std::stod / std::stoll signal unparsable or out-of-range input by
          // throwing; report it through the Result instead.
          return Status::Invalid("Invalid value for URI parameter '", key, "': '",
                                 value, "'");
        }
      }
      return std::make_shared<SlowFileSystem>(base_fs, average_latency, seed);
    },
};

Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
std::string* out_path) {
return FileSystemFromUri(uri_string, io::default_io_context(), out_path);
Expand Down
Loading

0 comments on commit 2cca700

Please sign in to comment.