Skip to content

Commit

Permalink
[fix](local exchange) Fix unbalance data distribution (#44421)
Browse files Browse the repository at this point in the history
Follow-up for #44137
  • Loading branch information
Gabriel39 authored Nov 22, 2024
1 parent d8215d7 commit 1ffecfd
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
16 changes: 7 additions & 9 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ExchangerBase {
ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_partitions),
_running_source_operators(num_sources),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_sources),
Expand Down Expand Up @@ -202,10 +202,13 @@ struct BlockWrapper {
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_partitions,
ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
DCHECK_GT(num_partitions, 0);
DCHECK_GT(num_sources, 0);
_data_queue.resize(num_sources);
}
~ShuffleExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Expand All @@ -217,10 +220,6 @@ class ShuffleExchanger : public Exchanger<PartitionedBlock> {
ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }

protected:
ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
free_block_limit) {}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
};
Expand All @@ -232,7 +231,6 @@ class BucketShuffleExchanger final : public ShuffleExchanger {
: ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
free_block_limit) {
DCHECK_GT(num_partitions, 0);
_data_queue.resize(std::max(num_partitions, num_sources));
}
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances),
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
use_global_hash_shuffle ? _total_instances : _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? cast_set<int>(
Expand Down

0 comments on commit 1ffecfd

Please sign in to comment.