From 35209a5ac734fdf130e6d03ddecd949393e782c2 Mon Sep 17 00:00:00 2001
From: Mryange
Date: Wed, 4 Dec 2024 19:22:54 +0800
Subject: [PATCH] upd

---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 144 ++++++++++--------
 be/src/pipeline/exec/exchange_sink_buffer.h   |  87 ++++++++---
 .../pipeline/exec/exchange_sink_operator.cpp  |  78 ++++++++--
 be/src/pipeline/exec/exchange_sink_operator.h |  21 ++-
 be/src/pipeline/pipeline_fragment_context.cpp |   3 +-
 be/src/pipeline/pipeline_fragment_context.h   |   2 +
 be/src/runtime/runtime_state.h                |  13 ++
 be/src/vec/sink/vdata_stream_sender.h         |   7 +-
 .../org/apache/doris/qe/SessionVariable.java  |   7 +
 gensrc/thrift/PaloInternalService.thrift      |   2 +
 10 files changed, 259 insertions(+), 105 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 6e6108d13a919f8..22008b47b3c1f07 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,19 +87,22 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
 } // namespace vectorized

 namespace pipeline {
-ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
-                                       int be_number, RuntimeState* state,
-                                       ExchangeSinkLocalState* parent)
+ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
+                                       RuntimeState* state,
+                                       const std::vector<InstanceLoId>& sender_ins_ids)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
-          _is_finishing(false),
+          _is_failed(false),
           _query_id(std::move(query_id)),
           _dest_node_id(dest_node_id),
-          _sender_id(send_id),
-          _be_number(be_number),
           _state(state),
           _context(state->get_query_ctx()),
-          _parent(parent) {}
+          _exchange_sink_num(sender_ins_ids.size()) {
+    for (auto sender_ins_id : sender_ins_ids) {
+        _queue_deps.emplace(sender_ins_id, nullptr);
+        _parents.emplace(sender_ins_id, nullptr);
+    }
+}

 void ExchangeSinkBuffer::close() {
     // Could not clear the queue here, because there maybe a running rpc want to
@@ -110,8 +113,8 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }

-void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
-    if (_is_finishing) {
+void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
+    if (_is_failed) {
         return;
     }
     auto low_id = fragment_instance_id.lo;
@@ -129,22 +132,27 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
-    _instance_to_receiver_eof[low_id] = false;
+    _rpc_channel_is_turn_off[low_id] = false;
     _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
     _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
-    _construct_request(low_id, finst_id);
+    _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
+    _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
+    _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
+
+    _instance_to_request[low_id]->set_node_id(_dest_node_id);
+    _running_sink_count[low_id] = _exchange_sink_num;
 }
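The hunk above replaces the old register_sink/_construct_request pair: the buffer now pre-builds one PTransmitDataParams per destination and arms a per-destination countdown (_running_sink_count) equal to the number of sinks sharing the buffer. A minimal, self-contained sketch of that countdown bookkeeping — toy types and names (ToyBuffer, ended), not the Doris API:

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    using InstanceLoId = int64_t;

    struct ToyBuffer {
        int64_t exchange_sink_num; // how many exchange sinks share this buffer
        std::unordered_map<InstanceLoId, int64_t> running_sink_count;

        // Mirrors construct_request(): called once per destination instance.
        void construct_request(InstanceLoId dest) { running_sink_count[dest] = exchange_sink_num; }

        // Mirrors _ended(): the rpc_channel is turned off only after every
        // sharing sink has delivered its eos for this destination.
        void ended(InstanceLoId dest) {
            if (--running_sink_count[dest] == 0) {
                std::cout << "turn off channel " << dest << '\n';
            }
        }
    };

    int main() {
        ToyBuffer buf{/*exchange_sink_num=*/3, {}};
        buf.construct_request(42);
        buf.ended(42);
        buf.ended(42);
        buf.ended(42); // only the third eos turns the channel off
    }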
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->dest_ins_id();
     if (!_instance_to_package_queue_mutex.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do register_sink",
                                      print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {
+    if (_rpc_channel_is_turn_off[ins_id]) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
@@ -158,12 +166,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         if (request.block) {
             RETURN_IF_ERROR(
                     BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
-            COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
+            COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
+                           request.block->ByteSizeLong());
         }
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
-        if (_queue_dependency && _total_queue_size > _queue_capacity) {
-            _queue_dependency->block();
+        if (_total_queue_size > _queue_capacity) {
+            for (auto& [id, dep] : _queue_deps) {
+                dep->block();
+            }
         }
     }
     if (send_now) {
@@ -174,15 +185,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
 }

 Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->dest_ins_id();
     if (!_instance_to_package_queue_mutex.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do register_sink",
                                      print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {
+    if (_rpc_channel_is_turn_off[ins_id]) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
@@ -209,16 +220,17 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {

 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    DCHECK(_rpc_channel_is_idle[id] == false);
-
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];

-    if (_is_finishing) {
+    if (_is_failed) {
         _turn_off_channel(id, lock);
         return Status::OK();
     }
+    if (_rpc_channel_is_turn_off[id]) {
+        return Status::OK();
+    }

     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
@@ -226,6 +238,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_sender_id(request.channel->_parent->sender_id());
+        brpc_request->set_be_number(request.channel->_parent->be_number());
         if (request.block && !request.block->column_metas().empty()) {
             brpc_request->set_allocated_block(request.block.get());
         }
@@ -271,14 +285,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
                 } else if (!s.ok()) {
                     _failed(id,
                             fmt::format("exchange req success but status isn't ok: {}",
                                         s.to_string()));
+                    return;
                 } else if (eos) {
                     _ended(id);
-                } else {
-                    s = _send_rpc(id);
-                    if (!s) {
-                        _failed(id, fmt::format("exchange req success but status isn't ok: {}",
-                                                s.to_string()));
-                    }
-                }
+                }
+                // The eos here only indicates that the current exchange sink has reached eos.
+                // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
+                s = _send_rpc(id);
+                if (!s) {
+                    _failed(id,
+                            fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
+                }
             });
         {
@@ -296,13 +312,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         }
         if (request.block) {
-            COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
+            COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
+                           -request.block->ByteSizeLong());
             static_cast<void>(brpc_request->release_block());
         }
         q.pop();
         _total_queue_size--;
-        if (_queue_dependency && _total_queue_size <= _queue_capacity) {
-            _queue_dependency->set_ready();
+        if (_total_queue_size <= _queue_capacity) {
+            for (auto& [id, dep] : _queue_deps) {
+                dep->set_ready();
+            }
         }
     } else if (!broadcast_q.empty()) {
         // If we have data to shuffle which is broadcasted
@@ -310,6 +329,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_sender_id(request.channel->_parent->sender_id());
+        brpc_request->set_be_number(request.channel->_parent->be_number());
         if (request.block_holder->get_block() &&
             !request.block_holder->get_block()->column_metas().empty()) {
             brpc_request->set_allocated_block(request.block_holder->get_block());
         }
@@ -352,14 +373,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
                 } else if (!s.ok()) {
                     _failed(id,
                             fmt::format("exchange req success but status isn't ok: {}",
                                         s.to_string()));
+                    return;
                 } else if (eos) {
                     _ended(id);
-                } else {
-                    s = _send_rpc(id);
-                    if (!s) {
-                        _failed(id, fmt::format("exchange req success but status isn't ok: {}",
-                                                s.to_string()));
-                    }
-                }
+                }
+
+                // The eos here only indicates that the current exchange sink has reached eos.
+                // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
+                s = _send_rpc(id);
+                if (!s) {
+                    _failed(id,
+                            fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
+                }
             });
         {
@@ -387,16 +411,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     return Status::OK();
 }

-void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
-    _instance_to_request[id] = std::make_shared<PTransmitDataParams>();
-    _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
-    _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);
-
-    _instance_to_request[id]->set_node_id(_dest_node_id);
-    _instance_to_request[id]->set_sender_id(_sender_id);
-    _instance_to_request[id]->set_be_number(_be_number);
-}
-
 void ExchangeSinkBuffer::_ended(InstanceLoId id) {
     if (!_instance_to_package_queue_mutex.template contains(id)) {
         std::stringstream ss;
@@ -411,24 +425,29 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id, lock);
+        _running_sink_count[id]--;
+        if (_running_sink_count[id] == 0) {
+            _turn_off_channel(id, lock);
+        }
     }
 }

 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
-    _is_finishing = true;
+    _is_failed = true;
     _context->cancel(Status::Cancelled(err));
 }

 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _instance_to_receiver_eof[id] = true;
+    // When the receiving side reaches eof, it means the receiver has finished early.
+    // The remaining data in the current rpc_channel does not need to be sent,
+    // and the rpc_channel should be turned off immediately.
+    _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
         if (broadcast_q.front().block_holder->get_block()) {
-            COUNTER_UPDATE(_parent->memory_used_counter(),
+            COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
                            -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
         }
     }
@@ -440,7 +459,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
     for (; !q.empty(); q.pop()) {
         if (q.front().block) {
-            COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
+            COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
+                           -q.front().block->ByteSizeLong());
         }
     }
@@ -450,22 +470,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     }
 }

-bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    return _instance_to_receiver_eof[id];
-}
-
 // The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
 void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
                                            std::unique_lock<std::mutex>& /*with_lock*/) {
     if (!_rpc_channel_is_idle[id]) {
         _rpc_channel_is_idle[id] = true;
     }
-    _instance_to_receiver_eof[id] = true;
-
+    // Ensure that each RPC is turned off only once.
+    if (_rpc_channel_is_turn_off[id]) {
+        return;
+    }
+    _rpc_channel_is_turn_off[id] = true;
     auto weak_task_ctx = weak_task_exec_ctx();
     if (auto pip_ctx = weak_task_ctx.lock()) {
-        _parent->on_channel_finished(id);
+        for (auto& [id, parent] : _parents) {
+            parent->on_channel_finished(id);
+        }
     }
 }

@@ -509,7 +529,7 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
     auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
     auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
     auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
-    auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
+    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
     auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

     int64_t max_rpc_time = 0, min_rpc_time = 0;
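The header changes that follow document the ping-pong send discipline: each rpc_channel keeps at most one RPC in flight, and the completion callback drives the next send. The stand-alone toy below models one channel synchronously — illustrative only; in the real code the "callback" is a brpc closure and the queue holds TransmitInfo:

    #include <iostream>
    #include <queue>
    #include <string>

    // Toy rpc_channel in ping-pong mode. "In flight" collapses to a direct
    // call here, so completion immediately triggers the next send.
    struct ToyChannel {
        std::queue<std::string> pending;
        bool idle = true;

        void add_block(std::string block) {
            pending.push(std::move(block));
            if (idle) {      // mirrors the send_now fast path in add_block()
                idle = false;
                send_next();
            }
        }

        void send_next() {
            if (pending.empty()) {
                idle = true; // channel parks until the next add_block()
                return;
            }
            std::cout << "rpc: " << pending.front() << '\n';
            pending.pop();
            send_next();     // in Doris this happens in the rpc callback
        }
    };

    int main() {
        ToyChannel ch;
        ch.add_block("block-1");
        ch.add_block("block-2"); // sent only after block-1 has completed
    }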
+ +/* + +-----------+ +-----------+ +-----------+ + |dest ins id| |dest ins id| |dest ins id| + | | | | | | + +----+------+ +-----+-----+ +------+----+ + | | | + | | | + +----------------+ +----------------+ +----------------+ + | | | | | | + sink buffer -------- | rpc_channel | | rpc_channel | | rpc_channel | + | | | | | | + +-------+--------+ +----------------+ +----------------+ + | | | + |------------------------+----------------------+ + | | | + | | | + +-----------------+ +-------+---------+ +-------+---------+ + | | | | | | + | exchange sink | | exchange sink | | exchange sink | + | | | | | | + +-----------------+ +-----------------+ +-----------------+ +*/ + class ExchangeSinkBuffer final : public HasTaskExecutionCtx { public: - ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, - RuntimeState* state, ExchangeSinkLocalState* parent); + ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state, + const std::vector& sender_ins_ids); ~ExchangeSinkBuffer() override = default; - void register_sink(TUniqueId); + + void construct_request(TUniqueId); Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); @@ -183,14 +220,12 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); - void set_dependency(std::shared_ptr queue_dependency, - std::shared_ptr finish_dependency) { - _queue_dependency = queue_dependency; - _finish_dependency = finish_dependency; - } - - void set_broadcast_dependency(std::shared_ptr broadcast_dependency) { - _broadcast_dependency = broadcast_dependency; + void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr queue_dependency, + ExchangeSinkLocalState* local_state) { + DCHECK(_queue_deps.contains(sender_ins_id)); + DCHECK(_parents.contains(sender_ins_id)); + _queue_deps[sender_ins_id] = queue_dependency; + _parents[sender_ins_id] = local_state; } private: @@ -214,7 +249,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { // One channel is corresponding to a downstream instance. phmap::flat_hash_map _rpc_channel_is_idle; - phmap::flat_hash_map _instance_to_receiver_eof; + // There could be multiple situations that cause an rpc_channel to be turned off, + // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos. + // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times. + phmap::flat_hash_map _rpc_channel_is_turn_off; struct RpcInstanceStatistics { RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {} InstanceLoId inst_lo_id; @@ -226,32 +264,33 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { std::vector> _instance_to_rpc_stats_vec; phmap::flat_hash_map _instance_to_rpc_stats; - std::atomic _is_finishing; + // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. + // If an RPC error occurs, the query will be canceled. + std::atomic _is_failed; PUniqueId _query_id; PlanNodeId _dest_node_id; - // Sender instance id, unique within a fragment. 
@@ -214,7 +249,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;

-    phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
+    // There could be multiple situations that cause an rpc_channel to be turned off,
+    // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos.
+    // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times.
+    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;
     struct RpcInstanceStatistics {
         RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
         InstanceLoId inst_lo_id;
@@ -226,32 +264,33 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
     phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;

-    std::atomic<bool> _is_finishing;
+    // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism.
+    // If an RPC error occurs, the query will be canceled.
+    std::atomic<bool> _is_failed;
     PUniqueId _query_id;
     PlanNodeId _dest_node_id;
-    // Sender instance id, unique within a fragment. StreamSender save the variable
-    int _sender_id;
-    int _be_number;
     std::atomic<int64_t> _rpc_count = 0;
     RuntimeState* _state = nullptr;
     QueryContext* _context = nullptr;

     Status _send_rpc(InstanceLoId);
-    // must hold the _instance_to_package_queue_mutex[id] mutex to opera
-    void _construct_request(InstanceLoId id, PUniqueId);
     inline void _ended(InstanceLoId id);
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
-    inline bool _is_receiver_eof(InstanceLoId id);
     inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);

     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();

     std::atomic<int> _total_queue_size = 0;
-    std::shared_ptr<Dependency> _queue_dependency = nullptr;
-    std::shared_ptr<Dependency> _finish_dependency = nullptr;
-    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
-    ExchangeSinkLocalState* _parent = nullptr;
+
+    // _running_sink_count is used to track how many sinks have not finished yet.
+    // It is only decremented when eos is reached.
+    phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
+    // _queue_deps is used for memory control.
+    phmap::flat_hash_map<InstanceLoId, std::shared_ptr<Dependency>> _queue_deps;
+    // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
+    phmap::flat_hash_map<InstanceLoId, ExchangeSinkLocalState*> _parents;
+    const int64_t _exchange_sink_num;
 };

 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dfa6df392b74ba9..add29f618ebd41e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -32,6 +32,8 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline_fragment_context.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
 #include "vec/columns/column_const.h"
@@ -100,6 +102,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
                 fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1));
     }
     _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
+
+    size_t local_size = 0;
+    for (int i = 0; i < channels.size(); ++i) {
+        if (channels[i]->is_local()) {
+            local_size++;
+            _last_local_channel_idx = i;
+        }
+    }
+    only_local_exchange = local_size == channels.size();
+
+    if (!only_local_exchange) {
+        _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
+        register_channels(_sink_buffer.get());
+        _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                      "ExchangeSinkQueueDependency", true);
+        _sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this);
+    }
+
     return Status::OK();
 }
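The init() hunk above also decides per instance whether a buffer is needed at all: a sink whose channels are all local never touches the (possibly shared) sink buffer. A condensed stand-alone restatement of that decision, with a toy channel type:

    #include <iostream>
    #include <vector>

    struct ToyChannel {
        bool local; // stand-in for Channel::is_local()
    };

    int main() {
        std::vector<ToyChannel> channels = {{true}, {false}, {true}};
        size_t local_size = 0;
        for (const auto& ch : channels) {
            local_size += ch.local ? 1 : 0;
        }
        bool only_local_exchange = local_size == channels.size();
        std::cout << (only_local_exchange ? "skip sink buffer" : "create sink buffer") << '\n';
    }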
@@ -149,20 +169,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);

-    if (!only_local_exchange) {
-        _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
-                                                            _state->be_number(), state, this);
-        register_channels(_sink_buffer.get());
-        _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
-                                                      "ExchangeSinkQueueDependency", true);
-        _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
-    }
-
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
         !only_local_exchange) {
         _broadcast_dependency = Dependency::create_shared(
                 _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
-        _sink_buffer->set_broadcast_dependency(_broadcast_dependency);
         _broadcast_pb_mem_limiter =
                 vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
     } else if (local_size > 0) {
@@ -360,6 +370,19 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) {
         }
         RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
     }
+
+    if (const auto* fragment_context = state->fragment_context()) {
+        auto fragment_instance_ids = fragment_context->fragment_instance_ids();
+        std::vector<InstanceLoId> ins_ids;
+        for (auto fragment_instance_id : fragment_instance_ids) {
+            ins_ids.push_back(fragment_instance_id.lo);
+        }
+        _sink_buffer = _create_buffer(ins_ids);
+    } else {
+        return Status::InternalError(
+                "ExchangeSinkOperatorX::open(state) error, state must be a "
+                "PipelineFragmentContext runtime state");
+    }
     return Status::OK();
 }
@@ -620,7 +643,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block

 void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) {
     for (auto& channel : channels) {
-        channel->register_exchange_buffer(buffer);
+        channel->set_exchange_buffer(buffer);
     }
 }
@@ -669,8 +692,8 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
         fmt::format_to(debug_string_buffer,
                        ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
                        "{}, queue dep: {}), _reach_limit: {}, working channels: {}",
-                       _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
-                       _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
+                       _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size,
+                       _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
                        _reach_limit.load(), _working_channels_count.load());
     }
     return fmt::to_string(debug_string_buffer);
@@ -724,4 +747,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
     return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
 }

+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
+        const std::vector<InstanceLoId>& sender_ins_ids) {
+    PUniqueId id;
+    id.set_hi(_state->query_id().hi);
+    id.set_lo(_state->query_id().lo);
+    auto sink_buffer =
+            std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state(), sender_ins_ids);
+    for (const auto& _dest : _dests) {
+        sink_buffer->construct_request(_dest.fragment_instance_id);
+    }
+    return sink_buffer;
+}
+
+/// TODO: Modify this to let FE handle the judgment instead of BE.
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
+        InstanceLoId sender_ins_id) {
+    if (_child) {
+        if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
+            return _create_buffer({sender_ins_id});
+        }
+        if (std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
+            return _create_buffer({sender_ins_id});
+        }
+    }
+    if (_state->enable_shared_exchange_sink_buffer()) {
+        return _sink_buffer;
+    }
+    return _create_buffer({sender_ins_id});
+}
 } // namespace doris::pipeline
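get_sink_buffer() above is the policy switch: children detected as sort-like or local-exchange sources always get a dedicated buffer (a merge-sort plan only issues n RPCs, so sharing buys nothing — see the header comment in the next file), and otherwise the session variable picks between the shared buffer and a per-instance one. A condensed stand-alone model of that decision; the enum and strings are illustrative:

    #include <iostream>

    enum class Child { kSortLike, kOther };

    const char* pick_buffer(Child child, bool enable_shared) {
        if (child == Child::kSortLike) {
            return "dedicated buffer (ordered stream, only n RPCs)";
        }
        return enable_shared ? "shared buffer" : "dedicated buffer";
    }

    int main() {
        std::cout << pick_buffer(Child::kSortLike, true) << '\n';
        std::cout << pick_buffer(Child::kOther, true) << '\n';
        std::cout << pick_buffer(Child::kOther, false) << '\n';
    }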
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 91ee1bd27a63e79..0ca0ab501a9fa40 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -88,7 +88,11 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     bool is_finished() const override { return _reach_limit.load(); }
     void set_reach_limit() { _reach_limit = true; };

+    // sender_id indicates which instance within a fragment, while be_number indicates which
+    // instance across all fragments. For example, with 3 BEs and 8 instances each, the range of
+    // sender_id would be 0 to 23, and the range of be_number would be from n + 0 to n + 23.
     [[nodiscard]] int sender_id() const { return _sender_id; }
+    [[nodiscard]] int be_number() const { return _state->be_number(); }

     std::string name_suffix() override;
     segment_v2::CompressionTypePB compression_type() const;
@@ -112,7 +116,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     friend class vectorized::Channel;
     friend class vectorized::BlockSerializer;

-    std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
+    std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
     RuntimeProfile::Counter* _compress_timer = nullptr;
     RuntimeProfile::Counter* _bytes_sent_counter = nullptr;
@@ -209,6 +213,14 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
+    // For a normal shuffle scenario, if the concurrency is n,
+    // there can be up to n * n RPCs in the current fragment.
+    // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
+    // (Note: This does not reduce the total number of RPCs.)
+    // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
+    /// TODO: Modify this to let FE handle the judgment instead of BE.
+    std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id);
+
 private:
     friend class ExchangeSinkLocalState;
@@ -225,6 +237,13 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
                                      std::vector<std::vector<uint32_t>>& channel2rows,
                                      vectorized::Block* block, bool eos);
+
+    // Use ExchangeSinkOperatorX to create a sink buffer.
+    // The sink buffer can be shared among multiple ExchangeSinkLocalState instances,
+    // or each ExchangeSinkLocalState can have its own sink buffer.
+    std::shared_ptr<ExchangeSinkBuffer> _create_buffer(
+            const std::vector<InstanceLoId>& sender_ins_ids);
+    std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeState* _state = nullptr;

     const std::vector<TExpr> _texprs;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0775ef3fb19826b..73d474855c19958 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -260,7 +260,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state = RuntimeState::create_unique(
             request.query_id, request.fragment_id, request.query_options,
             _query_ctx->query_globals, _exec_env, _query_ctx.get());
-
+    _runtime_state->set_task_execution_context(shared_from_this());
+    _runtime_state->set_fragment_context(this);
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d520d..f07d3ee8ff33ddf 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -123,6 +123,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
         }
     }

+    std::vector<TUniqueId> fragment_instance_ids() const { return _fragment_instance_ids; }
+
 private:
     Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ad63510e2af82cb..c3d5bbf1a784b9a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -317,6 +317,12 @@ class RuntimeState {

     int per_fragment_instance_idx() const { return _per_fragment_instance_idx; }

+    const pipeline::PipelineFragmentContext* fragment_context() const { return _fragment_context; }
+
+    void set_fragment_context(pipeline::PipelineFragmentContext* fragment_context) {
+        _fragment_context = fragment_context;
+    }
+
     void set_num_per_fragment_instances(int num_instances) {
         _num_per_fragment_instances = num_instances;
     }
@@ -592,6 +598,11 @@ class RuntimeState {
                _query_options.enable_local_merge_sort;
     }

+    bool enable_shared_exchange_sink_buffer() const {
+        return _query_options.__isset.enable_shared_exchange_sink_buffer &&
+               _query_options.enable_shared_exchange_sink_buffer;
+    }
+
     int64_t min_revocable_mem() const {
         if (_query_options.__isset.min_revocable_mem) {
             return std::max(_query_options.min_revocable_mem, (int64_t)1);
@@ -637,6 +648,8 @@ class RuntimeState {
     const DescriptorTbl* _desc_tbl = nullptr;
     std::shared_ptr<ObjectPool> _obj_pool;

+    pipeline::PipelineFragmentContext* _fragment_context = nullptr;
+
     // owned by PipelineFragmentContext
     RuntimeFilterMgr* _runtime_filter_mgr = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 5fe35e4da119d03..3605f7e5791b6e2 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -166,10 +166,9 @@ class Channel {
         return Status::OK();
     }

-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
+    void set_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
+
+    InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; }

     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
             InstanceLoId id, bool eos) {
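With the Channel changes above, registration is no longer a channel side effect: the operator calls construct_request() once per destination when it creates the buffer, and each channel merely stores the buffer pointer. A minimal mock of that handshake, with toy types only:

    #include <cstdint>
    #include <iostream>
    #include <set>

    using InstanceLoId = int64_t;

    struct ToyBuffer {
        std::set<InstanceLoId> known_dests;
        void construct_request(InstanceLoId dest) { known_dests.insert(dest); }
    };

    struct ToyChannel {
        InstanceLoId dest;
        ToyBuffer* buffer = nullptr;
        void set_exchange_buffer(ToyBuffer* b) { buffer = b; } // no registration here
        InstanceLoId dest_ins_id() const { return dest; }
    };

    int main() {
        ToyBuffer buf;
        ToyChannel c1{101}, c2{102};
        // The operator, not the channel, registers destinations exactly once:
        for (auto dest : {c1.dest_ins_id(), c2.dest_ins_id()}) {
            buf.construct_request(dest);
        }
        c1.set_exchange_buffer(&buf);
        c2.set_exchange_buffer(&buf);
        std::cout << "registered dests: " << buf.known_dests.size() << '\n';
    }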
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3c180be9d42802f..f802dd049495112 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -284,6 +284,8 @@ public class SessionVariable implements Serializable, Writable {

     public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort";

+    public static final String ENABLE_SHARED_EXCHANGE_SINK_BUFFER = "enable_shared_exchange_sink_buffer";
+
     public static final String ENABLE_AGG_STATE = "enable_agg_state";

     public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
@@ -1133,6 +1135,9 @@ public enum IgnoreSplitType {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;

+    @VariableMgr.VarAttr(name = ENABLE_SHARED_EXCHANGE_SINK_BUFFER)
+    private boolean enableSharedExchangeSinkBuffer = true;
+
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
             needForward = true)
     public boolean enableAggState = false;
@@ -2359,6 +2364,7 @@ public void initFuzzyModeVariables() {
         this.parallelPrepareThreshold = random.nextInt(32) + 1;
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
+        this.enableSharedExchangeSinkBuffer = random.nextBoolean();
         this.useSerialExchange = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
@@ -3954,6 +3960,7 @@ public TQueryOptions toThrift() {
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);

         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
+        tResult.setEnableSharedExchangeSinkBuffer(enableSharedExchangeSinkBuffer);
         tResult.setEnableParallelResultSink(enableParallelResultSink);
         tResult.setEnableParallelOutfile(enableParallelOutfile);
         tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 9a0fd910d943879..0e550dbdefc425f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -358,6 +358,8 @@ struct TQueryOptions {
   140: optional i64 orc_max_merge_distance_bytes = 1048576;

   141: optional bool ignore_runtime_filter_error = false;
+
+  142: optional bool enable_shared_exchange_sink_buffer = true;

   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
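End to end, the new switch travels FE -> thrift -> BE: SessionVariable.toThrift() sets the field, and RuntimeState::enable_shared_exchange_sink_buffer() only honors it when it was actually sent, so an older FE safely leaves sharing off. A stand-alone sketch of that __isset guard — the struct is a hand-written stand-in for the generated thrift class, not the real TQueryOptions:

    #include <iostream>

    struct TQueryOptionsLike {            // stand-in for the generated thrift struct
        struct { bool enable_shared_exchange_sink_buffer = false; } __isset;
        bool enable_shared_exchange_sink_buffer = true; // default per the .thrift
    };

    // Mirrors the RuntimeState accessor added above.
    bool enable_shared_exchange_sink_buffer(const TQueryOptionsLike& opts) {
        return opts.__isset.enable_shared_exchange_sink_buffer &&
               opts.enable_shared_exchange_sink_buffer;
    }

    int main() {
        TQueryOptionsLike from_old_fe; // field never set by an older frontend
        std::cout << enable_shared_exchange_sink_buffer(from_old_fe) << '\n'; // 0

        TQueryOptionsLike from_new_fe;
        from_new_fe.__isset.enable_shared_exchange_sink_buffer = true;
        std::cout << enable_shared_exchange_sink_buffer(from_new_fe) << '\n'; // 1
    }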