Skip to content

Commit

Permalink
feat: Register write filesink for abfs connector
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Dec 27, 2024
1 parent 9e418b1 commit 6e475e2
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 43 deletions.
52 changes: 47 additions & 5 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@

namespace facebook::velox::filesystems {

// Test-only override for the write client. It is set through
// AbfsConfig::registerFakeWriteFileClient() and, when non-null, moved out by
// AbfsConfig::getWriteFileClient() instead of building a real client.
std::unique_ptr<AzureDataLakeFileClient> AbfsConfig::fakeWriteClient_;

/// Adapter that exposes an Azure SDK DataLakeFileClient through the
/// AzureDataLakeFileClient interface, so that callers depend only on the
/// interface. Every call is forwarded to the owned SDK client.
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
 public:
  /// Takes ownership of 'client'. Declared explicit so that a
  /// std::unique_ptr<DataLakeFileClient> is never implicitly converted into a
  /// wrapper.
  explicit DataLakeFileClientWrapper(std::unique_ptr<DataLakeFileClient> client)
      : client_(std::move(client)) {}

  /// Forwards to DataLakeFileClient::Create().
  void create() override {
    client_->Create();
  }

  /// Returns the 'Value' member of the response produced by
  /// DataLakeFileClient::GetProperties().
  Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
      override {
    return client_->GetProperties().Value;
  }

  /// Wraps the 'size' bytes starting at 'buffer' in a MemoryBodyStream and
  /// passes it, together with 'offset', to DataLakeFileClient::Append().
  void append(const uint8_t* buffer, size_t size, uint64_t offset) override {
    auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size);
    client_->Append(bodyStream, offset);
  }

  /// Forwards 'position' to DataLakeFileClient::Flush().
  void flush(uint64_t position) override {
    client_->Flush(position);
  }

  /// Intentionally a no-op.
  void close() override {}

  /// Returns the URL reported by DataLakeFileClient::GetUrl().
  std::string getUrl() const override {
    return client_->GetUrl();
  }

 private:
  // Owned SDK client; the pointer itself is never reseated.
  const std::unique_ptr<DataLakeFileClient> client_;
};

AbfsConfig::AbfsConfig(
std::string_view path,
const config::ConfigBase& config) {
Expand Down Expand Up @@ -123,19 +160,24 @@ std::unique_ptr<BlobClient> AbfsConfig::getReadFileClient() {
}
}

// Builds the client used for writes.
// NOTE(review): two signatures and paired `return std::make_unique...` /
// `client = std::make_unique...` statements appear below. This looks like a
// diff hunk rendered without +/- markers; presumably the `client = ...`
// assignments plus the final wrapped return are the current version, and the
// direct `return std::make_unique<DataLakeFileClient>` lines are the replaced
// ones. Confirm against the actual file before editing.
std::unique_ptr<DataLakeFileClient> AbfsConfig::getWriteFileClient() {
std::unique_ptr<AzureDataLakeFileClient> AbfsConfig::getWriteFileClient() {
// A registered fake takes precedence. Ownership is moved out, which leaves
// fakeWriteClient_ null, so the fake is handed out once per registration.
if (fakeWriteClient_) {
return std::move(fakeWriteClient_);
}
std::unique_ptr<DataLakeFileClient> client;
// Choose how the SDK client is constructed from the auth type: SAS appends
// the token as a query string, OAuth passes the token credential, and any
// other auth type falls back to the connection string.
if (authType_ == kAzureSASAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(
fmt::format("{}?{}", url, sas_));
client =
std::make_unique<DataLakeFileClient>(fmt::format("{}?{}", url, sas_));
} else if (authType_ == kAzureOAuthAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(url, tokenCredential_);
client = std::make_unique<DataLakeFileClient>(url, tokenCredential_);
} else {
return std::make_unique<DataLakeFileClient>(
client = std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
// Wrap the SDK client so callers receive the AzureDataLakeFileClient
// interface.
return std::make_unique<DataLakeFileClientWrapper>(std::move(client));
}

std::string AbfsConfig::getUrl(bool withblobSuffix) {
Expand Down
11 changes: 10 additions & 1 deletion velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <azure/storage/files/datalake.hpp>
#include <folly/hash/Hash.h>
#include <string>
#include "velox/connectors/hive/storage_adapters/abfs/AzureDataLakeFileClient.h"

using namespace Azure::Storage::Blobs;
using namespace Azure::Storage::Files::DataLake;
Expand Down Expand Up @@ -66,7 +67,7 @@ class AbfsConfig {

std::unique_ptr<BlobClient> getReadFileClient();

std::unique_ptr<DataLakeFileClient> getWriteFileClient();
std::unique_ptr<AzureDataLakeFileClient> getWriteFileClient();

std::string filePath() const {
return filePath_;
Expand All @@ -92,6 +93,12 @@ class AbfsConfig {
return authorityHost_;
}

/// Test only.
static void registerFakeWriteFileClient(
std::unique_ptr<AzureDataLakeFileClient> fakeClient) {
fakeWriteClient_ = std::move(fakeClient);
}

private:
std::string getUrl(bool withblobSuffix);

Expand All @@ -110,6 +117,8 @@ class AbfsConfig {
std::string tenentId_;
std::string authorityHost_;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential_;

static std::unique_ptr<AzureDataLakeFileClient> fakeWriteClient_;
};

} // namespace facebook::velox::filesystems
36 changes: 2 additions & 34 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,6 @@
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

namespace facebook::velox::filesystems {
/// Adapter that exposes an Azure SDK DataLakeFileClient through the
/// AzureDataLakeFileClient interface. Every call is forwarded to the owned
/// SDK client.
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
 public:
  /// Takes ownership of 'client'. Declared explicit so that a
  /// std::unique_ptr<DataLakeFileClient> is never implicitly converted into a
  /// wrapper.
  explicit DataLakeFileClientWrapper(std::unique_ptr<DataLakeFileClient> client)
      : client_(std::move(client)) {}

  /// Forwards to DataLakeFileClient::Create().
  void create() override {
    client_->Create();
  }

  /// Returns the 'Value' member of the response produced by
  /// DataLakeFileClient::GetProperties().
  Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
      override {
    return client_->GetProperties().Value;
  }

  /// Wraps the 'size' bytes starting at 'buffer' in a MemoryBodyStream and
  /// passes it, together with 'offset', to DataLakeFileClient::Append().
  void append(const uint8_t* buffer, size_t size, uint64_t offset) override {
    auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size);
    client_->Append(bodyStream, offset);
  }

  /// Forwards 'position' to DataLakeFileClient::Flush().
  void flush(uint64_t position) override {
    client_->Flush(position);
  }

  /// Intentionally a no-op.
  void close() override {}

 private:
  // Owned SDK client; the pointer itself is never reseated.
  const std::unique_ptr<DataLakeFileClient> client_;
};

class AbfsWriteFile::Impl {
public:
Expand Down Expand Up @@ -119,10 +89,8 @@ AbfsWriteFile::AbfsWriteFile(
std::string_view path,
const config::ConfigBase& config) {
auto abfsConfig = AbfsConfig(path, config);
std::unique_ptr<AzureDataLakeFileClient> clientWrapper =
std::make_unique<DataLakeFileClientWrapper>(
abfsConfig.getWriteFileClient());
impl_ = std::make_unique<Impl>(path, clientWrapper);
auto client = abfsConfig.getWriteFileClient();
impl_ = std::make_unique<Impl>(path, client);
}

AbfsWriteFile::AbfsWriteFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <stdint.h>
#include <cstddef>
#include <string>

namespace Azure::Storage::Files::DataLake::Models {
class PathProperties;
Expand All @@ -43,5 +44,6 @@ class AzureDataLakeFileClient {
virtual void append(const uint8_t* buffer, size_t size, uint64_t offset) = 0;
virtual void flush(uint64_t position) = 0;
virtual void close() = 0;
virtual std::string getUrl() const = 0;
};
} // namespace facebook::velox::filesystems
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" // @manual
#include "velox/dwio/common/FileSink.h"
#endif

namespace facebook::velox::filesystems {
Expand All @@ -34,12 +35,28 @@ std::shared_ptr<FileSystem> abfsFileSystemGenerator(
});
return filesystem;
}

/// FileSink factory for ABFS paths. Returns nullptr when 'fileURI' is not an
/// ABFS path. Otherwise opens 'fileURI' for write through the file system
/// resolved from 'options.connectorProperties' and wraps the resulting file in
/// a WriteFileSink that carries the metric logger and stats from 'options'.
std::unique_ptr<velox::dwio::common::FileSink> abfsWriteFileSinkGenerator(
    const std::string& fileURI,
    const velox::dwio::common::FileSink::Options& options) {
  if (!isAbfsFile(fileURI)) {
    return nullptr;
  }

  const auto abfs =
      filesystems::getFileSystem(fileURI, options.connectorProperties);
  auto writeFile = abfs->openFileForWrite(fileURI);
  return std::make_unique<dwio::common::WriteFileSink>(
      std::move(writeFile), fileURI, options.metricLogger, options.stats);
}
#endif

/// Registers the ABFS file system generator and the ABFS write FileSink
/// factory. The body is compiled only when VELOX_ENABLE_ABFS is defined;
/// otherwise the function does nothing.
void registerAbfsFileSystem() {
#ifdef VELOX_ENABLE_ABFS
  LOG(INFO) << "Register ABFS";

  auto fileSystemGenerator = std::function(abfsFileSystemGenerator);
  registerFileSystem(isAbfsFile, std::move(fileSystemGenerator));

  auto fileSinkFactory = std::function(abfsWriteFileSinkGenerator);
  dwio::common::FileSink::registerFactory(std::move(fileSinkFactory));
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST(AbfsConfigTest, clientSecretOAuth) {
// GetUrl retrieves the value from the internal blob client, which represents
// the blob's path as well.
EXPECT_EQ(
writeClient->GetUrl(),
writeClient->getUrl(),
"https://efg.blob.core.windows.net/abc/file/test.txt");
}

Expand All @@ -108,7 +108,7 @@ TEST(AbfsConfigTest, sasToken) {
// GetUrl retrieves the value from the internal blob client, which represents
// the blob's path as well.
EXPECT_EQ(
writeClient->GetUrl(),
writeClient->getUrl(),
"http://bar.blob.core.windows.net/abc/file?sas=test");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
#include <random>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h"
#include "velox/connectors/hive/storage_adapters/abfs/tests/MockDataLakeFileClient.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/exec/tests/utils/PortUtil.h"
#include "velox/exec/tests/utils/TempFilePath.h"

Expand Down Expand Up @@ -264,3 +267,23 @@ TEST_F(AbfsFileSystemTest, credNotFOund) {
abfs_->openFileForRead(abfsFile),
"Config fs.azure.account.key.test1.dfs.core.windows.net not found");
}

// Verifies that FileSink::create() resolves abfs:// and abfss:// paths to a
// WriteFileSink whose underlying file is an AbfsWriteFile. A mock write client
// is registered before each create() call because registration hands its
// client over by ownership.
TEST_F(AbfsFileSystemTest, registerAbfsFileSink) {
  // NOTE(review): "[email protected]" looks like an e-mail obfuscation artifact
  // of the page this was captured from. The intended form is presumably
  // "<container>@<account>.dfs.core.windows.net", with the account matching the
  // config key below — confirm against the repository before running.
  static const std::vector<std::string> paths = {
      "abfs://[email protected]/test",
      "abfss://[email protected]/test"};
  std::unordered_map<std::string, std::string> config(
      {{"fs.azure.account.key.test.dfs.core.windows.net", "NDU2"}});
  auto hiveConfig =
      std::make_shared<const config::ConfigBase>(std::move(config));

  for (const auto& path : paths) {
    AbfsConfig::registerFakeWriteFileClient(
        std::make_unique<MockDataLakeFileClient>());
    auto sink = dwio::common::FileSink::create(
        path, {.connectorProperties = hiveConfig});
    // Fail the test, rather than crash, when no factory produced a sink.
    ASSERT_TRUE(sink != nullptr);

    auto* writeFileSink =
        dynamic_cast<dwio::common::WriteFileSink*>(sink.get());
    // The cast result is dereferenced below, so it must be checked first.
    ASSERT_TRUE(writeFileSink != nullptr);

    // Bind the result so the pointer inspected below refers to a live object
    // even if toWriteFile() returns an owning temporary.
    auto&& underlyingFile = writeFileSink->toWriteFile();
    auto* writeFile = dynamic_cast<AbfsWriteFile*>(underlyingFile.get());
    ASSERT_TRUE(writeFile != nullptr);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class MockDataLakeFileClient : public AzureDataLakeFileClient {

void close() override;

// Returns the fixed placeholder string "fakeUrl".
std::string getUrl() const override {
  return std::string("fakeUrl");
}

// For testing purposes, to verify that the written content is correct.
std::string readContent() {
std::ifstream inputFile(filePath_);
Expand Down

0 comments on commit 6e475e2

Please sign in to comment.