From f051233d9a26cb20fecef364959c2a14df4a7c88 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Sun, 6 Oct 2024 17:00:27 +0800 Subject: [PATCH] Add HashAggregation Replayer --- velox/exec/HashAggregation.cpp | 1 + velox/exec/tests/QueryTraceTest.cpp | 1 + velox/tool/trace/AggregationReplayer.cpp | 44 ++++ velox/tool/trace/AggregationReplayer.h | 45 ++++ velox/tool/trace/CMakeLists.txt | 2 +- velox/tool/trace/OperatorReplayerBase.cpp | 33 ++- velox/tool/trace/OperatorReplayerBase.h | 15 +- velox/tool/trace/QueryReplayer.cpp | 18 +- velox/tool/trace/TableWriterReplayer.cpp | 61 +----- velox/tool/trace/TableWriterReplayer.h | 18 +- .../trace/tests/AggregationReplayerTest.cpp | 207 ++++++++++++++++++ velox/tool/trace/tests/CMakeLists.txt | 3 +- 12 files changed, 371 insertions(+), 77 deletions(-) create mode 100644 velox/tool/trace/AggregationReplayer.cpp create mode 100644 velox/tool/trace/AggregationReplayer.h create mode 100644 velox/tool/trace/tests/AggregationReplayerTest.cpp diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 05c2f73c5fd2..ae38ba9816cb 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -117,6 +117,7 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { } void HashAggregation::addInput(RowVectorPtr input) { + traceInput(input); if (!pushdownChecked_) { mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this); pushdownChecked_ = true; diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/QueryTraceTest.cpp index ccecdf38f542..0f7671672214 100644 --- a/velox/exec/tests/QueryTraceTest.cpp +++ b/velox/exec/tests/QueryTraceTest.cpp @@ -288,6 +288,7 @@ TEST_F(QueryTracerTest, task) { } } testSettings[]{{".*", 1}, {"test_cursor .*", 1}, {"xxx_yyy \\d+", 0}}; for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); const auto outputDir = TempDirectoryPath::create(); const auto expectedQueryConfigs = std::unordered_map{ diff --git a/velox/tool/trace/AggregationReplayer.cpp b/velox/tool/trace/AggregationReplayer.cpp new file mode 100644 index 000000000000..54041bf8bbcb --- /dev/null +++ b/velox/tool/trace/AggregationReplayer.cpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/AggregationReplayer.h" +#include "velox/exec/QueryDataReader.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { +core::PlanNodePtr AggregationReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const { + const auto* aggregationNode = + dynamic_cast(node); + return std::make_shared( + nodeId, + aggregationNode->step(), + aggregationNode->groupingKeys(), + aggregationNode->preGroupedKeys(), + aggregationNode->aggregateNames(), + aggregationNode->aggregates(), + aggregationNode->globalGroupingSets(), + aggregationNode->groupId(), + aggregationNode->ignoreNullKeys(), + source); +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/AggregationReplayer.h b/velox/tool/trace/AggregationReplayer.h new file mode 100644 index 000000000000..21ce2c409acf --- /dev/null +++ b/velox/tool/trace/AggregationReplayer.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/core/PlanNode.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { +/// The replayer to replay the traced 'HashAggregation' operator. +class AggregationReplayer : public OperatorReplayerBase { + public: + AggregationReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType) + : OperatorReplayerBase( + rootDir, + taskId, + nodeId, + pipelineId, + operatorType) {} + + private: + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const override; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 77213f22ca1e..618a69c75720 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp - TableWriterReplayer.cpp) + TableWriterReplayer.cpp AggregationReplayer.cpp) velox_link_libraries( velox_query_trace_replayer_base velox_aggregates diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 79186bf05083..ba37cd140769 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -21,12 +21,13 @@ #include "velox/exec/QueryMetadataReader.h" #include "velox/exec/QueryTraceTraits.h" #include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/tool/trace/OperatorReplayerBase.h" using namespace facebook::velox; namespace facebook::velox::tool::trace { - OperatorReplayerBase::OperatorReplayerBase( std::string rootDir, std::string taskId, @@ -53,6 +54,36 @@ OperatorReplayerBase::OperatorReplayerBase( exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_); } +RowVectorPtr OperatorReplayerBase::run() const { + const auto restoredPlanNode = createPlan(); + return exec::test::AssertQueryBuilder(restoredPlanNode) + .maxDrivers(maxDrivers_) + .configs(queryConfigs_) + .connectorSessionProperties(connectorConfigs_) + .copyResults(memory::MemoryManager::getInstance()->tracePool()); +} + +core::PlanNodePtr OperatorReplayerBase::createPlan() const { + const auto* replayNode = core::PlanNode::findFirstNode( + planFragment_.get(), + [this](const core::PlanNode* node) { return node->id() == nodeId_; }); + const auto traceDir = fmt::format("{}/{}", rootDir_, taskId_); + return exec::test::PlanBuilder() + .traceScan( + fmt::format("{}/{}", traceDir, nodeId_), + exec::trace::getDataType(planFragment_, nodeId_)) + .addNode(addReplayNode(replayNode)) + .planNode(); +} + +std::function +OperatorReplayerBase::addReplayNode(const core::PlanNode* node) const { + return [=](const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) -> core::PlanNodePtr { + return createPlanNode(node, nodeId, source); + }; +} + void OperatorReplayerBase::printSummary( const std::string& rootDir, const std::string& taskId, diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index f29beff6c047..77f45ff41ca8 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -16,7 +16,6 @@ #pragma once -#include #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" @@ -37,17 +36,25 @@ class OperatorReplayerBase { OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept = delete; + RowVectorPtr run() const; + static void printSummary( const std::string& rootDir, const std::string& taskId, bool shortSummary); - virtual RowVectorPtr run() const = 0; - static std::string usage(); protected: - virtual core::PlanNodePtr createPlan() const = 0; + virtual core::PlanNodePtr createPlan() const; + + virtual std::function + addReplayNode(const core::PlanNode* node) const; + + virtual core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const = 0; const std::string rootDir_; const std::string taskId_; diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/QueryReplayer.cpp index 3d85f614188b..2067f2b543b7 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/QueryReplayer.cpp @@ -16,20 +16,21 @@ #include #include + +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" -#include "velox/core/PlanNode.h" -#include "velox/exec/PartitionFunction.h" -#include "velox/type/Type.h" - -#include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/core/PlanNode.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/TableWriterReplayer.h" +#include "velox/type/Type.h" DEFINE_bool(usage, false, "Show the usage"); DEFINE_string(root, "", "Root dir of the query tracing"); @@ -96,6 +97,13 @@ std::unique_ptr createReplayer( FLAGS_pipeline_id, FLAGS_operator_type, FLAGS_table_writer_output_dir); + } else if (operatorType == "Aggregation") { + replayer = std::make_unique( + FLAGS_root, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); } else { VELOX_FAIL("Unsupported opeartor type: {}", FLAGS_operator_type); } diff --git a/velox/tool/trace/TableWriterReplayer.cpp b/velox/tool/trace/TableWriterReplayer.cpp index 43c15caeb419..fb827efef05c 100644 --- a/velox/tool/trace/TableWriterReplayer.cpp +++ b/velox/tool/trace/TableWriterReplayer.cpp @@ -15,9 +15,6 @@ */ #include "velox/tool/trace/TableWriterReplayer.h" - -#include -#include "velox/common/memory/Memory.h" #include "velox/exec/QueryDataReader.h" #include "velox/exec/QueryTraceUtil.h" #include "velox/exec/TableWriter.h" @@ -72,58 +69,22 @@ std::shared_ptr createInsertTableHanlde( } // namespace -RowVectorPtr TableWriterReplayer::run() const { - const auto restoredPlanNode = createPlan(); - - return AssertQueryBuilder(restoredPlanNode) - .maxDrivers(maxDrivers_) - .configs(queryConfigs_) - .connectorSessionProperties(connectorConfigs_) - .copyResults(memory::MemoryManager::getInstance()->tracePool()); -} - -core::PlanNodePtr TableWriterReplayer::createPlan() const { - const auto* tableWriterNode = core::PlanNode::findFirstNode( - planFragment_.get(), - [this](const core::PlanNode* node) { return node->id() == nodeId_; }); - const auto traceRoot = fmt::format("{}/{}", rootDir_, taskId_); - return PlanBuilder() - .traceScan( - fmt::format("{}/{}", traceRoot, nodeId_), - exec::trace::getDataType(planFragment_, nodeId_)) - .addNode(addTableWriter( - dynamic_cast(tableWriterNode), - replayOutputDir_)) - .planNode(); -} - -core::PlanNodePtr TableWriterReplayer::createTableWrtierNode( - const core::TableWriteNode* node, - const std::string& targetDir, +core::PlanNodePtr TableWriterReplayer::createPlanNode( + const core::PlanNode* node, const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source) { + const core::PlanNodePtr& source) const { + const auto* tableWriterNode = dynamic_cast(node); const auto insertTableHandle = - createInsertTableHanlde("test-hive", node, targetDir); + createInsertTableHanlde("test-hive", tableWriterNode, replayOutputDir_); return std::make_shared( nodeId, - node->columns(), - node->columnNames(), - node->aggregationNode(), + tableWriterNode->columns(), + tableWriterNode->columnNames(), + tableWriterNode->aggregationNode(), insertTableHandle, - node->hasPartitioningScheme(), - TableWriteTraits::outputType(node->aggregationNode()), - node->commitStrategy(), + tableWriterNode->hasPartitioningScheme(), + TableWriteTraits::outputType(tableWriterNode->aggregationNode()), + tableWriterNode->commitStrategy(), source); } - -std::function -TableWriterReplayer::addTableWriter( - const core::TableWriteNode* node, - const std::string& targetDir) { - return [=](const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source) -> core::PlanNodePtr { - return createTableWrtierNode(node, targetDir, nodeId, source); - }; -} - } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index 499ec628d301..e88d94a259af 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h @@ -19,7 +19,6 @@ #include #include "velox/tool/trace/OperatorReplayerBase.h" - #include "velox/core/PlanNode.h" namespace facebook::velox::tool::trace { @@ -38,22 +37,11 @@ class TableWriterReplayer final : public OperatorReplayerBase { VELOX_CHECK(!replayOutputDir_.empty()); } - RowVectorPtr run() const override; - - protected: - core::PlanNodePtr createPlan() const override; - private: - static core::PlanNodePtr createTableWrtierNode( - const core::TableWriteNode* node, - const std::string& targetDir, + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, const core::PlanNodeId& nodeId, - const core::PlanNodePtr& source); - - static std::function - addTableWriter( - const core::TableWriteNode* node, - const std::string& targetDir); + const core::PlanNodePtr& source) const override; const std::string replayOutputDir_; }; diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp new file mode 100644 index 000000000000..fa575de301bd --- /dev/null +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include "folly/experimental/EventCount.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/common/testutil/TestValue.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/QueryDataReader.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/AggregationReplayer.h" +#include "velox/tool/trace/TableWriterReplayer.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::common::testutil; +using namespace facebook::velox::common::hll; + +namespace facebook::velox::tool::trace::test { +class AggregationReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + struct PlanWithName { + const std::string name; + const core::PlanNodePtr plan; + + PlanWithName(std::string _name, core::PlanNodePtr _plan) + : name(std::move(_name)), plan(std::move(_plan)) {} + }; + + std::vector generateKeyTypes(int32_t numKeys) { + std::vector types; + types.reserve(numKeys); + for (auto i = 0; i < numKeys; ++i) { + types.push_back(vectorFuzzer_.randType(0 /*maxDepth*/)); + } + return types; + } + + std::vector generateInput( + const std::vector& keyNames, + const std::vector& keyTypes) { + std::vector names = keyNames; + std::vector types = keyTypes; + + // Add up to 3 payload columns. + const auto numPayload = randInt(1, 3); + for (auto i = 0; i < numPayload; ++i) { + names.push_back(fmt::format("c{}", i + keyNames.size())); + types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); + } + + const auto inputType = ROW(std::move(names), std::move(types)); + std::vector input; + for (auto i = 0; i < 10; ++i) { + input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); + } + return input; + } + + std::vector makeNames(const std::string& prefix, size_t n) { + std::vector names; + names.reserve(n); + for (auto i = 0; i < n; ++i) { + names.push_back(fmt::format("{}{}", prefix, i)); + } + return names; + } + + std::vector aggregatePlans(const RowTypePtr& rowType) { + const std::vector aggregates{ + "count(1)", "min(c2)", "count(c2),"}; + std::vector plans; + // Single aggregation plan. + plans.emplace_back( + "Single", + PlanBuilder() + .tableScan(rowType) + .singleAggregation(groupingKeys_, aggregates, {}) + .capturePlanNodeId(traceNodeId_) + .planNode()); + // Partial -> final aggregation plan. + plans.emplace_back( + "Partial-Final", + PlanBuilder() + .tableScan(rowType) + .partialAggregation(groupingKeys_, aggregates, {}) + .capturePlanNodeId(traceNodeId_) + .finalAggregation() + .planNode()); + // Partial -> intermediate -> final aggregation plan. + plans.emplace_back( + "Partial-Intermediate-Final", + PlanBuilder() + .tableScan(rowType) + .partialAggregation(groupingKeys_, aggregates, {}) + .capturePlanNodeId(traceNodeId_) + .intermediateAggregation() + .finalAggregation() + .planNode()); + return plans; + } + + int32_t randInt(int32_t min, int32_t max) { + return boost::random::uniform_int_distribution(min, max)(rng_); + } + + static VectorFuzzer::Options getFuzzerOptions() { + VectorFuzzer::Options opts; + opts.vectorSize = 1000; + opts.stringVariableLength = true; + opts.stringLength = 100; + opts.nullRatio = 0.2; + return opts; + } + + core::PlanNodeId traceNodeId_; + VectorFuzzer vectorFuzzer_{getFuzzerOptions(), pool()}; + std::mt19937 rng_; + const std::vector keyTypes_{generateKeyTypes(2)}; + const std::vector groupingKeys_{ + makeNames("c", keyTypes_.size())}; +}; + +TEST_F(AggregationReplayerTest, test) { + const auto data = generateInput(groupingKeys_, keyTypes_); + const auto planWithNames = aggregatePlans(asRowType(data[0]->type())); + const auto sourceFilePath = TempFilePath::create(); + writeToFile(sourceFilePath->getPath(), data); + + for (const auto& planWithName : planWithNames) { + SCOPED_TRACE(planWithName.name); + const auto& plan = planWithName.plan; + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + auto results = + AssertQueryBuilder(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .split(makeHiveConnectorSplit(sourceFilePath->getPath())) + .copyResults(pool(), task); + + const auto tableWriterReplayer = AggregationReplayer( + traceRoot, task->taskId(), traceNodeId_, 0, "TableWriter"); + const auto replayingResult = tableWriterReplayer.run(); + assertEqualResults({results}, {replayingResult}); + } +} +} // namespace facebook::velox::tool::trace::test diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index fa07996fdd68..5ed9db65a1b8 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_tool_trace_test TableWriterReplayerTest.cpp) +add_executable(velox_tool_trace_test TableWriterReplayerTest.cpp + AggregationReplayerTest.cpp) add_test( NAME velox_tool_trace_test