[feature](exchange) enable shared exchange sink buffer to reduce RPC concurrency #46764

Open · wants to merge 3 commits into branch-3.0
221 changes: 150 additions & 71 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp

Large diffs are not rendered by default.

130 changes: 101 additions & 29 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -169,31 +169,80 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
bool _eos;
};

// Each ExchangeSinkOperator has one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances
// or be individually owned by each ExchangeSinkLocalState.
// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances.
// Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer.

// A sink buffer contains multiple rpc_channels.
// Each rpc_channel corresponds to a target instance on the receiving side.
// Data is sent using a ping-pong mode within each rpc_channel,
// meaning that at most one RPC can exist in a single rpc_channel at a time.
// The next RPC can only be sent after the previous one has completed.
//
// Each exchange sink sends data to all target instances on the receiving side.
// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks.

/*
+-----------+ +-----------+ +-----------+
|dest ins id| |dest ins id| |dest ins id|
| | | | | |
+----+------+ +-----+-----+ +------+----+
| | |
| | |
+----------------+ +----------------+ +----------------+
| | | | | |
sink buffer -------- | rpc_channel | | rpc_channel | | rpc_channel |
| | | | | |
+-------+--------+ +----------------+ +----------------+
| | |
|------------------------+----------------------+
| | |
| | |
+-----------------+ +-------+---------+ +-------+---------+
| | | | | |
| exchange sink | | exchange sink | | exchange sink |
| | | | | |
+-----------------+ +-----------------+ +-----------------+
*/
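
To make the ping-pong discipline described above concrete, here is a minimal standalone sketch (not part of this patch; RpcChannelSketch, add_request, and on_rpc_finished are hypothetical names): each rpc_channel queues requests from every sink that shares it, keeps at most one RPC in flight, and issues the next queued request from the completion callback.

#include <functional>
#include <mutex>
#include <queue>

class RpcChannelSketch {
public:
    // Called by any exchange sink sharing this channel; send_fn performs one RPC.
    void add_request(std::function<void()> send_fn) {
        std::unique_lock<std::mutex> lock(_mutex);
        _pending.push(std::move(send_fn));
        if (!_idle) {
            return; // a previous RPC is still in flight; just queue (ping-pong)
        }
        _idle = false;
        auto next = std::move(_pending.front());
        _pending.pop();
        lock.unlock();
        next();
    }

    // Called from the RPC completion callback.
    void on_rpc_finished() {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_pending.empty()) {
            _idle = true; // nothing queued, the channel becomes idle again
            return;
        }
        auto next = std::move(_pending.front());
        _pending.pop();
        lock.unlock();
        next(); // send the next queued request
    }

private:
    std::mutex _mutex;
    std::queue<std::function<void()>> _pending;
    bool _idle = true;
};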

#ifdef BE_TEST
void transmit_blockv2(PBackendService_Stub& stub,
std::unique_ptr<AutoReleaseClosure<PTransmitDataParams,
ExchangeSendCallback<PTransmitDataResult>>>
closure);
#endif
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
RuntimeState* state, ExchangeSinkLocalState* parent);
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state,
const std::vector<InstanceLoId>& sender_ins_ids);

#ifdef BE_TEST
ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
: HasTaskExecutionCtx(state), _exchange_sink_num(sinknum) {};
#endif
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);

void construct_request(TUniqueId);

Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<Dependency> queue_dependency,
std::shared_ptr<Dependency> finish_dependency) {
_queue_dependency = queue_dependency;
_finish_dependency = finish_dependency;
void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency,
ExchangeSinkLocalState* local_state) {
DCHECK(_queue_deps.contains(sender_ins_id));
DCHECK(_parents.contains(sender_ins_id));
_queue_deps[sender_ins_id] = queue_dependency;
_parents[sender_ins_id] = local_state;
}

void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
}

#ifdef BE_TEST
public:
#else
private:
#endif
friend class ExchangeSinkLocalState;

phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
@@ -214,35 +263,58 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
// One channel corresponds to a downstream instance.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;

phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;

std::atomic<bool> _is_finishing;
// Several situations can cause an rpc_channel to be turned off,
// such as receiving eof, manual cancellation by the user, or all sinks reaching eos.
// Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;
struct RpcInstanceStatistics {
RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
InstanceLoId inst_lo_id;
int64_t rpc_count = 0;
int64_t max_time = 0;
int64_t min_time = INT64_MAX;
int64_t sum_time = 0;
};
std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;

// It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism.
// If an RPC error occurs, the query will be canceled.
std::atomic<bool> _is_failed;
PUniqueId _query_id;
PlanNodeId _dest_node_id;
// Sender instance id, unique within a fragment. StreamSender saves the variable
int _sender_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;

Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to operate
void _construct_request(InstanceLoId id, PUniqueId);

#ifndef BE_TEST
inline void _ended(InstanceLoId id);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);

#else
virtual void _ended(InstanceLoId id);
virtual void _failed(InstanceLoId id, const std::string& err);
virtual void _set_receiver_eof(InstanceLoId id);
virtual void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
#endif

void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();

std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
ExchangeSinkLocalState* _parent = nullptr;

// _running_sink_count is used to track how many sinks have not finished yet.
// It is only decremented when eos is reached.
phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
// _queue_deps is used for memory control.
phmap::flat_hash_map<InstanceLoId, std::shared_ptr<Dependency>> _queue_deps;
// The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
phmap::flat_hash_map<InstanceLoId, ExchangeSinkLocalState*> _parents;
const int64_t _exchange_sink_num;
};

} // namespace pipeline
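
The member comments above (_running_sink_count, _rpc_channel_is_turn_off) imply a simple per-destination eos accounting. A minimal standalone sketch under those assumptions (EosAccounting and on_sink_eos are hypothetical names, not the actual implementation): only the last running sink for a destination instance turns its rpc_channel off, and a channel is never turned off twice.

#include <cstdint>
#include <unordered_map>

struct EosAccounting {
    std::unordered_map<int64_t, int64_t> running_sink_count; // sinks still writing, per dest instance
    std::unordered_map<int64_t, bool> channel_turned_off;    // per dest instance

    // Returns true when the caller should turn the rpc_channel off now.
    bool on_sink_eos(int64_t dest_instance_id) {
        if (--running_sink_count[dest_instance_id] > 0) {
            return false; // other sinks still share this channel
        }
        if (channel_turned_off[dest_instance_id]) {
            return false; // already closed by eof or cancellation
        }
        channel_turned_off[dest_instance_id] = true;
        return true;
    }
};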
85 changes: 70 additions & 15 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -32,6 +32,8 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
#include "vec/columns/column_const.h"
@@ -100,6 +102,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1));
}
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);

size_t local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
if (channels[i]->is_local()) {
local_size++;
_last_local_channel_idx = i;
}
}
only_local_exchange = local_size == channels.size();

if (!only_local_exchange) {
_sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this);
}

return Status::OK();
}

@@ -150,20 +170,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);

if (!only_local_exchange) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
}

if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_mem_limiter =
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
} else if (local_size > 0) {
@@ -302,7 +312,8 @@ segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const {

ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations)
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids)
: DataSinkOperatorX(operator_id, sink.dest_node_id),
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
@@ -316,7 +327,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()) {
_enable_local_merge_sort(state->enable_local_merge_sort()),
_fragment_instance_ids(fragment_instance_ids) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -362,6 +374,11 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) {
}
RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
}
std::vector<InstanceLoId> ins_ids;
for (auto fragment_instance_id : _fragment_instance_ids) {
ins_ids.push_back(fragment_instance_id.lo);
}
_sink_buffer = _create_buffer(ins_ids);
return Status::OK();
}

@@ -611,7 +628,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block

void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) {
for (auto& channel : channels) {
channel->register_exchange_buffer(buffer);
channel->set_exchange_buffer(buffer);
}
}

@@ -661,8 +678,8 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {}",
_sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
_sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
_reach_limit.load(), _working_channels_count.load());
}
return fmt::to_string(debug_string_buffer);
@@ -716,4 +733,42 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
}

std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
const std::vector<InstanceLoId>& sender_ins_ids) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
auto sink_buffer =
std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state(), sender_ins_ids);
for (const auto& _dest : _dests) {
sink_buffer->construct_request(_dest.fragment_instance_id);
}
return sink_buffer;
}

// For a normal shuffle scenario, if the concurrency is n,
// there can be up to n * n RPCs in the current fragment.
// Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
InstanceLoId sender_ins_id) {
if (!_child) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"ExchangeSinkOperatorX did not correctly set the child.");
}
// When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
// it is an order-by scenario.
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
return _create_buffer({sender_ins_id});
}
if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
return _create_buffer({sender_ins_id});
}
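
As a concrete illustration of the comment above: with a plain shuffle of concurrency n = 10, each of the 10 sender instances would otherwise hold its own rpc_channel to each of the 10 receiver instances, allowing up to 10 × 10 = 100 RPCs in flight within one fragment. With the shared sink buffer there is a single rpc_channel per destination instance, so at most 10 RPCs are in flight at a time, while the total number of RPCs sent stays the same.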
} // namespace doris::pipeline