From cde3d567e14a6a004fd1b25769822170bf80053e Mon Sep 17 00:00:00 2001
From: duanmeng
Date: Mon, 7 Oct 2024 17:26:57 +0800
Subject: [PATCH] Add HiveConnectorSplit Serde

Serialize HiveConnectorSplit to and from folly::dynamic. Optional fields,
including a null extraFileInfo pointer, are written as null and restored as
unset values.
---
 velox/connectors/Connector.h                  |   7 +-
 velox/connectors/hive/CMakeLists.txt          |   1 +
 velox/connectors/hive/HiveConnectorSplit.cpp  | 179 ++++++++++++++++++
 velox/connectors/hive/HiveConnectorSplit.h    |  23 +--
 .../hive/tests/HiveConnectorSerDeTest.cpp     |  99 ++++++++++
 5 files changed, 293 insertions(+), 16 deletions(-)
 create mode 100644 velox/connectors/hive/HiveConnectorSplit.cpp

diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h
index 6aa1b55c9e2c..acbde20913d2 100644
--- a/velox/connectors/Connector.h
+++ b/velox/connectors/Connector.h
@@ -48,7 +48,7 @@ class DataSource;
 
 /// A split represents a chunk of data that a connector should load and return
 /// as a RowVectorPtr, potentially after processing pushdowns.
-struct ConnectorSplit {
+struct ConnectorSplit : public ISerializable {
   const std::string connectorId;
   const int64_t splitWeight{0};
 
@@ -59,6 +59,11 @@ struct ConnectorSplit {
       int64_t _splitWeight = 0)
       : connectorId(_connectorId), splitWeight(_splitWeight) {}
 
+  folly::dynamic serialize() const override {
+    VELOX_UNSUPPORTED();
+    return nullptr;
+  }
+
   virtual ~ConnectorSplit() {}
 
   virtual std::string toString() const {
diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt
index 8beee704f79e..2e137dbd6c29 100644
--- a/velox/connectors/hive/CMakeLists.txt
+++ b/velox/connectors/hive/CMakeLists.txt
@@ -24,6 +24,7 @@ velox_add_library(
   HiveConfig.cpp
   HiveConnector.cpp
   HiveConnectorUtil.cpp
+  HiveConnectorSplit.cpp
   HiveDataSink.cpp
   HiveDataSource.cpp
   HivePartitionUtil.cpp
diff --git a/velox/connectors/hive/HiveConnectorSplit.cpp b/velox/connectors/hive/HiveConnectorSplit.cpp
new file mode 100644
index 000000000000..ed69eb7d6303
--- /dev/null
+++ b/velox/connectors/hive/HiveConnectorSplit.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+
+namespace facebook::velox::connector::hive {
+
+std::string HiveConnectorSplit::toString() const {
+  if (tableBucketNumber.has_value()) {
+    return fmt::format(
+        "Hive: {} {} - {} {}",
+        filePath,
+        start,
+        length,
+        tableBucketNumber.value());
+  }
+  return fmt::format("Hive: {} {} - {}", filePath, start, length);
+}
+
+std::string HiveConnectorSplit::getFileName() const {
+  const auto i = filePath.rfind('/');
+  return i == std::string::npos ? filePath : filePath.substr(i + 1);
+}
+
+// Serializes all fields into a folly::dynamic object tagged with the name
+// "HiveConnectorSplit". Unset optional values are written as null.
+folly::dynamic HiveConnectorSplit::serialize() const {
+  folly::dynamic obj = folly::dynamic::object;
+  obj["name"] = "HiveConnectorSplit";
+  obj["connectorId"] = connectorId;
+  obj["splitWeight"] = splitWeight;
+  obj["filePath"] = filePath;
+  obj["fileFormat"] = dwio::common::toString(fileFormat);
+  obj["start"] = start;
+  obj["length"] = length;
+
+  folly::dynamic partitionKeysObj = folly::dynamic::object;
+  for (const auto& [key, value] : partitionKeys) {
+    partitionKeysObj[key] =
+        value.has_value() ? folly::dynamic(value.value()) : nullptr;
+  }
+  obj["partitionKeys"] = partitionKeysObj;
+
+  obj["tableBucketNumber"] = tableBucketNumber.has_value()
+      ? folly::dynamic(tableBucketNumber.value())
+      : nullptr;
+
+  folly::dynamic customSplitInfoObj = folly::dynamic::object;
+  for (const auto& [key, value] : customSplitInfo) {
+    customSplitInfoObj[key] = value;
+  }
+  obj["customSplitInfo"] = customSplitInfoObj;
+
+  // extraFileInfo is a pointer that may be null; write null instead of
+  // dereferencing it in that case.
+  obj["extraFileInfo"] =
+      extraFileInfo != nullptr ? folly::dynamic(*extraFileInfo) : nullptr;
+
+  folly::dynamic serdeParametersObj = folly::dynamic::object;
+  for (const auto& [key, value] : serdeParameters) {
+    serdeParametersObj[key] = value;
+  }
+  obj["serdeParameters"] = serdeParametersObj;
+
+  folly::dynamic infoColumnsObj = folly::dynamic::object;
+  for (const auto& [key, value] : infoColumns) {
+    infoColumnsObj[key] = value;
+  }
+  obj["infoColumns"] = infoColumnsObj;
+
+  if (properties.has_value()) {
+    folly::dynamic propertiesObj = folly::dynamic::object;
+    propertiesObj["fileSize"] = properties->fileSize.has_value()
+        ? folly::dynamic(properties->fileSize.value())
+        : nullptr;
+    propertiesObj["modificationTime"] = properties->modificationTime.has_value()
+        ? folly::dynamic(properties->modificationTime.value())
+        : nullptr;
+    obj["properties"] = propertiesObj;
+  } else {
+    obj["properties"] = nullptr;
+  }
+
+  return obj;
+}
+
+// Rebuilds a split from the object produced by serialize(). Null entries map
+// back to std::nullopt, or to a null pointer for "extraFileInfo".
+// static
+std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
+    const folly::dynamic& obj) {
+  const auto connectorId = obj["connectorId"].asString();
+  const auto splitWeight = obj["splitWeight"].asInt();
+  const auto filePath = obj["filePath"].asString();
+  const auto fileFormat =
+      dwio::common::toFileFormat(obj["fileFormat"].asString());
+  const auto start = static_cast<uint64_t>(obj["start"].asInt());
+  const auto length = static_cast<uint64_t>(obj["length"].asInt());
+
+  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
+  for (const auto& [key, value] : obj["partitionKeys"].items()) {
+    partitionKeys[key.asString()] = value.isNull()
+        ? std::nullopt
+        : std::optional<std::string>(value.asString());
+  }
+
+  const auto tableBucketNumber = obj["tableBucketNumber"].isNull()
+      ? std::nullopt
+      : std::optional<int32_t>(obj["tableBucketNumber"].asInt());
+
+  std::unordered_map<std::string, std::string> customSplitInfo;
+  for (const auto& [key, value] : obj["customSplitInfo"].items()) {
+    customSplitInfo[key.asString()] = value.asString();
+  }
+
+  // A null "extraFileInfo" is restored as a null pointer.
+  const auto& extraFileInfoObj = obj["extraFileInfo"];
+  std::shared_ptr<std::string> extraFileInfo = extraFileInfoObj.isNull()
+      ? nullptr
+      : std::make_shared<std::string>(extraFileInfoObj.asString());
+
+  std::unordered_map<std::string, std::string> serdeParameters;
+  for (const auto& [key, value] : obj["serdeParameters"].items()) {
+    serdeParameters[key.asString()] = value.asString();
+  }
+
+  std::unordered_map<std::string, std::string> infoColumns;
+  for (const auto& [key, value] : obj["infoColumns"].items()) {
+    infoColumns[key.asString()] = value.asString();
+  }
+
+  std::optional<FileProperties> properties = std::nullopt;
+  const auto& propertiesObj = obj["properties"];
+  if (!propertiesObj.isNull()) {
+    properties = FileProperties{
+        propertiesObj["fileSize"].isNull()
+            ? std::nullopt
+            : std::optional<int64_t>(propertiesObj["fileSize"].asInt()),
+        propertiesObj["modificationTime"].isNull()
+            ? std::nullopt
+            : std::optional<int64_t>(propertiesObj["modificationTime"].asInt())};
+  }
+
+  return std::make_shared<HiveConnectorSplit>(
+      connectorId,
+      filePath,
+      fileFormat,
+      start,
+      length,
+      partitionKeys,
+      tableBucketNumber,
+      customSplitInfo,
+      extraFileInfo,
+      serdeParameters,
+      splitWeight,
+      infoColumns,
+      properties);
+}
+
+// Registers create() under the "HiveConnectorSplit" name that serialize() emits.
+// static
+void HiveConnectorSplit::registerSerDe() {
+  auto& registry = DeserializationRegistryForSharedPtr();
+  registry.Register("HiveConnectorSplit", HiveConnectorSplit::create);
+}
+} // namespace facebook::velox::connector::hive
diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h
index 9c011ef3aea8..c86c9ef9c2da 100644
--- a/velox/connectors/hive/HiveConnectorSplit.h
+++ b/velox/connectors/hive/HiveConnectorSplit.h
@@ -90,22 +90,15 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
         infoColumns(_infoColumns),
         properties(_properties) {}
 
-  std::string toString() const override {
-    if (tableBucketNumber.has_value()) {
-      return fmt::format(
-          "Hive: {} {} - {} {}",
-          filePath,
-          start,
-          length,
-          tableBucketNumber.value());
-    }
-    return fmt::format("Hive: {} {} - {}", filePath, start, length);
-  }
+  std::string toString() const override;
 
-  std::string getFileName() const {
-    auto i = filePath.rfind('/');
-    return i == std::string::npos ? filePath : filePath.substr(i + 1);
-  }
+  std::string getFileName() const;
+
+  folly::dynamic serialize() const override;
+
+  static std::shared_ptr<HiveConnectorSplit> create(const folly::dynamic& obj);
+
+  static void registerSerDe();
 };
 
 } // namespace facebook::velox::connector::hive
diff --git a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
index c21fa633c117..f44bf9348fe4 100644
--- a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
+++ b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
@@ -37,6 +37,7 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
     HiveInsertTableHandle::registerSerDe();
     HiveBucketProperty::registerSerDe();
     HiveSortingColumn::registerSerDe();
+    HiveConnectorSplit::registerSerDe();
   }
 
   template <typename T>
@@ -63,6 +64,50 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
       ASSERT_TRUE(filter->testingEquals(*cloneFilters.at(subfield)));
     }
   }
+
+  static void testSerde(const HiveConnectorSplit& split) {
+    const auto str = split.toString();
+    const auto obj = split.serialize();
+    const auto clone = ISerializable::deserialize<HiveConnectorSplit>(obj);
+    ASSERT_EQ(clone->toString(), str);
+    ASSERT_EQ(split.partitionKeys.size(), clone->partitionKeys.size());
+    for (const auto& [key, value] : split.partitionKeys) {
+      ASSERT_EQ(value, clone->partitionKeys.at(key));
+    }
+
+    ASSERT_EQ(split.tableBucketNumber, clone->tableBucketNumber);
+    ASSERT_EQ(split.customSplitInfo.size(), clone->customSplitInfo.size());
+    for (const auto& [key, value] : split.customSplitInfo) {
+      ASSERT_EQ(value, clone->customSplitInfo.at(key));
+    }
+
+    if (split.extraFileInfo != nullptr) {
+      ASSERT_TRUE(clone->extraFileInfo != nullptr);
+      ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
+    } else {
+      ASSERT_TRUE(clone->extraFileInfo == nullptr);
+    }
+
+    ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
+    for (const auto& [key, value] : split.serdeParameters) {
+      ASSERT_EQ(value, clone->serdeParameters.at(key));
+    }
+
+    ASSERT_EQ(split.infoColumns.size(), clone->infoColumns.size());
+    for (const auto& [key, value] : split.infoColumns) {
+      ASSERT_EQ(value, clone->infoColumns.at(key));
+    }
+
+    if (split.properties.has_value()) {
+      ASSERT_TRUE(clone->properties.has_value());
+      ASSERT_EQ(split.properties->fileSize, clone->properties->fileSize);
+      ASSERT_EQ(
+          split.properties->modificationTime,
+          clone->properties->modificationTime);
+    } else {
+      ASSERT_FALSE(clone->properties.has_value());
+    }
+  }
 };
 
 TEST_F(HiveConnectorSerDeTest, hiveTableHandle) {
@@ -157,5 +202,59 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
   testSerde(*hiveInsertTableHandle);
 }
 
+TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
+  const auto connectorId = "testSerde";
+  constexpr auto splitWeight = 1;
+  constexpr auto filePath = "/testSerde/p";
+  constexpr auto fileFormat = dwio::common::FileFormat::DWRF;
+  constexpr auto start = 0;
+  constexpr auto length = 1024;
+  const std::unordered_map<std::string, std::optional<std::string>>
+      partitionKeys{{"p0", "0"}, {"p1", "1"}};
+  constexpr auto tableBucketNumber = std::optional<int>(4);
+  const std::unordered_map<std::string, std::string> customSplitInfo{
+      {"s0", "0"}, {"s1", "1"}};
+  const auto extraFileInfo = std::make_shared<std::string>("testSerdeFileInfo");
+  const std::unordered_map<std::string, std::string> serdeParameters{
+      {"k1", "1"}, {"k2", "v2"}};
+  const std::unordered_map<std::string, std::string> infoColumns{
+      {"c0", "0"}, {"c1", "1"}};
+  FileProperties fileProperties{
+      .fileSize = 2048, .modificationTime = std::nullopt};
+  const auto properties = std::optional<FileProperties>(fileProperties);
+  const auto split = HiveConnectorSplit(
+      connectorId,
+      filePath,
+      fileFormat,
+      start,
+      length,
+      partitionKeys,
+      tableBucketNumber,
+      customSplitInfo,
+      extraFileInfo,
+      serdeParameters,
+      splitWeight,
+      infoColumns,
+      properties);
+  testSerde(split);
+
+  // Null extraFileInfo and unset properties must survive the round trip too.
+  const auto splitWithoutOptionals = HiveConnectorSplit(
+      connectorId,
+      filePath,
+      fileFormat,
+      start,
+      length,
+      partitionKeys,
+      tableBucketNumber,
+      customSplitInfo,
+      nullptr,
+      serdeParameters,
+      splitWeight,
+      infoColumns,
+      std::nullopt);
+  testSerde(splitWithoutOptionals);
+}
+
 } // namespace
 } // namespace facebook::velox::connector::hive::test