Improve S3InsertTest tableWrite Plan (facebookincubator#6655)
Summary:
Simplify the test using the recent `tableWrite(const std::string& outputDirectoryPath)` API in PlanBuilder.
Add `fileFormat` as a parameter to `tableWrite(const std::string& outputDirectoryPath)` so the test can write Parquet files; the parameter defaults to DWRF, preserving the previous behavior.
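
For reference, the updated test's write plan is now a single chained PlanBuilder call. A minimal sketch, mirroring the test below (assumes a RowVectorPtr `input` in scope and uses the test's S3 output path; omitting the format argument falls back to DWRF):

    // Build a Parquet table-write plan against the S3 output directory and
    // execute it, collecting the TableWriter output.
    auto plan = PlanBuilder()
                    .values({input})
                    .tableWrite(
                        "s3://writedata/", dwio::common::FileFormat::PARQUET)
                    .planNode();
    auto results = AssertQueryBuilder(plan).copyResults(pool());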

Pull Request resolved: facebookincubator#6655

Reviewed By: laithsakka

Differential Revision: D49537715

Pulled By: mbasmanova

fbshipit-source-id: beb0a55104d98b44296f411c342afdf82e3450d1
majetideepak authored and facebook-github-bot committed Sep 22, 2023
1 parent b7a4d3a commit 0d87d69
Showing 3 changed files with 48 additions and 91 deletions.
123 changes: 38 additions & 85 deletions velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp
@@ -72,36 +72,6 @@ std::shared_ptr<MinioServer> S3InsertTest::minioServer_ = nullptr;
 std::unique_ptr<folly::IOThreadPoolExecutor> S3InsertTest::ioExecutor_ =
     nullptr;
 
-PlanNodePtr createInsertPlan(
-    PlanBuilder& inputPlan,
-    const RowTypePtr& outputRowType,
-    const std::string_view& outputDirectoryPath,
-    const std::vector<std::string>& partitionedBy = {},
-    const std::shared_ptr<HiveBucketProperty>& bucketProperty = {},
-    const connector::hive::LocationHandle::TableType& outputTableType =
-        connector::hive::LocationHandle::TableType::kNew,
-    const CommitStrategy& outputCommitStrategy = CommitStrategy::kNoCommit) {
-  auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
-      kHiveConnectorId,
-      HiveConnectorTestBase::makeHiveInsertTableHandle(
-          outputRowType->names(),
-          outputRowType->children(),
-          partitionedBy,
-          bucketProperty,
-          HiveConnectorTestBase::makeLocationHandle(
-              outputDirectoryPath.data(), std::nullopt, outputTableType),
-          FileFormat::PARQUET));
-
-  auto insertPlan = inputPlan.tableWrite(
-      inputPlan.planNode()->outputType(),
-      outputRowType->names(),
-      nullptr,
-      insertTableHandle,
-      bucketProperty != nullptr,
-      outputCommitStrategy);
-  return insertPlan.planNode();
-}
-
 TEST_F(S3InsertTest, s3InsertTest) {
   const int64_t kExpectedRows = 1'000;
   const std::string_view kOutputDirectory{"s3://writedata/"};
@@ -118,63 +88,46 @@ TEST_F(S3InsertTest, s3InsertTest) {
   minioServer_->addBucket("writedata");
 
   // Insert into s3 with one writer.
-  auto plan = createInsertPlan(
-      PlanBuilder().values({input}), rowType, kOutputDirectory);
+  auto plan =
+      PlanBuilder()
+          .values({input})
+          .tableWrite(
+              kOutputDirectory.data(), dwio::common::FileFormat::PARQUET)
+          .planNode();
 
   // Execute the write plan.
-  auto result = AssertQueryBuilder(plan).copyResults(pool());
-
-  // Get the fragment from the TableWriter output.
-  auto fragmentVector = result->childAt(TableWriteTraits::kFragmentChannel)
-                            ->asFlatVector<StringView>();
-
-  ASSERT(fragmentVector);
-
-  // The fragment contains data provided by the DataSink#finish.
-  // This includes the target filename, rowCount, etc...
-  // Extract the filename, row counts, filesize.
-  std::vector<std::string> writeFiles;
-  int64_t numRows{0};
-  int64_t writeFileSize{0};
-  for (int i = 0; i < result->size(); ++i) {
-    if (!fragmentVector->isNullAt(i)) {
-      folly::dynamic obj = folly::parseJson(fragmentVector->valueAt(i));
-      ASSERT_EQ(obj["targetPath"], kOutputDirectory);
-      ASSERT_EQ(obj["writePath"], kOutputDirectory);
-      numRows += obj["rowCount"].asInt();
-
-      folly::dynamic writerInfoObj = obj["fileWriteInfos"][0];
-      const std::string writeFileName =
-          writerInfoObj["writeFileName"].asString();
-      const std::string writeFileFullPath =
-          obj["writePath"].asString() + "/" + writeFileName;
-      writeFiles.push_back(writeFileFullPath);
-      writeFileSize += writerInfoObj["fileSize"].asInt();
-    }
-  }
-
-  ASSERT_EQ(numRows, kExpectedRows);
-  ASSERT_EQ(writeFiles.size(), 1);
-
-  // Verify that the data is written to S3 correctly by scanning the file.
-  auto tableScan = PlanBuilder(pool_.get()).tableScan(rowType).planNode();
-  CursorParameters params;
-  params.planNode = tableScan;
-  const int numSplitsPerFile = 1;
-  bool noMoreSplits = false;
-  auto addSplits = [&](exec::Task* task) {
-    if (!noMoreSplits) {
-      auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
-          writeFiles[0], numSplitsPerFile, dwio::common::FileFormat::PARQUET);
-      for (const auto& split : splits) {
-        task->addSplit("0", exec::Split(split));
-      }
-      task->noMoreSplits("0");
-    }
-    noMoreSplits = true;
-  };
-  auto scanResult = readCursor(params, addSplits);
-  assertEqualResults(scanResult.second, {input});
+  auto results = AssertQueryBuilder(plan).copyResults(pool());
+
+  // First column has number of rows written in the first row and nulls in
+  // other rows.
+  auto rowCount = results->childAt(TableWriteTraits::kRowCountChannel)
+                      ->as<FlatVector<int64_t>>();
+  ASSERT_FALSE(rowCount->isNullAt(0));
+  ASSERT_EQ(kExpectedRows, rowCount->valueAt(0));
+  ASSERT_TRUE(rowCount->isNullAt(1));
+
+  // Second column contains details about written files.
+  auto details = results->childAt(TableWriteTraits::kFragmentChannel)
+                     ->as<FlatVector<StringView>>();
+  ASSERT_TRUE(details->isNullAt(0));
+  ASSERT_FALSE(details->isNullAt(1));
+  folly::dynamic obj = folly::parseJson(details->valueAt(1));
+
+  ASSERT_EQ(kExpectedRows, obj["rowCount"].asInt());
+  auto fileWriteInfos = obj["fileWriteInfos"];
+  ASSERT_EQ(1, fileWriteInfos.size());
+
+  auto writeFileName = fileWriteInfos[0]["writeFileName"].asString();
+
+  // Read from 'writeFileName' and verify the data matches the original.
+  plan = PlanBuilder().tableScan(rowType).planNode();
+
+  auto splits = HiveConnectorTestBase::makeHiveConnectorSplits(
+      fmt::format("{}/{}", kOutputDirectory, writeFileName),
+      1,
+      dwio::common::FileFormat::PARQUET);
+  auto copy = AssertQueryBuilder(plan).split(splits[0]).copyResults(pool());
+  assertEqualResults({input}, {copy});
 }
 
 int main(int argc, char** argv) {
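For context, each non-null row of the fragment column is a JSON string emitted by the Hive DataSink when it finishes. A hedged sketch of extracting the written file's full path from one fragment, using only field names this test exercises (the payload in the comment is illustrative, and the file name in it is made up):

    #include <folly/json.h>

    #include <string>

    // Extract "<writePath>/<writeFileName>" from a fragment JSON string such as
    // {"writePath": "s3://writedata/", "rowCount": 1000,
    //  "fileWriteInfos": [{"writeFileName": "part-0.parquet", "fileSize": 123}]}.
    std::string writeFilePath(const std::string& fragmentJson) {
      folly::dynamic obj = folly::parseJson(fragmentJson);
      return obj["writePath"].asString() + "/" +
          obj["fileWriteInfos"][0]["writeFileName"].asString();
    }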
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
@@ -301,7 +301,9 @@ PlanBuilder& PlanBuilder::filter(const std::string& filter) {
   return *this;
 }
 
-PlanBuilder& PlanBuilder::tableWrite(const std::string& outputDirectoryPath) {
+PlanBuilder& PlanBuilder::tableWrite(
+    const std::string& outputDirectoryPath,
+    dwio::common::FileFormat fileFormat) {
   auto rowType = planNode_->outputType();
 
   std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
@@ -321,7 +323,7 @@ PlanBuilder& PlanBuilder::tableWrite(const std::string& outputDirectoryPath) {
   auto hiveHandle = std::make_shared<connector::hive::HiveInsertTableHandle>(
       columnHandles,
       locationHandle,
-      dwio::common::FileFormat::DWRF,
+      fileFormat,
       nullptr, // bucketProperty,
       common::CompressionKind_NONE);
 
10 changes: 6 additions & 4 deletions velox/exec/tests/utils/PlanBuilder.h
@@ -248,12 +248,14 @@ class PlanBuilder {
   /// function will skip creating a FilterNode in that case.
   PlanBuilder& optionalFilter(const std::string& optionalFilter);
 
-  /// Adds a TableWriteNode to write all input columns into an unpartitioned
-  /// unbucketed Hive table without collecting statistics using DWRF file format
-  /// without compression.
+  /// Adds a TableWriteNode to write all input columns into an un-partitioned
+  /// un-bucketed Hive table without collecting statistics and without compression.
   ///
   /// @param outputDirectoryPath Path to a directory to write data to.
-  PlanBuilder& tableWrite(const std::string& outputDirectoryPath);
+  /// @param fileFormat File format to use for the written data.
+  PlanBuilder& tableWrite(
+      const std::string& outputDirectoryPath,
+      dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF);
 
   /// Adds a TableWriteNode.
   ///
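A quick usage sketch of the new signature (the local path and `input` vector are placeholders; the single-argument form keeps the previous DWRF behavior):

    // Default file format: DWRF, matching the old single-argument overload.
    auto dwrfPlan =
        PlanBuilder().values({input}).tableWrite("/tmp/out").planNode();

    // Explicit file format: Parquet, as the updated S3 test uses.
    auto parquetPlan =
        PlanBuilder()
            .values({input})
            .tableWrite("/tmp/out", dwio::common::FileFormat::PARQUET)
            .planNode();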
