diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0f02ffc2b9a4b1d..f43f61565b6be8d 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde } // namespace vectorized namespace pipeline { -ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, - int be_number, RuntimeState* state, - ExchangeSinkLocalState* parent) +ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, + RuntimeState* state) : HasTaskExecutionCtx(state), _queue_capacity(0), - _is_finishing(false), + _is_failed(false), _query_id(std::move(query_id)), _dest_node_id(dest_node_id), - _sender_id(send_id), - _be_number(be_number), _state(state), - _context(state->get_query_ctx()), - _parent(parent) {} + _context(state->get_query_ctx()) {} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -110,8 +106,8 @@ void ExchangeSinkBuffer::close() { //_instance_to_request.clear(); } -void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { - if (_is_finishing) { +void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { + if (_is_failed) { return; } auto low_id = fragment_instance_id.lo; @@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); _rpc_channel_is_idle[low_id] = true; + _rpc_channel_is_turn_off[low_id] = false; _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_stats_vec.emplace_back(std::make_shared(low_id)); _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get(); - _construct_request(low_id, finst_id); + _instance_to_request[low_id] = std::make_shared(); + _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id); + _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id); + + _instance_to_request[low_id]->set_node_id(_dest_node_id); } Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { - if (_is_finishing) { + if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->_fragment_instance_id.lo; + auto ins_id = request.channel->ins_id(); if (!_instance_to_package_queue_mutex.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", print_id(request.channel->_fragment_instance_id)); } - if (_is_receiver_eof(ins_id)) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id]); @@ -158,14 +156,17 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); - COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong()); - COUNTER_SET(_parent->peak_memory_usage_counter(), - _parent->memory_used_counter()->value()); + auto* parent = request.channel->_parent; + COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong()); + COUNTER_SET(parent->peak_memory_usage_counter(), + parent->memory_used_counter()->value()); } _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; - if (_queue_dependency && _total_queue_size > _queue_capacity) { - _queue_dependency->block(); + if (_total_queue_size > _queue_capacity) { + for (auto dep : _queue_deps) { + dep->block(); + } } } if (send_now) { @@ -176,17 +177,14 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { } Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { - if (_is_finishing) { + if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->_fragment_instance_id.lo; + auto ins_id = request.channel->ins_id(); if (!_instance_to_package_queue_mutex.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", print_id(request.channel->_fragment_instance_id)); } - if (_is_receiver_eof(ins_id)) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id]); @@ -211,16 +209,18 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - DCHECK(_rpc_channel_is_idle[id] == false); - std::queue>& q = _instance_to_package_queue[id]; std::queue>& broadcast_q = _instance_to_broadcast_package_queue[id]; - if (_is_finishing) { + if (_is_failed) { _turn_off_channel(id, lock); return Status::OK(); } + if (_instance_to_receiver_eof[id]) { + DCHECK(_rpc_channel_is_turn_off[id]); + return Status::OK(); + } if (!q.empty()) { // If we have data to shuffle which is not broadcasted @@ -228,6 +228,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_sender_id(request.channel->_parent->sender_id()); + brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block && !request.block->column_metas().empty()) { brpc_request->set_allocated_block(request.block.get()); } @@ -273,14 +275,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (!s.ok()) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + return; } else if (eos) { _ended(id); - } else { - s = _send_rpc(id); - if (!s) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", - s.to_string())); - } + } + // The eos here only indicates that the current exchange sink has reached eos. + // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. + s = _send_rpc(id); + if (!s) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); } }); { @@ -298,13 +302,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } } if (request.block) { - COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong()); + COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), + -request.block->ByteSizeLong()); static_cast(brpc_request->release_block()); } q.pop(); _total_queue_size--; - if (_queue_dependency && _total_queue_size <= _queue_capacity) { - _queue_dependency->set_ready(); + if (_total_queue_size <= _queue_capacity) { + for (auto dep : _queue_deps) { + dep->set_ready(); + } } } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted @@ -312,6 +319,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_sender_id(request.channel->_parent->sender_id()); + brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block_holder->get_block() && !request.block_holder->get_block()->column_metas().empty()) { brpc_request->set_allocated_block(request.block_holder->get_block()); @@ -354,14 +363,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (!s.ok()) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + return; } else if (eos) { _ended(id); - } else { - s = _send_rpc(id); - if (!s) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", - s.to_string())); - } + } + + // The eos here only indicates that the current exchange sink has reached eos. + // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. + s = _send_rpc(id); + if (!s) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); } }); { @@ -389,16 +401,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { return Status::OK(); } -void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) { - _instance_to_request[id] = std::make_shared(); - _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id); - _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id); - - _instance_to_request[id]->set_node_id(_dest_node_id); - _instance_to_request[id]->set_sender_id(_sender_id); - _instance_to_request[id]->set_be_number(_be_number); -} - void ExchangeSinkBuffer::_ended(InstanceLoId id) { if (!_instance_to_package_queue_mutex.template contains(id)) { std::stringstream ss; @@ -413,48 +415,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) { __builtin_unreachable(); } else { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - _turn_off_channel(id, lock); + _running_sink_count[id]--; + if (_running_sink_count[id] == 0) { + _turn_off_channel(id, lock); + } } } void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { - _is_finishing = true; + _is_failed = true; _context->cancel(Status::Cancelled(err)); } void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_receiver_eof[id] = true; + // When the receiving side reaches eof, it means the receiver has finished early. + // The remaining data in the current rpc_channel does not need to be sent, + // and the rpc_channel should be turned off immediately. _turn_off_channel(id, lock); - std::queue>& broadcast_q = - _instance_to_broadcast_package_queue[id]; - for (; !broadcast_q.empty(); broadcast_q.pop()) { - if (broadcast_q.front().block_holder->get_block()) { - COUNTER_UPDATE(_parent->memory_used_counter(), - -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); - } - } - { - std::queue> empty; - swap(empty, broadcast_q); - } - - std::queue>& q = _instance_to_package_queue[id]; - for (; !q.empty(); q.pop()) { - if (q.front().block) { - COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); - } - } - - { - std::queue> empty; - swap(empty, q); - } -} - -bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { - std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - return _instance_to_receiver_eof[id]; } // The unused parameter `with_lock` is to ensure that the function is called when the lock is held. @@ -463,11 +442,16 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, if (!_rpc_channel_is_idle[id]) { _rpc_channel_is_idle[id] = true; } - _instance_to_receiver_eof[id] = true; - + // Ensure that each RPC is turned off only once. + if (_rpc_channel_is_turn_off[id]) { + return; + } + _rpc_channel_is_turn_off[id] = true; auto weak_task_ctx = weak_task_exec_ctx(); if (auto pip_ctx = weak_task_ctx.lock()) { - _parent->on_channel_finished(id); + for (auto* parent : _parents) { + parent->on_channel_finished(id); + } } } @@ -511,7 +495,7 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1); auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime"); auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime"); - auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1); + auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT); auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime"); int64_t max_rpc_time = 0, min_rpc_time = 0; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 22a1452f8d545c8..7a4bbed29df5b3a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -169,13 +169,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback { bool _eos; }; -// Each ExchangeSinkOperator have one ExchangeSinkBuffer +// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances +// or be individually owned by each ExchangeSinkLocalState. +// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances. +// Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer. + +// A sink buffer contains multiple rpc_channels. +// Each rpc_channel corresponds to a target instance on the receiving side. +// Data is sent using a ping-pong mode within each rpc_channel, +// meaning that at most one RPC can exist in a single rpc_channel at a time. +// The next RPC can only be sent after the previous one has completed. +// +// Each exchange sink sends data to all target instances on the receiving side. +// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks. + +/* + +-----------+ +-----------+ +-----------+ + |dest ins id| |dest ins id| |dest ins id| + | | | | | | + +----+------+ +-----+-----+ +------+----+ + | | | + | | | + +----------------+ +----------------+ +----------------+ + | | | | | | + sink buffer -------- | rpc_channel | | rpc_channel | | rpc_channel | + | | | | | | + +-------+--------+ +----------------+ +----------------+ + | | | + |------------------------+----------------------+ + | | | + | | | + +-----------------+ +-------+---------+ +-------+---------+ + | | | | | | + | exchange sink | | exchange sink | | exchange sink | + | | | | | | + +-----------------+ +-----------------+ +-----------------+ +*/ + class ExchangeSinkBuffer final : public HasTaskExecutionCtx { public: - ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, - RuntimeState* state, ExchangeSinkLocalState* parent); + ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state); ~ExchangeSinkBuffer() override = default; - void register_sink(TUniqueId); + void register_sink(InstanceLoId id) { + std::lock_guard lc(_init_lock); + _running_sink_count[id]++; + } + void construct_request(TUniqueId); Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); @@ -184,13 +223,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { void update_profile(RuntimeProfile* profile); void set_dependency(std::shared_ptr queue_dependency, - std::shared_ptr finish_dependency) { - _queue_dependency = queue_dependency; - _finish_dependency = finish_dependency; - } - - void set_broadcast_dependency(std::shared_ptr broadcast_dependency) { - _broadcast_dependency = broadcast_dependency; + ExchangeSinkLocalState* local_state) { + std::lock_guard lc(_init_lock); + _queue_deps.push_back(queue_dependency); + _parents.push_back(local_state); } private: @@ -214,6 +250,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { // One channel is corresponding to a downstream instance. phmap::flat_hash_map _rpc_channel_is_idle; + // There could be multiple situations that cause an rpc_channel to be turned off, + // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos. + // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times. + phmap::flat_hash_map _rpc_channel_is_turn_off; phmap::flat_hash_map _instance_to_receiver_eof; struct RpcInstanceStatistics { RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {} @@ -226,32 +266,34 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { std::vector> _instance_to_rpc_stats_vec; phmap::flat_hash_map _instance_to_rpc_stats; - std::atomic _is_finishing; + // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. + // If an RPC error occurs, the query will be canceled. + std::atomic _is_failed; PUniqueId _query_id; PlanNodeId _dest_node_id; - // Sender instance id, unique within a fragment. StreamSender save the variable - int _sender_id; - int _be_number; std::atomic _rpc_count = 0; RuntimeState* _state = nullptr; QueryContext* _context = nullptr; Status _send_rpc(InstanceLoId); - // must hold the _instance_to_package_queue_mutex[id] mutex to opera - void _construct_request(InstanceLoId id, PUniqueId); inline void _ended(InstanceLoId id); inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); - inline bool _is_receiver_eof(InstanceLoId id); + inline void _turn_off_channel(InstanceLoId id, std::unique_lock& with_lock); void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); std::atomic _total_queue_size = 0; - std::shared_ptr _queue_dependency = nullptr; - std::shared_ptr _finish_dependency = nullptr; - std::shared_ptr _broadcast_dependency = nullptr; - ExchangeSinkLocalState* _parent = nullptr; + + // Used to protect certain init functions, + // as the init function of ExchangeSinkLocalState is multi-threaded. + std::mutex _init_lock; + // _running_sink_count is used to track how many sinks have not finished yet. + // It is only decremented when eos is reached. + phmap::flat_hash_map _running_sink_count; + std::vector> _queue_deps; + std::vector _parents; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 85f58417197d52e..c04468ccd9c9921 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -32,6 +32,7 @@ #include "pipeline/exec/operator.h" #include "pipeline/exec/sort_source_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" +#include "pipeline/local_exchange/local_exchange_source_operator.h" #include "util/runtime_profile.h" #include "util/uid_util.h" #include "vec/columns/column_const.h" @@ -100,6 +101,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1)); } _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); + + size_t local_size = 0; + for (int i = 0; i < channels.size(); ++i) { + if (channels[i]->is_local()) { + local_size++; + _last_local_channel_idx = i; + } + } + only_local_exchange = local_size == channels.size(); + + if (!only_local_exchange) { + _sink_buffer = p.get_sink_buffer(); + register_channels(_sink_buffer.get()); + _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "ExchangeSinkQueueDependency", true); + _sink_buffer->set_dependency(_queue_dependency, this); + } + return Status::OK(); } @@ -149,20 +168,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - if (!only_local_exchange) { - _sink_buffer = std::make_unique(id, p._dest_node_id, _sender_id, - _state->be_number(), state, this); - register_channels(_sink_buffer.get()); - _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "ExchangeSinkQueueDependency", true); - _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); - } - if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = Dependency::create_shared( _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true); - _sink_buffer->set_broadcast_dependency(_broadcast_dependency); _broadcast_pb_mem_limiter = vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency); } else if (local_size > 0) { @@ -360,6 +369,7 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { } RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); } + _sink_buffer = _create_buffer(); return Status::OK(); } @@ -632,6 +642,16 @@ void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf for (auto& channel : channels) { channel->register_exchange_buffer(buffer); } + // The target ins_id of multiple channels may be the same. + // The calculation logic here is the same as that of _working_channels_count. + std::set ins_id_set; + for (auto& channel : channels) { + auto ins_id = channel->ins_id(); + if (!channel->is_local() && !ins_id_set.contains(ins_id)) { + buffer->register_sink(ins_id); + ins_id_set.insert(ins_id); + } + } } Status ExchangeSinkOperatorX::channel_add_rows( @@ -679,8 +699,8 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::format_to(debug_string_buffer, ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " "{}, queue dep: {}), _reach_limit: {}, working channels: {}", - _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size, - _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(), + _sink_buffer->_is_failed.load(), _sink_buffer->_total_queue_size, + _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(), _working_channels_count.load()); } return fmt::to_string(debug_string_buffer); @@ -734,4 +754,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { return DataSinkOperatorX::required_data_distribution(); } +std::shared_ptr ExchangeSinkOperatorX::_create_buffer() { + PUniqueId id; + id.set_hi(_state->query_id().hi); + id.set_lo(_state->query_id().lo); + auto sink_buffer = std::make_unique(id, _dest_node_id, state()); + for (const auto& _dest : _dests) { + const auto& dest_fragment_instance_id = _dest.fragment_instance_id; + // There is no need to check for duplicate dest_fragment_instance_id here. + // The construct_request function already handles this check internally. + sink_buffer->construct_request(dest_fragment_instance_id); + } + return sink_buffer; +} + +std::shared_ptr ExchangeSinkOperatorX::get_sink_buffer() { + if (_child) { + if (std::dynamic_pointer_cast(_child)) { + return _create_buffer(); + } + if (std::dynamic_pointer_cast(_child)) { + return _create_buffer(); + } + } + if (_state->enable_shared_exchange_sink_buffer()) { + return _sink_buffer; + } + return _create_buffer(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 91ee1bd27a63e79..9157a5527914a11 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -89,6 +89,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } + [[nodiscard]] int be_number() const { return _state->be_number(); } std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; @@ -112,7 +113,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { friend class vectorized::Channel; friend class vectorized::BlockSerializer; - std::unique_ptr _sink_buffer = nullptr; + std::shared_ptr _sink_buffer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; RuntimeProfile::Counter* _bytes_sent_counter = nullptr; @@ -209,6 +210,14 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX get_sink_buffer(); + private: friend class ExchangeSinkLocalState; @@ -225,6 +234,12 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX>& channel2rows, vectorized::Block* block, bool eos); + + // Use ExchangeSinkOperatorX to create a sink buffer. + // The sink buffer can be shared among multiple ExchangeSinkLocalState instances, + // or each ExchangeSinkLocalState can have its own sink buffer. + std::shared_ptr _create_buffer(); + std::shared_ptr _sink_buffer = nullptr; RuntimeState* _state = nullptr; const std::vector _texprs; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0775ef3fb19826b..62bca4dd03519cf 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -260,7 +260,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state = RuntimeState::create_unique( request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - + _runtime_state->set_task_execution_context(shared_from_this()); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker()); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 0bc81bca4d99a1e..8cf580a04dc8e53 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -593,6 +593,11 @@ class RuntimeState { _query_options.enable_local_merge_sort; } + bool enable_shared_exchange_sink_buffer() const { + return _query_options.__isset.enable_shared_exchange_sink_buffer && + _query_options.enable_shared_exchange_sink_buffer; + } + int64_t min_revocable_mem() const { if (_query_options.__isset.min_revocable_mem) { return std::max(_query_options.min_revocable_mem, (int64_t)1); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 5fe35e4da119d03..b4a61c04f284160 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -166,10 +166,9 @@ class Channel { return Status::OK(); } - void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { - _buffer = buffer; - _buffer->register_sink(_fragment_instance_id); - } + void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; } + + InstanceLoId ins_id() const { return _fragment_instance_id.lo; } std::shared_ptr> get_send_callback( InstanceLoId id, bool eos) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 115614a41873a6c..9ccda5d61fbb74e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -284,6 +284,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort"; + public static final String ENABLE_SHARED_EXCHANGE_SINK_BUFFER = "enable_shared_exchange_sink_buffer"; + public static final String ENABLE_AGG_STATE = "enable_agg_state"; public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline"; @@ -1133,6 +1135,9 @@ public enum IgnoreSplitType { @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT) private boolean enableLocalMergeSort = true; + @VariableMgr.VarAttr(name = ENABLE_SHARED_EXCHANGE_SINK_BUFFER) + private boolean enableSharedExchangeSinkBuffer = true; + @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) public boolean enableAggState = false; @@ -1487,7 +1492,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { public boolean enableCommonExprPushdown = true; @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED) - public boolean enableLocalExchange = true; + public boolean enableLocalExchange = false; /** * For debug purpose, don't merge unique key and agg key when reading data. @@ -2358,7 +2363,6 @@ public void initFuzzyModeVariables() { this.parallelPipelineTaskNum = random.nextInt(8); this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); - this.enableLocalExchange = random.nextBoolean(); this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); @@ -3954,6 +3958,7 @@ public TQueryOptions toThrift() { tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); tResult.setEnableLocalMergeSort(enableLocalMergeSort); + tResult.setEnableSharedExchangeSinkBuffer(enableSharedExchangeSinkBuffer); tResult.setEnableParallelResultSink(enableParallelResultSink); tResult.setEnableParallelOutfile(enableParallelOutfile); tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 9a0fd910d943879..0e550dbdefc425f 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -358,6 +358,8 @@ struct TQueryOptions { 140: optional i64 orc_max_merge_distance_bytes = 1048576; 141: optional bool ignore_runtime_filter_error = false; + + 142: optional bool enable_shared_exchange_sink_buffer = true; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.