diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h index ff20834f7dd1..60554051f26a 100644 --- a/velox/exec/OutputBuffer.h +++ b/velox/exec/OutputBuffer.h @@ -306,8 +306,8 @@ class OutputBuffer { DataAvailableCallback notify, DataConsumerActiveCheckCallback activeCheck); - // Continues any possibly waiting producers. Called when the - // producer task has an error or cancellation. + /// Continues any possibly waiting producers. Called when the producer task + /// has an error or cancellation. void terminate(); std::string toString(); diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 30da04c6e40c..847532cd554e 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -51,9 +51,9 @@ class OutputBufferManager { /// Returns true if the buffer exists for a given taskId, else returns false. bool updateNumDrivers(const std::string& taskId, uint32_t newNumDrivers); - // Adds data to the outgoing queue for 'destination'. 'data' must not be - // nullptr. 'data' is always added but if the buffers are full the future is - // set to a ContinueFuture that will be realized when there is space. + /// Adds data to the outgoing queue for 'destination'. 'data' must not be + /// nullptr. 'data' is always added but if the buffers are full the future is + /// set to a ContinueFuture that will be realized when there is space. bool enqueue( const std::string& taskId, int destination, @@ -62,12 +62,12 @@ class OutputBufferManager { void noMoreData(const std::string& taskId); - // Returns true if noMoreData has been called and all the accumulated data - // have been fetched and acknowledged. + /// Returns true if noMoreData has been called and all the accumulated data + /// have been fetched and acknowledged. bool isFinished(const std::string& taskId); - // Removes data with sequence number < 'sequence' from the queue for - // 'destination_'. + /// Removes data with sequence number < 'sequence' from the queue for + /// 'destination_'. void acknowledge(const std::string& taskId, int destination, int64_t sequence); diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index d86563a29d11..c1806d1bcec6 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -232,12 +232,11 @@ void PartitionedOutput::estimateRowSizes() { } void PartitionedOutput::addInput(RowVectorPtr input) { - initializeInput(std::move(input)); + traceInput(input); + initializeInput(std::move(input)); initializeDestinations(); - initializeSizeBuffers(); - estimateRowSizes(); for (auto& destination : destinations_) { diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 77213f22ca1e..783f94bc2889 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp - TableWriterReplayer.cpp) + TableWriterReplayer.cpp PartitionedOutputReplayer.cpp) velox_link_libraries( velox_query_trace_replayer_base velox_aggregates diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 79186bf05083..92fc3f128e56 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -33,19 +33,20 @@ OperatorReplayerBase::OperatorReplayerBase( std::string nodeId, int32_t pipelineId, std::string operatorType) - : rootDir_(std::move(rootDir)), - taskId_(std::move(taskId)), + : taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), pipelineId_(pipelineId), - operatorType_(std::move(operatorType)) { + operatorType_(std::move(operatorType)), + rootDir_(std::move(rootDir)), + taskDir_(fmt::format("{}/{}", rootDir_, taskId_)), + nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)){ VELOX_USER_CHECK(!rootDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); VELOX_USER_CHECK_GE(pipelineId_, 0); VELOX_USER_CHECK(!operatorType_.empty()); - const auto traceTaskDir = fmt::format("{}/{}", rootDir_, taskId_); const auto metadataReader = exec::trace::QueryMetadataReader( - traceTaskDir, memory::MemoryManager::getInstance()->tracePool()); + taskDir_, memory::MemoryManager::getInstance()->tracePool()); metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_); queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false"; fs_ = filesystems::getFileSystem(rootDir_, nullptr); diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index f29beff6c047..09d85edf6a64 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -20,6 +20,9 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +namespace facebook::velox::exec { +class Task; +} namespace facebook::velox::tool::trace { class OperatorReplayerBase { public: @@ -42,19 +45,20 @@ class OperatorReplayerBase { const std::string& taskId, bool shortSummary); - virtual RowVectorPtr run() const = 0; + virtual RowVectorPtr run() = 0; static std::string usage(); protected: - virtual core::PlanNodePtr createPlan() const = 0; - - const std::string rootDir_; const std::string taskId_; const std::string nodeId_; const int32_t pipelineId_; const std::string operatorType_; + const std::string rootDir_; + const std::string taskDir_; + const std::string nodeDir_; + std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp new file mode 100644 index 000000000000..2acf24139c65 --- /dev/null +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -0,0 +1,170 @@ +/* +* Copyright (c) Facebook, Inc. and its affiliates. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include + +#include "velox/common/memory/Memory.h" +#include "velox/exec/PartitionedOutput.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/tool/trace/PartitionedOutputReplayer.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { +namespace { +std::shared_ptr createQueryContext( + const std::unordered_map& config, + folly::Executor* executor) { + return core::QueryCtx::create( + executor, core::QueryConfig(std::move(config))); +} + +std::function +partitionedOutputNodeFactory( + const core::PartitionedOutputNode* originalNode) { + return [=](const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) -> core::PlanNodePtr { + return std::make_shared( + nodeId, + originalNode->kind(), + originalNode->keys(), + originalNode->numPartitions(), + originalNode->isReplicateNullsAndAny(), + originalNode->partitionFunctionSpecPtr(), + originalNode->outputType(), + source); + }; +} + +std::vector> getData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + int destination, + int64_t sequence, + folly::Executor* executor) { + auto [promise, semiFuture] = + folly::makePromiseContract>>(); + VELOX_CHECK(bufferManager->getData( + taskId, + destination, + exec::PartitionedOutput::kMinDestinationSize, + sequence, + [result = std::make_shared< + folly::Promise>>>( + std::move(promise))]( + std::vector> pages, + int64_t /*inSequence*/, + std::vector /*remainingBytes*/) { + result->setValue(std::move(pages)); + })); + auto future = std::move(semiFuture).via(executor); + future.wait(std::chrono::seconds{10}); + VELOX_CHECK(future.isReady()); + return std::move(future).value(); +} +} // namespace + +std::vector>> getAllData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + uint32_t numPartitions, + folly::Executor* executor) { + std::vector consumerThreads; + std::vector>> partitionedResults; + consumerThreads.reserve(numPartitions); + partitionedResults.reserve(numPartitions); + partitionedResults.resize(numPartitions); + for (uint32_t i = 0; i < numPartitions; i++) { + consumerThreads.push_back(std::thread([&, partition = i]() { + bool finished{false}; + while (!finished) { + std::vector> pages; + { + pages = getData( + bufferManager, + taskId, + partition, + partitionedResults[partition].size(), + executor); + } + for (auto& page : pages) { + if (page) { + partitionedResults[partition].push_back(std::move(page)); + } else { + // Null page indicates this buffer is finished. + bufferManager->deleteResults(taskId, partition); + finished = true; + } + } + } + })); + } + + for (auto& thread : consumerThreads) { + thread.join(); + } + return partitionedResults; +} + +PartitionedOutputReplayer::PartitionedOutputReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType) + : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + originalNode_(dynamic_cast( + core::PlanNode::findFirstNode( + planFragment_.get(), + [this](const core::PlanNode* node) { + return node->id() == nodeId_; + }))) { + VELOX_CHECK_NOT_NULL(originalNode_); +} + +RowVectorPtr PartitionedOutputReplayer::run() { + auto task = Task::create( + "local://partitioned-output-replayer", + core::PlanFragment{createPlan()}, + 0, + createQueryContext(queryConfigs_, executor_.get()), + Task::ExecutionMode::kParallel); + task->start(maxDrivers_); + + auto partitionedResults = getAllData( + bufferManager_, + task->taskId(), + originalNode_->numPartitions(), + executor_.get()); + common::testutil::TestValue::adjust( + "facebook::velox::tool::PartitionedOutputReplayer::run", + &partitionedResults); + return nullptr; +} + +core::PlanNodePtr PartitionedOutputReplayer::createPlan() const { + return PlanBuilder() + .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .addNode( + partitionedOutputNodeFactory(originalNode_)) + .planNode(); +} + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h new file mode 100644 index 000000000000..24898bd4eb65 --- /dev/null +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { + +/// Concurrently gets all partitioned buffer content (vec) for every +/// partition. +std::vector>> getAllData( + const std::shared_ptr& bufferManager, + const std::string& taskId, + uint32_t numPartitions, + folly::Executor* executor); + +/// The replayer to replay the traced 'PartitionedOutput' operator. +class PartitionedOutputReplayer final : public OperatorReplayerBase { + public: + PartitionedOutputReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType); + + RowVectorPtr run() override; + + private: + core::PlanNodePtr createPlan() const; + + const core::PartitionedOutputNode* originalNode_; + + std::unique_ptr executor_{ + std::make_unique( + std::thread::hardware_concurrency())}; + + const std::shared_ptr bufferManager_{ + exec::OutputBufferManager::getInstance().lock()}; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/QueryReplayer.cpp index 3d85f614188b..0e271c494b1e 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/QueryReplayer.cpp @@ -16,20 +16,20 @@ #include #include + +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" -#include "velox/core/PlanNode.h" -#include "velox/exec/PartitionFunction.h" -#include "velox/type/Type.h" - -#include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/core/PlanNode.h" +#include "velox/exec/PartitionFunction.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/TableWriterReplayer.h" +#include "velox/type/Type.h" DEFINE_bool(usage, false, "Show the usage"); DEFINE_string(root, "", "Root dir of the query tracing"); @@ -85,10 +85,9 @@ void init() { connector::registerConnector(hiveConnector); } -std::unique_ptr createReplayer( - const std::string& operatorType) { - std::unique_ptr replayer = nullptr; - if (operatorType == "TableWriter") { +std::unique_ptr createReplayer() { + std::unique_ptr replayer; + if (FLAGS_operator_type == "TableWriter") { replayer = std::make_unique( FLAGS_root, FLAGS_task_id, @@ -97,7 +96,7 @@ std::unique_ptr createReplayer( FLAGS_operator_type, FLAGS_table_writer_output_dir); } else { - VELOX_FAIL("Unsupported opeartor type: {}", FLAGS_operator_type); + VELOX_UNSUPPORTED("Unsupported opeartor type: {}", FLAGS_operator_type); } VELOX_USER_CHECK_NOT_NULL(replayer); return replayer; @@ -105,35 +104,30 @@ std::unique_ptr createReplayer( } // namespace int main(int argc, char** argv) { - if (argc == 1) { - LOG(ERROR) << "\n" << tool::trace::OperatorReplayerBase::usage(); - return 1; - } - gflags::ParseCommandLineFlags(&argc, &argv, true); if (FLAGS_usage) { LOG(INFO) << "\n" << tool::trace::OperatorReplayerBase::usage(); return 0; } - if (FLAGS_root.empty()) { - LOG(ERROR) << "Root dir is not provided!\n" - << tool::trace::OperatorReplayerBase::usage(); + // False usage checks + if (argc == 1) { + LOG(ERROR) << "\n" << tool::trace::OperatorReplayerBase::usage(); return 1; } + VELOX_USER_CHECK( + FLAGS_root.empty(), + "Root dir is not provided!\n{}", + tool::trace::OperatorReplayerBase::usage()); init(); + if (FLAGS_summary || FLAGS_short_summary) { tool::trace::OperatorReplayerBase::printSummary( FLAGS_root, FLAGS_task_id, FLAGS_short_summary); return 0; } - const auto replayer = createReplayer(FLAGS_operator_type); - VELOX_USER_CHECK_NOT_NULL( - replayer, "Unsupported opeartor type: {}", FLAGS_operator_type); - - replayer->run(); - + createReplayer()->run(); return 0; } diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index 43c15caeb419..3a2fbbb1de71 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -14,23 +14,20 @@ * limitations under the License. */ -#include "velox/tool/trace/TableWriterReplayer.h" - #include + #include "velox/common/memory/Memory.h" -#include "velox/exec/QueryDataReader.h" #include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" -#include "velox/exec/Task.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/tool/trace/TableWriterReplayer.h" using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; namespace facebook::velox::tool::trace { - namespace { std::shared_ptr @@ -70,12 +67,32 @@ std::shared_ptr createInsertTableHanlde( connectorId, makeHiveInsertTableHandle(node, std::move(targetDir))); } +std::function +tableWriterNodeFactory( + const core::TableWriteNode* originalNode, + const std::string& targetDir) { + return [=](const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) -> core::PlanNodePtr { + const auto insertTableHandle = + createInsertTableHanlde("test-hive", originalNode, targetDir); + return std::make_shared( + nodeId, + originalNode->columns(), + originalNode->columnNames(), + originalNode->aggregationNode(), + insertTableHandle, + originalNode->hasPartitioningScheme(), + TableWriteTraits::outputType(originalNode->aggregationNode()), + originalNode->commitStrategy(), + source); + }; +} } // namespace -RowVectorPtr TableWriterReplayer::run() const { - const auto restoredPlanNode = createPlan(); +RowVectorPtr TableWriterReplayer::run() { + const auto replayPlan = createPlan(); - return AssertQueryBuilder(restoredPlanNode) + return AssertQueryBuilder(replayPlan) .maxDrivers(maxDrivers_) .configs(queryConfigs_) .connectorSessionProperties(connectorConfigs_) @@ -83,47 +100,16 @@ RowVectorPtr TableWriterReplayer::run() const { } core::PlanNodePtr TableWriterReplayer::createPlan() const { - const auto* tableWriterNode = core::PlanNode::findFirstNode( - planFragment_.get(), - [this](const core::PlanNode* node) { return node->id() == nodeId_; }); - const auto traceRoot = fmt::format("{}/{}", rootDir_, taskId_); + const auto* tableWriterNode = + dynamic_cast(core::PlanNode::findFirstNode( + planFragment_.get(), [this](const core::PlanNode* node) { + return node->id() == nodeId_; + })); + VELOX_CHECK_NOT_NULL(tableWriterNode); return PlanBuilder() - .traceScan( - fmt::format("{}/{}", traceRoot, nodeId_), - exec::trace::getDataType(planFragment_, nodeId_)) - .addNode(addTableWriter( - dynamic_cast(tableWriterNode), - replayOutputDir_)) + .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) + .addNode(tableWriterNodeFactory(tableWriterNode, replayOutputDir_)) .planNode(); } -core::PlanNodePtr TableWriterReplayer::createTableWrtierNode( - const core::TableWriteNode* node, - const std::string& targetDir, - const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source) { - const auto insertTableHandle = - createInsertTableHanlde("test-hive", node, targetDir); - return std::make_shared( - nodeId, - node->columns(), - node->columnNames(), - node->aggregationNode(), - insertTableHandle, - node->hasPartitioningScheme(), - TableWriteTraits::outputType(node->aggregationNode()), - node->commitStrategy(), - source); -} - -std::function -TableWriterReplayer::addTableWriter( - const core::TableWriteNode* node, - const std::string& targetDir) { - return [=](const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source) -> core::PlanNodePtr { - return createTableWrtierNode(node, targetDir, nodeId, source); - }; -} - } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index 499ec628d301..7cf44d1efa5b 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -19,10 +19,10 @@ #include #include "velox/tool/trace/OperatorReplayerBase.h" - #include "velox/core/PlanNode.h" namespace facebook::velox::tool::trace { + /// The replayer to replay the traced 'TableWriter' operator. class TableWriterReplayer final : public OperatorReplayerBase { public: @@ -38,23 +38,10 @@ class TableWriterReplayer final : public OperatorReplayerBase { VELOX_CHECK(!replayOutputDir_.empty()); } - RowVectorPtr run() const override; - - protected: - core::PlanNodePtr createPlan() const override; + RowVectorPtr run() override; private: - static core::PlanNodePtr createTableWrtierNode( - const core::TableWriteNode* node, - const std::string& targetDir, - const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source); - - static std::function - addTableWriter( - const core::TableWriteNode* node, - const std::string& targetDir); - + core::PlanNodePtr createPlan() const; const std::string replayOutputDir_; }; diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index fa07996fdd68..ac071855417a 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_tool_trace_test TableWriterReplayerTest.cpp) +add_executable(velox_tool_trace_test TableWriterReplayerTest.cpp PartitionedOutputReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp new file mode 100644 index 000000000000..1b7ea551a60b --- /dev/null +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -0,0 +1,184 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "folly/dynamic.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/PartitionedOutput.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/PartitionedOutputReplayer.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace::test { +class PartitionedOutputReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + std::vector makeBatches( + vector_size_t numBatches, + std::function makeVector) { + std::vector batches; + batches.reserve(numBatches); + for (int32_t i = 0; i < numBatches; ++i) { + batches.push_back(makeVector(i)); + } + return batches; + } + + std::shared_ptr createQueryContext( + const std::unordered_map& config) { + return core::QueryCtx::create( + executor_.get(), core::QueryConfig(std::move(config))); + } + + const std::shared_ptr bufferManager_{ + exec::OutputBufferManager::getInstance().lock()}; +}; + +DEBUG_ONLY_TEST_F(PartitionedOutputReplayerTest, basic) { + struct TestParam { + std::string testName; + uint32_t numPartitions; + RowVectorPtr input; + std::string debugString() { + return fmt::format( + "testName {}, numPartitions {}, input type {}", + testName, + numPartitions, + input->toString()); + } + }; + std::vector testParams = { + // 10 partitions, 1000 row vector[int, int] + {"small-dataset", + 10, + makeRowVector( + {"key", "value"}, + {makeFlatVector(1'000, [](auto row) { return row; }), + makeFlatVector( + 1'000, [](auto row) { return row * 2; }, nullEvery(7))})}, + // 4 partitions, 80'000 row vector[int, string] with each string being + // 1024 bytes size + {"large-dataset", + 4, + makeRowVector( + {"key", "value"}, + {makeFlatVector(80'000, [](auto row) { return row; }), + makeFlatVector( + 80'000, [](auto row) { return std::string(1024, 'x'); })})}}; + + for (auto& testParam : testParams) { + SCOPED_TRACE(testParam.debugString()); + std::string planNodeId; + auto plan = + PlanBuilder() + .values({testParam.input}, false) + .partitionedOutput( + {"key"}, testParam.numPartitions, false, {"key", "value"}) + .capturePlanNodeId(planNodeId) + .planNode(); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + auto originalTask = Task::create( + fmt::format( + "local://test-partitioned-output-replayer-basic-{}", + testParam.testName), + core::PlanFragment{plan}, + 0, + createQueryContext( + {{core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, traceRoot}, + {core::QueryConfig::kQueryTraceMaxBytes, + std::to_string(100UL << 30)}, + {core::QueryConfig::kQueryTraceTaskRegExp, ".*"}, + {core::QueryConfig::kQueryTraceNodeIds, planNodeId}, + {core::QueryConfig::kMaxPartitionedOutputBufferSize, + std::to_string(8UL << 20)}, + {core::QueryConfig::kMaxOutputBufferSize, + std::to_string(8UL << 20)}}), + Task::ExecutionMode::kParallel); + originalTask->start(1); + + std::vector>> + originalPartitionedResults; + originalPartitionedResults.reserve(testParam.numPartitions); + originalPartitionedResults = getAllData( + bufferManager_, + originalTask->taskId(), + testParam.numPartitions, + executor_.get()); + + SCOPED_TESTVALUE_SET( + "facebook::velox::tool::PartitionedOutputReplayer::run", + std::function([&](void* results) { + std::vector>>* + partitionedResults = reinterpret_cast< + std::vector>>*>( + results); + ASSERT_EQ(partitionedResults->size(), testParam.numPartitions); + for (uint32_t partition = 0; partition < testParam.numPartitions; + partition++) { + const auto& originalBufList = + originalPartitionedResults.at(partition); + const auto& replayedBufList = partitionedResults->at(partition); + ASSERT_EQ(replayedBufList.size(), originalBufList.size()); + for (uint32_t i = 0; i < replayedBufList.size(); i++) { + ASSERT_EQ( + replayedBufList[i]->computeChainDataLength(), + originalBufList[i]->computeChainDataLength()); + } + } + })); + + PartitionedOutputReplayer( + traceRoot, originalTask->taskId(), planNodeId, 0, "PartitionedOutput") + .run(); + } +} +} // namespace facebook::velox::tool::trace::test diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index 7bda92ec6213..5cd6f9021c2f 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -14,32 +14,26 @@ * limitations under the License. */ -#include #include #include #include #include +#include #include "folly/dynamic.h" -#include "folly/experimental/EventCount.h" #include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/hyperloglog/SparseHll.h" -#include "velox/common/testutil/TestValue.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/PartitionFunction.h" #include "velox/exec/QueryDataReader.h" #include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/TableWriterReplayer.h" -#include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -268,14 +262,14 @@ TEST_F(TableWriterReplayerTest, basic) { .split(makeHiveConnectorSplit(sourceFilePath->getPath())) .copyResults(pool(), task); const auto traceOutputDir = TempDirectoryPath::create(); - const auto tableWriterReplayer = TableWriterReplayer( + const auto result = TableWriterReplayer( traceRoot, task->taskId(), "1", 0, "TableWriter", - traceOutputDir->getPath()); - const auto result = tableWriterReplayer.run(); + traceOutputDir->getPath()).run(); + // Second column contains details about written files. const auto details = results->childAt(TableWriteTraits::kFragmentChannel) ->as>(); @@ -392,14 +386,13 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { rowType); const auto traceOutputDir = TempDirectoryPath::create(); - const auto tableWriterReplayer = TableWriterReplayer( + TableWriterReplayer( traceRoot, task->taskId(), tableWriteNodeId, 0, "TableWriter", - traceOutputDir->getPath()); - tableWriterReplayer.run(); + traceOutputDir->getPath()).run(); actualPartitionDirectories = getLeafSubdirectories(traceOutputDir->getPath()); checkWriteResults( actualPartitionDirectories,