Skip to content

Commit

Permalink
Add partitioned output trace replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 7, 2024
1 parent 96944d5 commit be72382
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 123 deletions.
4 changes: 2 additions & 2 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ class OutputBuffer {
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck);

// Continues any possibly waiting producers. Called when the
// producer task has an error or cancellation.
/// Continues any possibly waiting producers. Called when the producer task
/// has an error or cancellation.
void terminate();

std::string toString();
Expand Down
14 changes: 7 additions & 7 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class OutputBufferManager {
/// Returns true if the buffer exists for a given taskId, else returns false.
bool updateNumDrivers(const std::string& taskId, uint32_t newNumDrivers);

// Adds data to the outgoing queue for 'destination'. 'data' must not be
// nullptr. 'data' is always added but if the buffers are full the future is
// set to a ContinueFuture that will be realized when there is space.
/// Adds data to the outgoing queue for 'destination'. 'data' must not be
/// nullptr. 'data' is always added but if the buffers are full the future is
/// set to a ContinueFuture that will be realized when there is space.
bool enqueue(
const std::string& taskId,
int destination,
Expand All @@ -62,12 +62,12 @@ class OutputBufferManager {

void noMoreData(const std::string& taskId);

// Returns true if noMoreData has been called and all the accumulated data
// have been fetched and acknowledged.
/// Returns true if noMoreData has been called and all the accumulated data
/// have been fetched and acknowledged.
bool isFinished(const std::string& taskId);

// Removes data with sequence number < 'sequence' from the queue for
// 'destination_'.
/// Removes data with sequence number < 'sequence' from the queue for
/// 'destination_'.
void
acknowledge(const std::string& taskId, int destination, int64_t sequence);

Expand Down
5 changes: 2 additions & 3 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,11 @@ void PartitionedOutput::estimateRowSizes() {
}

void PartitionedOutput::addInput(RowVectorPtr input) {
initializeInput(std::move(input));
traceInput(input);

initializeInput(std::move(input));
initializeDestinations();

initializeSizeBuffers();

estimateRowSizes();

for (auto& destination : destinations_) {
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

velox_add_library(velox_query_trace_replayer_base OperatorReplayerBase.cpp
TableWriterReplayer.cpp)
TableWriterReplayer.cpp PartitionedOutputReplayer.cpp)
velox_link_libraries(
velox_query_trace_replayer_base
velox_aggregates
Expand Down
11 changes: 6 additions & 5 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@ OperatorReplayerBase::OperatorReplayerBase(
std::string nodeId,
int32_t pipelineId,
std::string operatorType)
: rootDir_(std::move(rootDir)),
taskId_(std::move(taskId)),
: taskId_(std::move(taskId)),
nodeId_(std::move(nodeId)),
pipelineId_(pipelineId),
operatorType_(std::move(operatorType)) {
operatorType_(std::move(operatorType)),
rootDir_(std::move(rootDir)),
taskDir_(fmt::format("{}/{}", rootDir_, taskId_)),
nodeDir_(fmt::format("{}/{}", taskDir_, nodeId_)){
VELOX_USER_CHECK(!rootDir_.empty());
VELOX_USER_CHECK(!taskId_.empty());
VELOX_USER_CHECK(!nodeId_.empty());
VELOX_USER_CHECK_GE(pipelineId_, 0);
VELOX_USER_CHECK(!operatorType_.empty());
const auto traceTaskDir = fmt::format("{}/{}", rootDir_, taskId_);
const auto metadataReader = exec::trace::QueryMetadataReader(
traceTaskDir, memory::MemoryManager::getInstance()->tracePool());
taskDir_, memory::MemoryManager::getInstance()->tracePool());
metadataReader.read(queryConfigs_, connectorConfigs_, planFragment_);
queryConfigs_[core::QueryConfig::kQueryTraceEnabled] = "false";
fs_ = filesystems::getFileSystem(rootDir_, nullptr);
Expand Down
12 changes: 8 additions & 4 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"

namespace facebook::velox::exec {
class Task;
}
namespace facebook::velox::tool::trace {
class OperatorReplayerBase {
public:
Expand All @@ -42,19 +45,20 @@ class OperatorReplayerBase {
const std::string& taskId,
bool shortSummary);

virtual RowVectorPtr run() const = 0;
virtual RowVectorPtr run() = 0;

static std::string usage();

protected:
virtual core::PlanNodePtr createPlan() const = 0;

const std::string rootDir_;
const std::string taskId_;
const std::string nodeId_;
const int32_t pipelineId_;
const std::string operatorType_;

const std::string rootDir_;
const std::string taskDir_;
const std::string nodeDir_;

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down
170 changes: 170 additions & 0 deletions velox/tool/trace/PartitionedOutputReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/executors/IOThreadPoolExecutor.h>

#include "velox/common/memory/Memory.h"
#include "velox/exec/PartitionedOutput.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
namespace {
/// Creates a QueryCtx that runs on 'executor' with a QueryConfig built from
/// 'config'.
///
/// @param config Query configuration key/value pairs. Taken by const
/// reference, so the values are copied into the QueryConfig.
/// @param executor Executor handed to the new QueryCtx. Not owned.
/// @return The newly created query context.
std::shared_ptr<core::QueryCtx> createQueryContext(
    const std::unordered_map<std::string, std::string>& config,
    folly::Executor* executor) {
  // 'config' is const, so wrapping it in std::move() could never move
  // anything; the copy is spelled out plainly instead.
  return core::QueryCtx::create(executor, core::QueryConfig(config));
}

/// Returns a node factory that builds a PartitionedOutputNode mirroring
/// 'originalNode'.
///
/// The produced node takes its id and its source from the factory arguments;
/// every other attribute (kind, keys, number of partitions, replicate-nulls
/// flag, partition function spec and output type) is read from
/// 'originalNode' at the time the factory is invoked.
///
/// @param originalNode The node to mirror. Only the raw pointer is captured,
/// so it must stay valid for as long as the returned factory may be called.
std::function<core::PlanNodePtr(std::string, core::PlanNodePtr)>
partitionedOutputNodeFactory(
    const core::PartitionedOutputNode* originalNode) {
  auto makeNode = [originalNode](
                      const core::PlanNodeId& id,
                      const core::PlanNodePtr& input) -> core::PlanNodePtr {
    return std::make_shared<core::PartitionedOutputNode>(
        id,
        originalNode->kind(),
        originalNode->keys(),
        originalNode->numPartitions(),
        originalNode->isReplicateNullsAndAny(),
        originalNode->partitionFunctionSpecPtr(),
        originalNode->outputType(),
        input);
  };
  return makeNode;
}

/// Fetches one batch of pages for a single destination buffer and blocks
/// until the batch has been delivered.
///
/// Registers a callback with 'bufferManager' for 'destination' of task
/// 'taskId', requesting data from 'sequence' onwards, then waits up to 10
/// seconds for that callback to hand over the pages.
///
/// Fails a VELOX_CHECK if bufferManager->getData() returns false, or if the
/// pages have not been delivered once the 10 second wait is over.
///
/// @param executor Executor the resulting future is attached to via via().
/// @return The pages passed to the callback. The caller in this file
/// (getAllData) treats a null entry as the end-of-buffer marker.
std::vector<std::unique_ptr<folly::IOBuf>> getData(
const std::shared_ptr<exec::OutputBufferManager>& bufferManager,
const std::string& taskId,
int destination,
int64_t sequence,
folly::Executor* executor) {
auto [promise, semiFuture] =
folly::makePromiseContract<std::vector<std::unique_ptr<folly::IOBuf>>>();
// The promise is moved into a shared_ptr that the callback captures by value.
// NOTE(review): presumably needed because the callback type must be copyable
// while folly::Promise is move-only -- confirm against the
// OutputBufferManager::getData() signature.
// NOTE(review): kMinDestinationSize is passed as the third argument,
// presumably the byte limit for this fetch -- confirm.
VELOX_CHECK(bufferManager->getData(
taskId,
destination,
exec::PartitionedOutput::kMinDestinationSize,
sequence,
[result = std::make_shared<
folly::Promise<std::vector<std::unique_ptr<folly::IOBuf>>>>(
std::move(promise))](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t /*inSequence*/,
std::vector<int64_t> /*remainingBytes*/) {
result->setValue(std::move(pages));
}));
auto future = std::move(semiFuture).via(executor);
// Bounded wait: a buffer that never produces data turns into a check failure
// below instead of hanging forever.
future.wait(std::chrono::seconds{10});
VELOX_CHECK(future.isReady());
return std::move(future).value();
}
} // namespace

/// Concurrently drains the output buffers of all 'numPartitions' partitions of
/// task 'taskId'.
///
/// One consumer thread is started per partition. Each thread repeatedly calls
/// getData() for its partition until it receives a null page, at which point
/// it calls deleteResults() for that partition and stops. All threads are
/// joined before the function returns.
///
/// @param bufferManager Manager that owns the task's output buffers.
/// @param taskId Id of the task whose buffers are drained.
/// @param numPartitions Number of destination buffers to drain.
/// @param executor Executor forwarded to getData().
/// @return One vector of non-null pages per partition, indexed by partition
/// number, in the order the pages were received.
std::vector<std::vector<std::unique_ptr<folly::IOBuf>>> getAllData(
    const std::shared_ptr<exec::OutputBufferManager>& bufferManager,
    const std::string& taskId,
    uint32_t numPartitions,
    folly::Executor* executor) {
  // Sized up front so that every consumer thread has its own slot and the
  // outer vector is never resized while the threads are running.
  std::vector<std::vector<std::unique_ptr<folly::IOBuf>>> partitionedResults(
      numPartitions);

  std::vector<std::thread> consumerThreads;
  consumerThreads.reserve(numPartitions);
  for (uint32_t i = 0; i < numPartitions; ++i) {
    // Capturing by reference is safe because every thread is joined below,
    // before any of the referenced locals go out of scope.
    consumerThreads.emplace_back([&, partition = i]() {
      auto& results = partitionedResults[partition];
      bool finished{false};
      while (!finished) {
        // The number of pages received so far doubles as the next sequence
        // number to request.
        auto pages =
            getData(bufferManager, taskId, partition, results.size(), executor);
        for (auto& page : pages) {
          if (page) {
            results.push_back(std::move(page));
          } else {
            // Null page indicates this buffer is finished.
            bufferManager->deleteResults(taskId, partition);
            finished = true;
          }
        }
      }
    });
  }

  for (auto& thread : consumerThreads) {
    thread.join();
  }
  return partitionedResults;
}

/// Forwards all arguments to OperatorReplayerBase, then looks up the plan node
/// whose id equals 'nodeId_' in 'planFragment_' and keeps it as the node to
/// replay.
///
/// Fails a VELOX_CHECK if no such node is found or if the node found is not a
/// core::PartitionedOutputNode.
PartitionedOutputReplayer::PartitionedOutputReplayer(
    const std::string& rootDir,
    const std::string& taskId,
    const std::string& nodeId,
    const int32_t pipelineId,
    const std::string& operatorType)
    : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType),
      // The base class is fully constructed at this point, so 'planFragment_'
      // and 'nodeId_' can be read here.
      originalNode_(dynamic_cast<const core::PartitionedOutputNode*>(
          core::PlanNode::findFirstNode(
              planFragment_.get(),
              [this](const core::PlanNode* candidate) -> bool {
                return candidate->id() == nodeId_;
              }))) {
  VELOX_CHECK_NOT_NULL(originalNode_);
}

/// Executes the replay plan as a parallel task and drains every partition of
/// its output.
///
/// The drained pages are only handed to the TestValue hook
/// "facebook::velox::tool::PartitionedOutputReplayer::run"; the function
/// itself always returns nullptr.
RowVectorPtr PartitionedOutputReplayer::run() {
  auto replayTask = Task::create(
      "local://partitioned-output-replayer",
      core::PlanFragment{createPlan()},
      0,
      createQueryContext(queryConfigs_, executor_.get()),
      Task::ExecutionMode::kParallel);
  replayTask->start(maxDrivers_);

  // Consume all partitions so the producing task is able to run to
  // completion.
  folly::Executor* const consumerExecutor = executor_.get();
  auto partitionedResults = getAllData(
      bufferManager_,
      replayTask->taskId(),
      originalNode_->numPartitions(),
      consumerExecutor);

  common::testutil::TestValue::adjust(
      "facebook::velox::tool::PartitionedOutputReplayer::run",
      &partitionedResults);
  return nullptr;
}

/// Builds the replay plan: a trace scan over 'nodeDir_' feeding a
/// PartitionedOutputNode that mirrors 'originalNode_'.
///
/// The scan's output type is obtained from the traced plan via
/// exec::trace::getDataType() for 'nodeId_'.
core::PlanNodePtr PartitionedOutputReplayer::createPlan() const {
  const auto tracedInputType =
      exec::trace::getDataType(planFragment_, nodeId_);
  auto mirrorNodeFactory = partitionedOutputNodeFactory(originalNode_);
  return PlanBuilder()
      .traceScan(nodeDir_, tracedInputType)
      .addNode(mirrorNodeFactory)
      .planNode();
}

} // namespace facebook::velox::tool::trace
58 changes: 58 additions & 0 deletions velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <utility>

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {

/// Concurrently gets all partitioned buffer content (vec<IOBuf>) for every
/// partition of task 'taskId'.
///
/// Starts one consumer thread per partition; each thread keeps fetching pages
/// for its partition until it receives a null page, then deletes that
/// partition's results. Returns only after all consumer threads have been
/// joined.
///
/// @param bufferManager Manager that owns the task's output buffers.
/// @param taskId Id of the task whose output is fetched.
/// @param numPartitions Number of partitions (destination buffers) to fetch.
/// @param executor Executor used while waiting for each fetch to complete.
/// @return The non-null pages received for each partition, indexed by
/// partition number.
std::vector<std::vector<std::unique_ptr<folly::IOBuf>>> getAllData(
const std::shared_ptr<exec::OutputBufferManager>& bufferManager,
const std::string& taskId,
uint32_t numPartitions,
folly::Executor* executor);

/// The replayer to replay the traced 'PartitionedOutput' operator.
///
/// The replay plan is a trace scan over the traced input of the node followed
/// by a PartitionedOutputNode that mirrors the originally traced one.
class PartitionedOutputReplayer final : public OperatorReplayerBase {
 public:
  /// Fails a VELOX_CHECK if the traced plan contains no node with id 'nodeId'
  /// or if that node is not a core::PartitionedOutputNode.
  PartitionedOutputReplayer(
      const std::string& rootDir,
      const std::string& taskId,
      const std::string& nodeId,
      const int32_t pipelineId,
      const std::string& operatorType);

  /// Runs the replay plan and drains the output of every partition. Always
  /// returns nullptr.
  RowVectorPtr run() override;

 private:
  // Overrides the pure virtual declared in OperatorReplayerBase; marked
  // 'override' so that a signature mismatch becomes a compile error.
  core::PlanNodePtr createPlan() const override;

  // The traced PartitionedOutputNode, located by node id in 'planFragment_'.
  // Not owned.
  const core::PartitionedOutputNode* originalNode_;

  // Executor used by the replay query context and for waiting on fetched
  // output pages.
  std::unique_ptr<folly::Executor> executor_{
      std::make_unique<folly::CPUThreadPoolExecutor>(
          std::thread::hardware_concurrency())};

  // NOTE(review): obtained via lock(), so presumably null if the global
  // OutputBufferManager instance is gone at construction time -- confirm.
  const std::shared_ptr<exec::OutputBufferManager> bufferManager_{
      exec::OutputBufferManager::getInstance().lock()};
};
} // namespace facebook::velox::tool::trace
Loading

0 comments on commit be72382

Please sign in to comment.