Skip to content

Commit

Permalink
Add HashAggregation Replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 6, 2024
1 parent 96944d5 commit 8b9f2c6
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 78 deletions.
1 change: 1 addition & 0 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
}

void HashAggregation::addInput(RowVectorPtr input) {
traceInput(input);
if (!pushdownChecked_) {
mayPushdown_ = operatorCtx_->driver()->mayPushdownAggregation(this);
pushdownChecked_ = true;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ TEST_F(QueryTracerTest, task) {
}
} testSettings[]{{".*", 1}, {"test_cursor .*", 1}, {"xxx_yyy \\d+", 0}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
const auto expectedQueryConfigs =
std::unordered_map<std::string, std::string>{
Expand Down
44 changes: 44 additions & 0 deletions velox/tool/trace/AggregationReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/exec/QueryDataReader.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
/// Builds the aggregation node to replay from the traced plan node.
///
/// The returned node copies the step, grouping keys, pre-grouped keys,
/// aggregate names, aggregates, global grouping sets, group id and
/// ignore-null-keys setting of 'node', but uses 'nodeId' as its id and
/// 'source' as its only input.
///
/// @param node The traced plan node; must be a core::AggregationNode.
/// @param nodeId The id to assign to the rebuilt node.
/// @param source The plan node feeding the rebuilt aggregation.
/// @return The rebuilt aggregation node.
core::PlanNodePtr AggregationReplayer::createRepalyNode(
    const core::PlanNode* node,
    const core::PlanNodeId& nodeId,
    const core::PlanNodePtr& source) const {
  const auto* aggregationNode =
      dynamic_cast<const core::AggregationNode*>(node);
  // dynamic_cast yields nullptr when 'node' is null or is not an
  // AggregationNode; fail with a clear error instead of dereferencing it.
  VELOX_CHECK_NOT_NULL(
      aggregationNode, "The traced plan node is not an aggregation node");
  return std::make_shared<core::AggregationNode>(
      nodeId,
      aggregationNode->step(),
      aggregationNode->groupingKeys(),
      aggregationNode->preGroupedKeys(),
      aggregationNode->aggregateNames(),
      aggregationNode->aggregates(),
      aggregationNode->globalGroupingSets(),
      aggregationNode->groupId(),
      aggregationNode->ignoreNullKeys(),
      source);
}
} // namespace facebook::velox::tool::trace
45 changes: 45 additions & 0 deletions velox/tool/trace/AggregationReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// Replays a traced 'HashAggregation' operator.
///
/// The class adds no state of its own: every constructor argument is forwarded
/// to OperatorReplayerBase, and the only customization is how the replayed
/// plan node is rebuilt.
class AggregationReplayer : public OperatorReplayerBase {
 public:
  /// Forwards all arguments, unchanged and in the same order, to the
  /// OperatorReplayerBase constructor.
  AggregationReplayer(
      const std::string& rootDir,
      const std::string& taskId,
      const std::string& nodeId,
      int32_t pipelineId,
      const std::string& operatorType)
      : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType) {
  }

 private:
  /// Overrides the base-class hook that creates the plan node to replay from
  /// the traced 'node', using 'nodeId' as its id and 'source' as its input.
  core::PlanNodePtr createRepalyNode(
      const core::PlanNode* node,
      const core::PlanNodeId& nodeId,
      const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp
TableWriterReplayer.cpp)
TableWriterReplayer.cpp AggregationReplayer.cpp)
velox_link_libraries(
velox_query_trace_replayer_base
velox_aggregates
Expand Down
33 changes: 32 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
#include "velox/exec/QueryMetadataReader.h"
#include "velox/exec/QueryTraceTraits.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

using namespace facebook::velox;

namespace facebook::velox::tool::trace {

OperatorReplayerBase::OperatorReplayerBase(
std::string rootDir,
std::string taskId,
Expand All @@ -53,6 +54,36 @@ OperatorReplayerBase::OperatorReplayerBase(
exec::trace::getNumDrivers(rootDir_, taskId_, nodeId_, pipelineId_, fs_);
}

/// Builds the replay plan and executes it with the recorded driver count,
/// query configs and connector session properties.
///
/// @return The query results as returned by AssertQueryBuilder::copyResults
/// when given the memory manager's trace pool.
RowVectorPtr OperatorReplayerBase::run() const {
  const core::PlanNodePtr replayPlan = createPlan();

  // Configure the builder step by step, in the same order as a fluent chain.
  exec::test::AssertQueryBuilder queryBuilder(replayPlan);
  queryBuilder.maxDrivers(maxDrivers_);
  queryBuilder.configs(queryConfigs_);
  queryBuilder.connectorSessionProperties(connectorConfigs_);

  return queryBuilder.copyResults(
      memory::MemoryManager::getInstance()->tracePool());
}

/// Creates the plan used for replaying: a trace scan over the data recorded
/// for 'nodeId_' followed by the node produced by addReplayNode() for the
/// traced plan node.
///
/// Fails with a Velox check error if no node with id 'nodeId_' exists in the
/// traced plan fragment.
///
/// @return The root of the replay plan.
core::PlanNodePtr OperatorReplayerBase::createPlan() const {
  const auto* replayNode = core::PlanNode::findFirstNode(
      planFragment_.get(),
      [this](const core::PlanNode* node) { return node->id() == nodeId_; });
  // Without this check a missing node would be passed down as nullptr and
  // dereferenced by the createRepalyNode() implementations.
  VELOX_CHECK_NOT_NULL(
      replayNode,
      "Plan node {} is not found in the traced plan of task {}",
      nodeId_,
      taskId_);
  const auto traceRoot = fmt::format("{}/{}", rootDir_, taskId_);
  return exec::test::PlanBuilder()
      .traceScan(
          fmt::format("{}/{}", traceRoot, nodeId_),
          exec::trace::getDataType(planFragment_, nodeId_))
      .addNode(addReplayNode(replayNode))
      .planNode();
}

/// Wraps createRepalyNode() into a callable that takes a plan node id and a
/// source node, bound to the traced 'node'.
///
/// The callable captures 'this' and the 'node' pointer by value.
/// NOTE(review): this presumably requires both to outlive every invocation of
/// the callable - confirm against callers.
std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
OperatorReplayerBase::addReplayNode(const core::PlanNode* node) const {
  return [this, node](
             const core::PlanNodeId& replayNodeId,
             const core::PlanNodePtr& replaySource) {
    return createRepalyNode(node, replayNodeId, replaySource);
  };
}

void OperatorReplayerBase::printSummary(
const std::string& rootDir,
const std::string& taskId,
Expand Down
17 changes: 12 additions & 5 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#pragma once

#include <gflags/gflags.h>
#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"

Expand All @@ -37,17 +36,25 @@ class OperatorReplayerBase {
OperatorReplayerBase& operator=(OperatorReplayerBase&& other) noexcept =
delete;

RowVectorPtr run() const;

static void printSummary(
const std::string& rootDir,
const std::string& taskId,
bool shortSummary);

virtual RowVectorPtr run() const = 0;

static std::string usage();

protected:
virtual core::PlanNodePtr createPlan() const = 0;
private:
virtual core::PlanNodePtr createPlan() const;

virtual std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
addReplayNode(const core::PlanNode* node) const;

virtual core::PlanNodePtr createRepalyNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const = 0;

const std::string rootDir_;
const std::string taskId_;
Expand Down
18 changes: 13 additions & 5 deletions velox/tool/trace/QueryReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

#include <folly/executors/IOThreadPoolExecutor.h>
#include <gflags/gflags.h>

#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/PartitionFunction.h"
#include "velox/type/Type.h"

#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/PartitionFunction.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/TableWriterReplayer.h"
#include "velox/type/Type.h"

DEFINE_bool(usage, false, "Show the usage");
DEFINE_string(root, "", "Root dir of the query tracing");
Expand Down Expand Up @@ -96,6 +97,13 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer(
FLAGS_pipeline_id,
FLAGS_operator_type,
FLAGS_table_writer_output_dir);
} else if (operatorType == "Aggregation") {
replayer = std::make_unique<tool::trace::AggregationReplayer>(
FLAGS_root,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_FAIL("Unsupported opeartor type: {}", FLAGS_operator_type);
}
Expand Down
61 changes: 11 additions & 50 deletions velox/tool/trace/TableWriterReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/

#include "velox/tool/trace/TableWriterReplayer.h"

#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/common/memory/Memory.h"
#include "velox/exec/QueryDataReader.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/TableWriter.h"
Expand Down Expand Up @@ -72,58 +69,22 @@ std::shared_ptr<core::InsertTableHandle> createInsertTableHanlde(

} // namespace

RowVectorPtr TableWriterReplayer::run() const {
const auto restoredPlanNode = createPlan();

return AssertQueryBuilder(restoredPlanNode)
.maxDrivers(maxDrivers_)
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.copyResults(memory::MemoryManager::getInstance()->tracePool());
}

core::PlanNodePtr TableWriterReplayer::createPlan() const {
const auto* tableWriterNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
const auto traceRoot = fmt::format("{}/{}", rootDir_, taskId_);
return PlanBuilder()
.traceScan(
fmt::format("{}/{}", traceRoot, nodeId_),
exec::trace::getDataType(planFragment_, nodeId_))
.addNode(addTableWriter(
dynamic_cast<const core::TableWriteNode*>(tableWriterNode),
replayOutputDir_))
.planNode();
}

core::PlanNodePtr TableWriterReplayer::createTableWrtierNode(
const core::TableWriteNode* node,
const std::string& targetDir,
core::PlanNodePtr TableWriterReplayer::createRepalyNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) {
const core::PlanNodePtr& source) const {
const auto* tableWriterNode = dynamic_cast<const core::TableWriteNode*>(node);
const auto insertTableHandle =
createInsertTableHanlde("test-hive", node, targetDir);
createInsertTableHanlde("test-hive", tableWriterNode, replayOutputDir_);
return std::make_shared<core::TableWriteNode>(
nodeId,
node->columns(),
node->columnNames(),
node->aggregationNode(),
tableWriterNode->columns(),
tableWriterNode->columnNames(),
tableWriterNode->aggregationNode(),
insertTableHandle,
node->hasPartitioningScheme(),
TableWriteTraits::outputType(node->aggregationNode()),
node->commitStrategy(),
tableWriterNode->hasPartitioningScheme(),
TableWriteTraits::outputType(tableWriterNode->aggregationNode()),
tableWriterNode->commitStrategy(),
source);
}

std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
TableWriterReplayer::addTableWriter(
const core::TableWriteNode* node,
const std::string& targetDir) {
return [=](const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) -> core::PlanNodePtr {
return createTableWrtierNode(node, targetDir, nodeId, source);
};
}

} // namespace facebook::velox::tool::trace
18 changes: 3 additions & 15 deletions velox/tool/trace/TableWriterReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <utility>

#include "velox/tool/trace/OperatorReplayerBase.h"

#include "velox/core/PlanNode.h"

namespace facebook::velox::tool::trace {
Expand All @@ -38,22 +37,11 @@ class TableWriterReplayer final : public OperatorReplayerBase {
VELOX_CHECK(!replayOutputDir_.empty());
}

RowVectorPtr run() const override;

protected:
core::PlanNodePtr createPlan() const override;

private:
static core::PlanNodePtr createTableWrtierNode(
const core::TableWriteNode* node,
const std::string& targetDir,
core::PlanNodePtr createRepalyNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source);

static std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
addTableWriter(
const core::TableWriteNode* node,
const std::string& targetDir);
const core::PlanNodePtr& source) const override;

const std::string replayOutputDir_;
};
Expand Down
Loading

0 comments on commit 8b9f2c6

Please sign in to comment.