diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 755887314d110..4c3d3a00e14bc 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -414,6 +414,12 @@ takes precedence over ccache if a storage backend is configured" ON) DEPENDS ARROW_FILESYSTEM) + define_option(ARROW_S3_MODULE + "Build the Arrow S3 filesystem as a dynamic module" + OFF + DEPENDS + ARROW_S3) + define_option(ARROW_SKYHOOK "Build the Skyhook libraries" OFF diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e77a02d0c0800..3a74e8cc5fddc 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -876,6 +876,18 @@ if(ARROW_FILESYSTEM) foreach(ARROW_FILESYSTEM_TARGET ${ARROW_FILESYSTEM_TARGETS}) target_link_libraries(${ARROW_FILESYSTEM_TARGET} PRIVATE ${AWSSDK_LINK_LIBRARIES}) endforeach() + + if(ARROW_S3_MODULE) + if(NOT ARROW_BUILD_SHARED) + message(FATAL_ERROR "ARROW_S3_MODULE without shared libarrow is not supported") + endif() + + add_library(arrow_s3fs MODULE filesystem/s3fs_module.cc filesystem/s3fs.cc) + target_link_libraries(arrow_s3fs PRIVATE ${AWSSDK_LINK_LIBRARIES} arrow_shared) + set_source_files_properties(filesystem/s3fs.cc filesystem/s3fs_module.cc + PROPERTIES SKIP_PRECOMPILE_HEADERS ON + SKIP_UNITY_BUILD_INCLUSION ON) + endif() endif() list(APPEND ARROW_TESTING_SHARED_LINK_LIBS ${ARROW_GTEST_GMOCK}) diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index 5bfe257ac7bad..6974faae59b54 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -18,6 +18,7 @@ #include "arrow/adapters/orc/util.h" #include +#include #include #include #include diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index 7afdf566f2fb5..5250ed2a8879e 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -121,6 +121,24 @@ 
if(ARROW_S3) target_link_libraries(arrow-filesystem-s3fs-benchmark PRIVATE parquet_shared) endif() endif() + + if(ARROW_S3_MODULE) + add_arrow_test(s3fs_module_test + SOURCES + s3fs_module_test.cc + s3_test_util.cc + EXTRA_LABELS + filesystem + DEFINITIONS + ARROW_S3_LIBPATH="$" + EXTRA_LINK_LIBS + Boost::filesystem + Boost::system) + target_compile_definitions(arrow-filesystem-test + PUBLIC ARROW_S3_LIBPATH="$") + target_sources(arrow-filesystem-test PUBLIC s3fs_module_test.cc s3_test_util.cc) + target_link_libraries(arrow-filesystem-test PUBLIC Boost::filesystem Boost::system) + endif() endif() if(ARROW_HDFS) diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index b5765010ec7e9..29040fb2f0657 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -34,9 +34,6 @@ #ifdef ARROW_HDFS # include "arrow/filesystem/hdfs.h" #endif -#ifdef ARROW_S3 -# include "arrow/filesystem/s3fs.h" -#endif #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -700,6 +697,29 @@ class FileSystemFactoryRegistry { return &registry; } + Status Unregister(const std::string& scheme) { + std::unique_lock lock{mutex_}; // exclusive: erase() below mutates the map + RETURN_NOT_OK(CheckValid()); + + auto it = scheme_to_factory_.find(scheme); + if (it == scheme_to_factory_.end()) { + return Status::KeyError("No factories found for scheme ", scheme, + ", can't unregister"); + } + + std::function<void()> finalizer; + if (it->second.ok()) { + finalizer = it->second.ValueOrDie().finalizer; + } + scheme_to_factory_.erase(it); + lock.unlock(); // run the finalizer outside the registry lock + + if (finalizer) { + finalizer(); + } + return Status::OK(); + } + Result FactoryForScheme(const std::string& scheme) { std::shared_lock lock{mutex_}; RETURN_NOT_OK(CheckValid()); @@ -749,7 +769,7 @@ class FileSystemFactoryRegistry { if (finalized_) return; for (const auto& [_, registered_or_error] : scheme_to_factory_) { - if (!registered_or_error.ok()) continue; + if 
(!registered_or_error.ok() || !registered_or_error->finalizer) continue; registered_or_error->finalizer(); } finalized_ = true; @@ -819,6 +839,10 @@ FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory f namespace internal { void* GetFileSystemRegistry() { return FileSystemFactoryRegistry::GetInstance(); } + +Status UnregisterFileSystemFactory(const std::string& scheme) { + return FileSystemFactoryRegistry::GetInstance()->Unregister(scheme); +} } // namespace internal Status LoadFileSystemFactories(const char* libpath) { @@ -896,18 +920,6 @@ Result> FileSystemFromUriReal(const Uri& uri, "without HDFS support"); #endif } - if (scheme == "s3") { -#ifdef ARROW_S3 - RETURN_NOT_OK(EnsureS3Initialized()); - ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); - ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); - return s3fs; -#else - return Status::NotImplemented( - "Got S3 URI but Arrow compiled " - "without S3 support"); -#endif - } if (scheme == "mock") { // MockFileSystem does not have an diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index d4f62f86a7482..3a47eb62f5245 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -198,7 +198,7 @@ class ARROW_EXPORT FileSystem virtual Result PathFromUri(const std::string& uri_string) const; /// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem - /// \param path The path component to use in the resulting URI + /// \param path The path component to use in the resulting URI. Must be absolute. 
/// \return A URI string, or an error if an equivalent URI cannot be produced virtual Result MakeUri(std::string path) const; diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 9fe19cbf25058..0b19cc74b14a2 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -288,7 +288,14 @@ Result LocalFileSystem::PathFromUri(const std::string& uri_string) Result LocalFileSystem::MakeUri(std::string path) const { ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path))); - return "file://" + path + (options_.use_mmap ? "?use_mmap" : ""); + if (!internal::DetectAbsolutePath(path)) { + return Status::Invalid("MakeUri requires an absolute path, got ", path); + } + ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path)); + if (uri[0] == '/') { + uri = "file://" + uri; + } + return uri + (options_.use_mmap ? "?use_mmap" : ""); } bool LocalFileSystem::Equals(const FileSystem& other) const { diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 6dd7a8c75586c..2e91783c92dcf 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "arrow/filesystem/filesystem.h" @@ -428,9 +429,15 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) { this->TestLocalUri("file:///_?use_mmap", "/_"); if (this->path_formatter_.supports_uri()) { + EXPECT_THAT(this->fs_->MakeUri(""), Raises(StatusCode::Invalid)); + EXPECT_THAT(this->fs_->MakeUri("a/b"), Raises(StatusCode::Invalid)); + ASSERT_TRUE(this->local_fs_->options().use_mmap); ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_")); EXPECT_EQ(uri, "file:///_?use_mmap"); + + ASSERT_OK_AND_ASSIGN(uri, this->fs_->MakeUri("/hello world/b/c")); + EXPECT_EQ(uri, "file:///hello%20world/b/c?use_mmap"); } #ifdef _WIN32 diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 
96c771aeb61b8..ea1f7649f6970 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -138,6 +138,7 @@ #include "arrow/util/string.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/value_parsing.h" namespace arrow::fs { @@ -168,6 +169,8 @@ static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL"; static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3"; static constexpr const char kAwsDirectoryContentType[] = "application/x-directory"; +using namespace std::string_literals; + // ----------------------------------------------------------------------- // S3ProxyOptions implementation @@ -3031,6 +3034,30 @@ Result S3FileSystem::PathFromUri(const std::string& uri_string) con internal::AuthorityHandlingBehavior::kPrepend); } +Result<std::string> S3FileSystem::MakeUri(std::string path) const { + if (path.length() <= 1 || path[0] != '/') { + return Status::Invalid("MakeUri requires an absolute, non-root path, got ", path); + } + ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path)); + if (!options().GetAccessKey().empty()) { // percent-encode: keys may contain '/', '+', '@' + uri = "s3://" + util::UriEscape(options().GetAccessKey()) + ":" + + util::UriEscape(options().GetSecretKey()) + "@" + uri.substr("file:///"s.size()); + } else { + uri = "s3" + uri.substr("file"s.size()); + } + uri += "?"; + uri += "region=" + util::UriEscape(options().region); + uri += "&"; + uri += "scheme=" + options().scheme; + uri += "&"; + uri += "endpoint_override=" + util::UriEscape(options().endpoint_override); + uri += "&"; + uri += "allow_bucket_creation="s + (options().allow_bucket_creation ? "1" : "0"); + uri += "&"; + uri += "allow_bucket_deletion="s + (options().allow_bucket_deletion ? 
"1" : "0"); + return uri; +} + S3Options S3FileSystem::options() const { return impl_->options(); } std::string S3FileSystem::region() const { return impl_->region(); } @@ -3492,32 +3519,33 @@ bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); } S3GlobalOptions S3GlobalOptions::Defaults() { auto log_level = S3LogLevel::Fatal; - - auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL"); - - if (result.ok()) { - // Extract, trim, and downcase the value of the environment variable - auto value = - arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe())); - - if (value == "fatal") { - log_level = S3LogLevel::Fatal; - } else if (value == "error") { - log_level = S3LogLevel::Error; - } else if (value == "warn") { - log_level = S3LogLevel::Warn; - } else if (value == "info") { - log_level = S3LogLevel::Info; - } else if (value == "debug") { - log_level = S3LogLevel::Debug; - } else if (value == "trace") { - log_level = S3LogLevel::Trace; - } else if (value == "off") { - log_level = S3LogLevel::Off; - } - } - - return S3GlobalOptions{log_level}; + int num_event_loop_threads = 1; + // Extract, trim, and downcase the value of the environment variable + auto value = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL") + .Map(arrow::internal::AsciiToLower) + .Map(arrow::internal::TrimString) + .ValueOr("fatal"); + if (value == "fatal") { + log_level = S3LogLevel::Fatal; + } else if (value == "error") { + log_level = S3LogLevel::Error; + } else if (value == "warn") { + log_level = S3LogLevel::Warn; + } else if (value == "info") { + log_level = S3LogLevel::Info; + } else if (value == "debug") { + log_level = S3LogLevel::Debug; + } else if (value == "trace") { + log_level = S3LogLevel::Trace; + } else if (value == "off") { + log_level = S3LogLevel::Off; + } + + value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1"); + if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) { + num_event_loop_threads = 
u; + } + return S3GlobalOptions{log_level, num_event_loop_threads}; } // ----------------------------------------------------------------------- @@ -3535,4 +3563,14 @@ Result ResolveS3BucketRegion(const std::string& bucket) { return resolver->ResolveRegion(bucket); } +auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM( + "s3", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + RETURN_NOT_OK(EnsureS3Initialized()); + ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); + return S3FileSystem::Make(options, io_context); + }, + [] { DCHECK_OK(EnsureS3Finalized()); }); + } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 85d5ff8fed553..a6f8334908f1f 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -25,20 +25,16 @@ #include "arrow/util/macros.h" #include "arrow/util/uri.h" -namespace Aws { -namespace Auth { - +namespace Aws::Auth { class AWSCredentialsProvider; class STSAssumeRoleCredentialsProvider; +} // namespace Aws::Auth -} // namespace Auth -namespace STS { +namespace Aws::STS { class STSClient; -} -} // namespace Aws +} // namespace Aws::STS -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for using a proxy for S3 struct ARROW_EXPORT S3ProxyOptions { @@ -277,6 +273,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + Result MakeUri(std::string path) const override; /// \cond FALSE using FileSystem::CreateDir; @@ -418,5 +415,4 @@ Status EnsureS3Finalized(); ARROW_EXPORT Result ResolveS3BucketRegion(const std::string& bucket); -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/s3fs_module.cc b/cpp/src/arrow/filesystem/s3fs_module.cc new file mode 100644 index 0000000000000..d91413138b454 --- 
/dev/null +++ b/cpp/src/arrow/filesystem/s3fs_module.cc @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/filesystem/filesystem_library.h" diff --git a/cpp/src/arrow/filesystem/s3fs_module_test.cc b/cpp/src/arrow/filesystem/s3fs_module_test.cc new file mode 100644 index 0000000000000..987a8979b271f --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs_module_test.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/s3_test_util.h" +#include "arrow/filesystem/s3fs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/io_util.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/range.h" +#include "arrow/util/string.h" + +namespace arrow::fs { + +auto* minio_env = ::testing::AddGlobalTestEnvironment(new MinioTestEnvironment); + +MinioTestEnvironment* GetMinioEnv() { + return ::arrow::internal::checked_cast(minio_env); +} + +class RegistrationTestEnvironment : public ::testing::Environment { + public: + void SetUp() override { + // Unregister the s3 filesystem factory so that we can be sure the module loading and + // the factories from the module are actually working + ASSERT_OK(internal::UnregisterFileSystemFactory("s3")); + ASSERT_OK(LoadFileSystemFactories(ARROW_S3_LIBPATH)); + } + void TearDown() override { EnsureFinalized(); } +}; + +auto* lib_env = ::testing::AddGlobalTestEnvironment(new RegistrationTestEnvironment); + +TEST(S3Test, FromUri) { + ASSERT_OK_AND_ASSIGN(auto minio, GetMinioEnv()->GetOneServer()); + + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("s3://" + minio->access_key() + ":" + + minio->secret_key() + + "@bucket/somedir/subdir/subfile", + &path)); + + EXPECT_EQ(fs->MakeUri("/" + path), + "s3://minio:miniopass@bucket/somedir/subdir/subfile" + "?region=us-east-1&scheme=https&endpoint_override=" + "&allow_bucket_creation=0&allow_bucket_deletion=0"); +} + +} // 
namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index be43e14e84337..5898868408428 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -26,14 +26,13 @@ #include "arrow/status.h" #include "arrow/util/io_util.h" #include "arrow/util/string.h" +#include "arrow/util/uri.h" namespace arrow { using internal::StatusDetailFromErrno; -using util::Uri; -namespace fs { -namespace internal { +namespace fs::internal { TimePoint CurrentTimePoint() { auto now = std::chrono::system_clock::now(); @@ -262,6 +261,6 @@ Result GlobFiles(const std::shared_ptr& filesystem, FileSystemGlobalOptions global_options; -} // namespace internal -} // namespace fs +} // namespace fs::internal + } // namespace arrow diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 74ddf015432d8..62e253ebc1a1d 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -24,13 +24,9 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/io/interfaces.h" #include "arrow/status.h" -#include "arrow/util/uri.h" #include "arrow/util/visibility.h" -namespace arrow { -using util::Uri; -namespace fs { -namespace internal { +namespace arrow::fs::internal { ARROW_EXPORT TimePoint CurrentTimePoint(); @@ -101,6 +97,12 @@ Result GlobFiles(const std::shared_ptr& filesystem, extern FileSystemGlobalOptions global_options; -} // namespace internal -} // namespace fs -} // namespace arrow +/// \brief Unregister filesystem factories +/// +/// For testing purposes, it can be useful to remove filesystem factories from +/// the registry. This allows a test to emulate loading an unknown filesystem +/// module even if the library has built-in support for the schemes in the module. 
+ARROW_EXPORT +Status UnregisterFileSystemFactory(const std::string& scheme); + +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 27ded52861eaf..327e4600825b0 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -370,7 +370,8 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length)); if (metadata->size() < metadata_length) { return Status::Invalid("Expected to read ", metadata_length, - " metadata bytes but got ", metadata->size()); + " metadata bytes at offset ", offset, " but got ", + metadata->size()); } ARROW_RETURN_NOT_OK(decoder.Consume(metadata)); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 8c4d925dac541..a48e91813e7e9 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -2232,7 +2232,7 @@ Result LoadDynamicLibrary(const char* path) { constexpr int kFlags = // All undefined symbols in the shared object are resolved before dlopen() returns. RTLD_NOW - // Symbols defined in this shared object are not made available to + // Symbols defined in this shared object are not made available to // resolve references in subsequently loaded shared objects. | RTLD_LOCAL; if (void* handle = dlopen(path, kFlags)) return handle; diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 0a082b0a5d859..6a8a022548479 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -108,6 +108,12 @@ that changing their value later will have an effect. `Logging - AWS SDK For C++ `__ +.. envvar:: ARROW_S3_THREADS + + The number of threads to configure when creating AWS' I/O event loop. + + Defaults to 1 as recommended by AWS' doc when the # of connections is + expected to be, at most, in the hundreds. .. envvar:: ARROW_TRACING_BACKEND