From 0d87d6968b9e5e2cfe707065c60e10caf732dec7 Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Fri, 22 Sep 2023 11:14:06 -0700
Subject: [PATCH] Improve S3InsertTest tableWrite Plan (#6655)

Summary:
Simplify the test using the recent `tableWrite(const std::string& outputDirectoryPath)` API in PlanBuilder.
Add fileFormat as a parameter to `tableWrite(const std::string& outputDirectoryPath)`.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/6655

Reviewed By: laithsakka

Differential Revision: D49537715

Pulled By: mbasmanova

fbshipit-source-id: beb0a55104d98b44296f411c342afdf82e3450d1
---
 .../s3fs/tests/S3InsertTest.cpp        | 123 ++++++------
 velox/exec/tests/utils/PlanBuilder.cpp |   6 +-
 velox/exec/tests/utils/PlanBuilder.h   |  10 +-
 3 files changed, 48 insertions(+), 91 deletions(-)

diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
index 837a13fad7f4..57dfcded14c0 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
@@ -72,36 +72,6 @@
 std::shared_ptr<MinioServer> S3InsertTest::minioServer_ = nullptr;
 std::unique_ptr<folly::IOThreadPoolExecutor> S3InsertTest::ioExecutor_ = nullptr;
 
-PlanNodePtr createInsertPlan(
-    PlanBuilder& inputPlan,
-    const RowTypePtr& outputRowType,
-    const std::string_view& outputDirectoryPath,
-    const std::vector<std::string>& partitionedBy = {},
-    const std::shared_ptr<connector::hive::HiveBucketProperty>& bucketProperty = {},
-    const connector::hive::LocationHandle::TableType& outputTableType =
-        connector::hive::LocationHandle::TableType::kNew,
-    const CommitStrategy& outputCommitStrategy = CommitStrategy::kNoCommit) {
-  auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
-      kHiveConnectorId,
-      HiveConnectorTestBase::makeHiveInsertTableHandle(
-          outputRowType->names(),
-          outputRowType->children(),
-          partitionedBy,
-          bucketProperty,
-          HiveConnectorTestBase::makeLocationHandle(
-              outputDirectoryPath.data(), std::nullopt, outputTableType),
-          FileFormat::PARQUET));
-
-  auto insertPlan = inputPlan.tableWrite(
-      inputPlan.planNode()->outputType(),
-      outputRowType->names(),
-      nullptr,
-      insertTableHandle,
-      bucketProperty != nullptr,
-      outputCommitStrategy);
-  return insertPlan.planNode();
-}
-
 TEST_F(S3InsertTest, s3InsertTest) {
   const int64_t kExpectedRows = 1'000;
   const std::string_view kOutputDirectory{"s3://writedata/"};
@@ -118,63 +88,46 @@
   minioServer_->addBucket("writedata");
 
   // Insert into s3 with one writer.
-  auto plan = createInsertPlan(
-      PlanBuilder().values({input}), rowType, kOutputDirectory);
+  auto plan =
+      PlanBuilder()
+          .values({input})
+          .tableWrite(
+              kOutputDirectory.data(), dwio::common::FileFormat::PARQUET)
+          .planNode();
 
   // Execute the write plan.
-  auto result = AssertQueryBuilder(plan).copyResults(pool());
-
-  // Get the fragment from the TableWriter output.
-  auto fragmentVector = result->childAt(TableWriteTraits::kFragmentChannel)
-                            ->asFlatVector<StringView>();
-
-  ASSERT(fragmentVector);
-
-  // The fragment contains data provided by the DataSink#finish.
-  // This includes the target filename, rowCount, etc...
-  // Extract the filename, row counts, filesize.
-  std::vector<std::string> writeFiles;
-  int64_t numRows{0};
-  int64_t writeFileSize{0};
-  for (int i = 0; i < result->size(); ++i) {
-    if (!fragmentVector->isNullAt(i)) {
-      folly::dynamic obj = folly::parseJson(fragmentVector->valueAt(i));
-      ASSERT_EQ(obj["targetPath"], kOutputDirectory);
-      ASSERT_EQ(obj["writePath"], kOutputDirectory);
-      numRows += obj["rowCount"].asInt();
-
-      folly::dynamic writerInfoObj = obj["fileWriteInfos"][0];
-      const std::string writeFileName =
-          writerInfoObj["writeFileName"].asString();
-      const std::string writeFileFullPath =
-          obj["writePath"].asString() + "/" + writeFileName;
-      writeFiles.push_back(writeFileFullPath);
-      writeFileSize += writerInfoObj["fileSize"].asInt();
-    }
-  }
-
-  ASSERT_EQ(numRows, kExpectedRows);
-  ASSERT_EQ(writeFiles.size(), 1);
-
-  // Verify that the data is written to S3 correctly by scanning the file.
-  auto tableScan = PlanBuilder(pool_.get()).tableScan(rowType).planNode();
-  CursorParameters params;
-  params.planNode = tableScan;
-  const int numSplitsPerFile = 1;
-  bool noMoreSplits = false;
-  auto addSplits = [&](exec::Task* task) {
-    if (!noMoreSplits) {
-      auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
-          writeFiles[0], numSplitsPerFile, dwio::common::FileFormat::PARQUET);
-      for (const auto& split : splits) {
-        task->addSplit("0", exec::Split(split));
-      }
-      task->noMoreSplits("0");
-    }
-    noMoreSplits = true;
-  };
-  auto scanResult = readCursor(params, addSplits);
-  assertEqualResults(scanResult.second, {input});
+  auto results = AssertQueryBuilder(plan).copyResults(pool());
+
+  // First column has number of rows written in the first row and nulls in other
+  // rows.
+  auto rowCount = results->childAt(TableWriteTraits::kRowCountChannel)
+                      ->as<FlatVector<int64_t>>();
+  ASSERT_FALSE(rowCount->isNullAt(0));
+  ASSERT_EQ(kExpectedRows, rowCount->valueAt(0));
+  ASSERT_TRUE(rowCount->isNullAt(1));
+
+  // Second column contains details about written files.
+  auto details = results->childAt(TableWriteTraits::kFragmentChannel)
+                     ->as<FlatVector<StringView>>();
+  ASSERT_TRUE(details->isNullAt(0));
+  ASSERT_FALSE(details->isNullAt(1));
+  folly::dynamic obj = folly::parseJson(details->valueAt(1));
+
+  ASSERT_EQ(kExpectedRows, obj["rowCount"].asInt());
+  auto fileWriteInfos = obj["fileWriteInfos"];
+  ASSERT_EQ(1, fileWriteInfos.size());
+
+  auto writeFileName = fileWriteInfos[0]["writeFileName"].asString();
+
+  // Read from 'writeFileName' and verify the data matches the original.
+  plan = PlanBuilder().tableScan(rowType).planNode();
+
+  auto splits = HiveConnectorTestBase::makeHiveConnectorSplits(
+      fmt::format("{}/{}", kOutputDirectory, writeFileName),
+      1,
+      dwio::common::FileFormat::PARQUET);
+  auto copy = AssertQueryBuilder(plan).split(splits[0]).copyResults(pool());
+  assertEqualResults({input}, {copy});
 }
 
 int main(int argc, char** argv) {
diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp
index ae76d502e674..4f9dec51db13 100644
--- a/velox/exec/tests/utils/PlanBuilder.cpp
+++ b/velox/exec/tests/utils/PlanBuilder.cpp
@@ -301,7 +301,9 @@ PlanBuilder& PlanBuilder::filter(const std::string& filter) {
   return *this;
 }
 
-PlanBuilder& PlanBuilder::tableWrite(const std::string& outputDirectoryPath) {
+PlanBuilder& PlanBuilder::tableWrite(
+    const std::string& outputDirectoryPath,
+    dwio::common::FileFormat fileFormat) {
   auto rowType = planNode_->outputType();
 
   std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
@@ -321,7 +323,7 @@ PlanBuilder& PlanBuilder::tableWrite(const std::string& outputDirectoryPath) {
   auto hiveHandle = std::make_shared<connector::hive::HiveInsertTableHandle>(
       columnHandles,
       locationHandle,
-      dwio::common::FileFormat::DWRF,
+      fileFormat,
       nullptr, // bucketProperty,
       common::CompressionKind_NONE);
 
diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h
index e887947b0e93..de56e696e218 100644
--- a/velox/exec/tests/utils/PlanBuilder.h
+++ b/velox/exec/tests/utils/PlanBuilder.h
@@ -248,12 +248,14 @@ class PlanBuilder {
   /// function will skip creating a FilterNode in that case.
   PlanBuilder& optionalFilter(const std::string& optionalFilter);
 
-  /// Adds a TableWriteNode to write all input columns into an unpartitioned
-  /// unbucketed Hive table without collecting statistics using DWRF file format
-  /// without compression.
+  /// Adds a TableWriteNode to write all input columns into an un-partitioned
+  /// un-bucketed Hive table without collecting statistics and without compression.
   ///
   /// @param outputDirectoryPath Path to a directory to write data to.
-  PlanBuilder& tableWrite(const std::string& outputDirectoryPath);
+  /// @param fileFormat File format to use for the written data.
+  PlanBuilder& tableWrite(
+      const std::string& outputDirectoryPath,
+      dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF);
 
   /// Adds a TableWriteNode.
   ///
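
Usage note (editorial sketch, not part of the patch): because the new fileFormat parameter defaults to DWRF, existing tableWrite(outputDirectoryPath) callers are unchanged; tests such as S3InsertTest opt into Parquet explicitly. A minimal sketch of the simplified API follows. The fixture name TableWriteSmokeTest is hypothetical; the helpers used (PlanBuilder, AssertQueryBuilder, HiveConnectorTestBase, TempDirectoryPath) are the test utilities this patch touches.

// Editorial sketch, not part of the patch: exercises the new tableWrite()
// default-format path. The fixture name is hypothetical.
#include <gtest/gtest.h>

#include "velox/exec/TableWriter.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

// Hypothetical fixture; HiveConnectorTestBase registers the Hive connector,
// the local file system, and the DWRF reader/writer factories.
class TableWriteSmokeTest : public HiveConnectorTestBase {};

TEST_F(TableWriteSmokeTest, defaultFormat) {
  // 100 rows of a single BIGINT column.
  auto input = makeRowVector(
      {makeFlatVector<int64_t>(100, [](auto row) { return row; })});
  auto outputDirectory = TempDirectoryPath::create();

  // No file format argument, so the DWRF default added by this patch
  // applies; pass dwio::common::FileFormat::PARQUET to override, as
  // S3InsertTest does above.
  auto plan = PlanBuilder()
                  .values({input})
                  .tableWrite(outputDirectory->path)
                  .planNode();

  // Row 0 of the TableWriter's first output column carries the number of
  // rows written.
  auto results = AssertQueryBuilder(plan).copyResults(pool());
  auto rowCount = results->childAt(exec::TableWriteTraits::kRowCountChannel)
                      ->as<FlatVector<int64_t>>();
  ASSERT_EQ(100, rowCount->valueAt(0));
}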