[UT](exchanger) Add UT case for shuffle exchanger #47598

Merged 1 commit on Feb 8, 2025
be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp (13 changes: 5 additions & 8 deletions)

@@ -41,21 +41,17 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
+        _shuffle_idx_to_instance_idx.clear();
         _use_global_shuffle = use_global_hash_shuffle;
         // For shuffle join, if data distribution has been broken by previous operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
         // we should use map shuffle idx to instance idx because all instances will be
         // distributed to all BEs. Otherwise, we should use shuffle idx directly.
         if (use_global_hash_shuffle) {
-            std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
-                          [&](const auto& item) {
-                              DCHECK(item.first != -1);
-                              _shuffle_idx_to_instance_idx.push_back({item.first, item.second});
-                          });
+            _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
         } else {
-            _shuffle_idx_to_instance_idx.resize(_num_partitions);
             for (int i = 0; i < _num_partitions; i++) {
-                _shuffle_idx_to_instance_idx[i] = {i, i};
+                _shuffle_idx_to_instance_idx[i] = i;
             }
         }
         _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
@@ -147,7 +143,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     RETURN_IF_ERROR(local_state._exchanger->sink(
             state, in_block, eos,
             {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr},
-            {&local_state._channel_id, local_state._partitioner.get(), &local_state}));
+            {&local_state._channel_id, local_state._partitioner.get(), &local_state,
+             &_shuffle_idx_to_instance_idx}));
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
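The comment in init() above describes the two mapping modes: with a global hash shuffle the planner-provided shuffle-idx-to-instance-idx map is adopted as is, otherwise an identity map over the local partitions is built. A minimal standalone sketch of that selection (build_mapping, use_global and provided are illustrative names, not part of this patch):

    #include <map>

    // Sketch of the branch in LocalExchangeSinkOperatorX::init(): adopt the
    // planner-provided shuffle-idx -> instance-idx map for a global shuffle,
    // otherwise fall back to an identity mapping over the local partitions.
    std::map<int, int> build_mapping(bool use_global, int num_partitions,
                                     const std::map<int, int>& provided) {
        if (use_global) {
            return provided; // instances span all BEs, so the explicit map is needed
        }
        std::map<int, int> identity;
        for (int i = 0; i < num_partitions; ++i) {
            identity[i] = i; // shuffle idx i feeds local instance i directly
        }
        return identity;
    }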
be/src/pipeline/local_exchange/local_exchange_sink_operator.h (5 changes: 2 additions & 3 deletions)

@@ -91,7 +91,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
               _num_partitions(num_partitions),
               _texprs(texprs),
               _partitioned_exprs_num(texprs.size()),
-              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+              _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -116,8 +116,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
     const std::vector<TExpr>& _texprs;
     const size_t _partitioned_exprs_num;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
-    const std::map<int, int> _bucket_seq_to_instance_idx;
-    std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+    std::map<int, int> _shuffle_idx_to_instance_idx;
     bool _use_global_shuffle = false;
 };
 
be/src/pipeline/local_exchange/local_exchanger.cpp (22 changes: 9 additions & 13 deletions)

@@ -72,11 +72,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
                                          BlockType& block, bool* eos, vectorized::Block* data_block,
                                          int channel_id) {
     if (local_state == nullptr) {
-        if (!_dequeue_data(block, eos, data_block, channel_id)) {
-            throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}",
-                            data_queue_debug_string(channel_id));
-        }
-        return true;
+        return _dequeue_data(block, eos, data_block, channel_id);
     }
     bool all_finished = _running_sink_operators == 0;
     if (_data_queue[channel_id].try_dequeue(block)) {
@@ -160,7 +156,8 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
     {
         SCOPED_TIMER(profile.distribute_timer);
         RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get<uint32_t>(),
-                                    in_block, *sink_info.channel_id, sink_info.local_state));
+                                    in_block, *sink_info.channel_id, sink_info.local_state,
+                                    sink_info.shuffle_idx_to_instance_idx));
     }
 
     return Status::OK();
@@ -214,7 +211,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block

 Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                                      vectorized::Block* block, int channel_id,
-                                     LocalExchangeSinkLocalState* local_state) {
+                                     LocalExchangeSinkLocalState* local_state,
+                                     std::map<int, int>* shuffle_idx_to_instance_idx) {
     if (local_state == nullptr) {
         return _split_rows(state, channel_ids, block, channel_id);
     }
@@ -249,17 +247,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     }
     local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
                                                     channel_id);
-    auto bucket_seq_to_instance_idx =
-            local_state->_parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
         /**
          * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of
          * all BEs. So we need a shuffleId-To-InstanceId mapping.
          * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on
          * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3.
          */
-        const auto& map = local_state->_parent->cast<LocalExchangeSinkOperatorX>()
-                                  ._shuffle_idx_to_instance_idx;
+        DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0);
+        const auto& map = *shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(cast_set<int>(map.size()));
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
@@ -274,13 +270,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             }
         }
     } else {
-        DCHECK(!bucket_seq_to_instance_idx.empty());
+        DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0);
         new_block_wrapper->ref(_num_partitions);
         for (int i = 0; i < _num_partitions; i++) {
             uint32_t start = partition_rows_histogram[i];
             uint32_t size = partition_rows_histogram[i + 1] - start;
             if (size > 0) {
-                _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state,
+                _enqueue_data_and_set_ready((*shuffle_idx_to_instance_idx)[i], local_state,
                                             {new_block_wrapper, {row_idx, start, size}});
             } else {
                 new_block_wrapper->unref(local_state->_shared_state, channel_id);
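The HASH_SHUFFLE comment above gives the routing idea: a row's hash picks a shuffle idx, and the shuffle-idx-to-instance-idx map decides which instance's queue receives the row. A rough standalone sketch of that two-step routing (hypothetical names, no Doris types; the real _split_rows additionally builds a partition-rows histogram and reference-counts the shared block):

    #include <cstdint>
    #include <map>
    #include <vector>

    // Route each row to a target instance: hash bucket (shuffle idx) -> instance idx.
    // 'channel_ids' plays the role of the partitioner output consumed by _split_rows.
    std::vector<std::vector<uint32_t>> route_rows(
            const std::vector<uint32_t>& channel_ids,
            const std::map<int, int>& shuffle_idx_to_instance_idx,
            int num_instances) {
        std::vector<std::vector<uint32_t>> rows_per_instance(num_instances);
        for (uint32_t row = 0; row < channel_ids.size(); ++row) {
            auto it = shuffle_idx_to_instance_idx.find(static_cast<int>(channel_ids[row]));
            if (it != shuffle_idx_to_instance_idx.end()) {
                rows_per_instance[it->second].push_back(row); // enqueue for that instance
            }
        }
        return rows_per_instance;
    }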
be/src/pipeline/local_exchange/local_exchanger.h (4 changes: 3 additions & 1 deletion)

@@ -41,6 +41,7 @@ struct SinkInfo {
     int* channel_id;
     vectorized::PartitionerBase* partitioner;
     LocalExchangeSinkLocalState* local_state;
+    std::map<int, int>* shuffle_idx_to_instance_idx;
 };
 
 struct SourceInfo {
@@ -262,7 +263,8 @@ class ShuffleExchanger : public Exchanger<PartitionedBlock> {
 protected:
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, int channel_id,
-                       LocalExchangeSinkLocalState* local_state);
+                       LocalExchangeSinkLocalState* local_state,
+                       std::map<int, int>* shuffle_idx_to_instance_idx);
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, int channel_id);
     std::vector<std::vector<uint32_t>> _partition_rows_histogram;
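With the map carried inside SinkInfo, a caller (for example a unit test driving the shuffle exchanger without a full operator tree) can pass the mapping explicitly instead of having the exchanger reach back through local_state->_parent. A standalone sketch using a mirrored struct so it compiles on its own; the real SinkInfo is the one declared in the header above, and all names below are illustrative only:

    #include <map>

    // Mirror of the four SinkInfo fields above, with void* stand-ins for the
    // Doris-specific pointer types (PartitionerBase*, LocalExchangeSinkLocalState*).
    struct SinkInfoSketch {
        int* channel_id;
        void* partitioner;
        void* local_state;
        std::map<int, int>* shuffle_idx_to_instance_idx;
    };

    int main() {
        std::map<int, int> mapping = {{0, 0}, {1, 1}};
        int channel_id = 0;
        // partitioner/local_state left null, as a UT-style call path might do;
        // the exchanger then takes the local_state == nullptr branches shown above.
        SinkInfoSketch info{&channel_id, nullptr, nullptr, &mapping};
        return info.shuffle_idx_to_instance_idx->size() == 2 ? 0 : 1;
    }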