Start agg step stream according to agg_col_cost and group_by_keys_cost #230

Merged · 4 commits · Jun 24, 2024
programs/local/LocalServer.cpp (2 changes: 1 addition & 1 deletion)
@@ -135,7 +135,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
     }
 
     GlobalThreadPool::initialize(
-        config().getUInt("max_thread_pool_size", std::max(getNumberOfPhysicalCPUCores() * 2, 256u)),
+        config().getUInt("max_thread_pool_size", std::max(getNumberOfPhysicalCPUCores(), 1024u)),
         config().getUInt("max_thread_pool_free_size", 0),
         config().getUInt("thread_pool_queue_size", 10000));
 
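The effect of the new default is easiest to see side by side. Below is a minimal standalone sketch, not part of the diff, that evaluates both fallback expressions for a few core counts: the floor rises from 256 to 1024 threads, and the cores-times-two growth term is dropped.

// Minimal standalone sketch (not part of the diff): compare the old and
// new max_thread_pool_size fallback defaults for a few core counts.
#include <algorithm>
#include <cstdio>
#include <initializer_list>

int main()
{
    for (unsigned cores : {4u, 16u, 64u, 256u, 1024u})
    {
        unsigned old_default = std::max(cores * 2, 256u); // before this change
        unsigned new_default = std::max(cores, 1024u);    // after this change
        std::printf("%4u cores: old default = %4u, new default = %4u\n", cores, old_default, new_default);
    }
    return 0;
}

Note that both expressions only set the fallback: an explicit max_thread_pool_size in the config still takes precedence, since config().getUInt(key, default) returns the configured value when present.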
src/Processors/QueryPlan/AggregatingStep.cpp (28 changes: 25 additions & 3 deletions)
@@ -23,6 +23,7 @@
 #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Common/JSONBuilder.h>
+#include <Common/Logger.h>
 
 namespace DB
 {
@@ -457,12 +458,33 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
     /// If there are several sources, then we perform parallel aggregation
     if (pipeline.getNumStreams() > 1)
     {
+        auto stream_count = pipeline.getNumStreams();
+        /// Estimate the stream count by adding up the costs of all aggregate functions
+        /// and GROUP BY keys; it is capped at the number of pipeline streams.
+        size_t estimate_stream = 0;
+        size_t agg_col_cost = 0;
+        size_t group_by_keys_cost = 0;
+        for (const auto & agg : params.aggregates)
+        {
+            /// Get the function count by counting "(".
+            agg_col_cost += static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), '('));
+            /// Get the argument count by counting "," plus 1.
+            agg_col_cost += 1 + static_cast<size_t>(std::count(agg.column_name.begin(), agg.column_name.end(), ','));
+        }
+        for (const auto & key : params.keys)
+        {
+            group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));
+        }
+        estimate_stream = std::min(stream_count, std::max(4ul, 2 * (agg_col_cost + group_by_keys_cost)));
+
+        LOG_TRACE(getLogger("AggregatingStep"), "AggregatingStep: estimate_stream = {}", estimate_stream);
+
         /// Add resize transform to uniformly distribute data between aggregating streams.
         /// But not if we execute aggregation over partitioned data in which case data streams shouldn't be mixed.
         if (!storage_has_evenly_distributed_read && !skip_merging)
-            pipeline.resize(pipeline.getNumStreams(), true, true);
+            pipeline.resize(estimate_stream, true, true);
 
-        auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
+        auto many_data = std::make_shared<ManyAggregatedData>(estimate_stream);
 
         size_t counter = 0;
         pipeline.addSimpleTransform(
@@ -479,7 +501,7 @@
                     skip_merging);
             });
 
-        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
+        pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : estimate_stream, true /* force */);
 
         aggregating = collector.detachProcessors(0);
     }
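The heuristic reads: estimate_stream = min(stream_count, max(4, 2 * (agg_col_cost + group_by_keys_cost))). Each aggregate column name contributes one unit per function call (count of "(") plus one per argument (count of "," plus 1), and each GROUP BY key contributes 1 + 8 * log2(len + 1). The following standalone sketch reproduces the same arithmetic outside the pipeline; the column names, keys, and stream count of 16 are made-up inputs for illustration.

// Standalone sketch (not chdb code) of the stream-count heuristic above.
// Inputs are hypothetical stand-ins for params.aggregates / params.keys.
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <string>
#include <vector>

int main()
{
    size_t stream_count = 16; // stands in for pipeline.getNumStreams()
    std::vector<std::string> aggregates = {"sum(plus(a, b))", "count()"};
    std::vector<std::string> keys = {"user_id", "event_date"};

    size_t agg_col_cost = 0;
    for (const auto & name : aggregates)
    {
        // One unit per function call ("(") ...
        agg_col_cost += static_cast<size_t>(std::count(name.begin(), name.end(), '('));
        // ... plus one unit per argument ("," count + 1).
        agg_col_cost += 1 + static_cast<size_t>(std::count(name.begin(), name.end(), ','));
    }

    size_t group_by_keys_cost = 0;
    for (const auto & key : keys)
        group_by_keys_cost += 1 + 8 * static_cast<size_t>(std::log2(key.size() + 1));

    // Floor of 4 streams, ceiling of the pipeline width.
    size_t estimate_stream = std::min(stream_count, std::max<size_t>(4, 2 * (agg_col_cost + group_by_keys_cost)));
    std::printf("agg_col_cost=%zu group_by_keys_cost=%zu estimate_stream=%zu\n",
                agg_col_cost, group_by_keys_cost, estimate_stream);
    return 0;
}

With these inputs, agg_col_cost = 6 and group_by_keys_cost = 50, so 2 * (6 + 50) = 112 is clamped to the pipeline width of 16; the max(4, ...) floor only matters for queries whose aggregates and keys are trivially short.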
src/Processors/Sources/PythonSource.cpp (4 changes: 2 additions & 2 deletions)
@@ -387,14 +387,14 @@ Chunk PythonSource::scanDataToChunk()
             {
                 // log first 10 rows of the column
                 std::stringstream ss;
-                LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
+                // LOG_DEBUG(logger, "Column {} structure: {}", col.name, columns[i]->dumpStructure());
                 for (size_t j = 0; j < std::min(count, static_cast<size_t>(10)); ++j)
                 {
                     Field value;
                     columns[i]->get(j, value);
                     ss << toString(value) << ", ";
                 }
-                LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
+                // LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
             }
         }
         catch (const Exception & e)