From 1c65114e8d076942f702d29425b96e4e064fe466 Mon Sep 17 00:00:00 2001 From: Mryange Date: Wed, 13 Nov 2024 19:15:12 +0800 Subject: [PATCH] test --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 101 ++++++++---------- be/src/pipeline/exec/exchange_sink_buffer.h | 33 +++--- .../pipeline/exec/exchange_sink_operator.cpp | 19 +++- be/src/pipeline/exec/exchange_sink_operator.h | 6 +- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/vec/sink/vdata_stream_sender.h | 4 +- .../org/apache/doris/qe/SessionVariable.java | 4 +- 7 files changed, 82 insertions(+), 87 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0f02ffc2b9a4b1..2a596f03c44297 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde } // namespace vectorized namespace pipeline { -ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, - int be_number, RuntimeState* state, - ExchangeSinkLocalState* parent) +ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, + RuntimeState* state) : HasTaskExecutionCtx(state), _queue_capacity(0), _is_finishing(false), _query_id(std::move(query_id)), _dest_node_id(dest_node_id), - _sender_id(send_id), - _be_number(be_number), _state(state), - _context(state->get_query_ctx()), - _parent(parent) {} + _context(state->get_query_ctx()) {} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -110,7 +106,7 @@ void ExchangeSinkBuffer::close() { //_instance_to_request.clear(); } -void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { +void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { if (_is_finishing) { return; } @@ -132,7 +128,11 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_stats_vec.emplace_back(std::make_shared(low_id)); _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get(); - _construct_request(low_id, finst_id); + _instance_to_request[low_id] = std::make_shared(); + _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id); + _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id); + + _instance_to_request[low_id]->set_node_id(_dest_node_id); } Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { @@ -158,14 +158,17 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); - COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong()); - COUNTER_SET(_parent->peak_memory_usage_counter(), - _parent->memory_used_counter()->value()); + auto* parent = request.channel->_parent; + COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong()); + COUNTER_SET(parent->peak_memory_usage_counter(), + parent->memory_used_counter()->value()); } _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; - if (_queue_dependency && _total_queue_size > _queue_capacity) { - _queue_dependency->block(); + if (_total_queue_size > _queue_capacity) { + for (auto dep : _queue_deps) { + dep->block(); + } } } if (send_now) { @@ -211,8 +214,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - DCHECK(_rpc_channel_is_idle[id] == false); - std::queue>& q = _instance_to_package_queue[id]; std::queue>& broadcast_q = _instance_to_broadcast_package_queue[id]; @@ -228,6 +229,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_sender_id(request.channel->_parent->sender_id()); + brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block && !request.block->column_metas().empty()) { brpc_request->set_allocated_block(request.block.get()); } @@ -273,9 +276,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (!s.ok()) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + return; } else if (eos) { _ended(id); - } else { + } + { s = _send_rpc(id); if (!s) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", @@ -298,13 +303,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } } if (request.block) { - COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong()); + COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), + -request.block->ByteSizeLong()); static_cast(brpc_request->release_block()); } q.pop(); _total_queue_size--; - if (_queue_dependency && _total_queue_size <= _queue_capacity) { - _queue_dependency->set_ready(); + if (_total_queue_size <= _queue_capacity) { + for (auto dep : _queue_deps) { + dep->set_ready(); + } } } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted @@ -312,6 +320,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_sender_id(request.channel->_parent->sender_id()); + brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block_holder->get_block() && !request.block_holder->get_block()->column_metas().empty()) { brpc_request->set_allocated_block(request.block_holder->get_block()); @@ -354,9 +364,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (!s.ok()) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + return; } else if (eos) { _ended(id); - } else { + } + { s = _send_rpc(id); if (!s) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", @@ -389,16 +401,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { return Status::OK(); } -void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) { - _instance_to_request[id] = std::make_shared(); - _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id); - _instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id); - - _instance_to_request[id]->set_node_id(_dest_node_id); - _instance_to_request[id]->set_sender_id(_sender_id); - _instance_to_request[id]->set_be_number(_be_number); -} - void ExchangeSinkBuffer::_ended(InstanceLoId id) { if (!_instance_to_package_queue_mutex.template contains(id)) { std::stringstream ss; @@ -413,7 +415,10 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) { __builtin_unreachable(); } else { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - _turn_off_channel(id, lock); + _turn_off_count[id]--; + if (_turn_off_count[id] == 0) { + _turn_off_channel(id, lock); + } } } @@ -424,31 +429,9 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - _instance_to_receiver_eof[id] = true; - _turn_off_channel(id, lock); - std::queue>& broadcast_q = - _instance_to_broadcast_package_queue[id]; - for (; !broadcast_q.empty(); broadcast_q.pop()) { - if (broadcast_q.front().block_holder->get_block()) { - COUNTER_UPDATE(_parent->memory_used_counter(), - -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); - } - } - { - std::queue> empty; - swap(empty, broadcast_q); - } - - std::queue>& q = _instance_to_package_queue[id]; - for (; !q.empty(); q.pop()) { - if (q.front().block) { - COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); - } - } - - { - std::queue> empty; - swap(empty, q); + _turn_off_count[id]--; + if (_turn_off_count[id] == 0) { + _turn_off_channel(id, lock); } } @@ -467,7 +450,9 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, auto weak_task_ctx = weak_task_exec_ctx(); if (auto pip_ctx = weak_task_ctx.lock()) { - _parent->on_channel_finished(id); + for (auto* parent : _parents) { + parent->on_channel_finished(id); + } } } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 22a1452f8d545c..9b0825c4b17296 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -172,10 +172,13 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback { // Each ExchangeSinkOperator have one ExchangeSinkBuffer class ExchangeSinkBuffer final : public HasTaskExecutionCtx { public: - ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, - RuntimeState* state, ExchangeSinkLocalState* parent); + ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state); ~ExchangeSinkBuffer() override = default; - void register_sink(TUniqueId); + void register_sink(InstanceLoId id) { + std::lock_guard lc(_lock); + _turn_off_count[id]++; + } + void construct_request(TUniqueId); Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); @@ -184,17 +187,15 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { void update_profile(RuntimeProfile* profile); void set_dependency(std::shared_ptr queue_dependency, - std::shared_ptr finish_dependency) { - _queue_dependency = queue_dependency; - _finish_dependency = finish_dependency; - } - - void set_broadcast_dependency(std::shared_ptr broadcast_dependency) { - _broadcast_dependency = broadcast_dependency; + ExchangeSinkLocalState* local_state) { + std::lock_guard lc(_lock); + _queue_deps.push_back(queue_dependency); + _parents.push_back(local_state); } private: friend class ExchangeSinkLocalState; + phmap::flat_hash_map _turn_off_count; phmap::flat_hash_map> _instance_to_package_queue_mutex; @@ -229,16 +230,11 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { std::atomic _is_finishing; PUniqueId _query_id; PlanNodeId _dest_node_id; - // Sender instance id, unique within a fragment. StreamSender save the variable - int _sender_id; - int _be_number; std::atomic _rpc_count = 0; RuntimeState* _state = nullptr; QueryContext* _context = nullptr; Status _send_rpc(InstanceLoId); - // must hold the _instance_to_package_queue_mutex[id] mutex to opera - void _construct_request(InstanceLoId id, PUniqueId); inline void _ended(InstanceLoId id); inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); @@ -248,10 +244,9 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { int64_t get_sum_rpc_time(); std::atomic _total_queue_size = 0; - std::shared_ptr _queue_dependency = nullptr; - std::shared_ptr _finish_dependency = nullptr; - std::shared_ptr _broadcast_dependency = nullptr; - ExchangeSinkLocalState* _parent = nullptr; + std::vector> _queue_deps; + std::vector _parents; + std::mutex _lock; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1f91af01aa1f6b..d8bbfa208be9ee 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -149,19 +149,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { id.set_lo(_state->query_id().lo); if (!only_local_exchange) { - _sink_buffer = std::make_unique(id, p._dest_node_id, _sender_id, - _state->be_number(), state, this); + _sink_buffer = p._sink_buffer; register_channels(_sink_buffer.get()); _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "ExchangeSinkQueueDependency", true); - _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + _sink_buffer->set_dependency(_queue_dependency, this); } if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = Dependency::create_shared( _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true); - _sink_buffer->set_broadcast_dependency(_broadcast_dependency); _broadcast_pb_mem_limiter = vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency); } else if (local_size > 0) { @@ -368,6 +366,7 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { } RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); } + create_buffer(); return Status::OK(); } @@ -689,7 +688,7 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: " "{}, queue dep: {}), _reach_limit: {}, working channels: {}", _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size, - _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(), + _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(), _reach_limit.load(), _working_channels_count.load()); } return fmt::to_string(debug_string_buffer); @@ -743,4 +742,14 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { return DataSinkOperatorX::required_data_distribution(); } +void ExchangeSinkOperatorX::create_buffer() { + PUniqueId id; + id.set_hi(_state->query_id().hi); + id.set_lo(_state->query_id().lo); + _sink_buffer = std::make_unique(id, _dest_node_id, state()); + for (const auto& _dest : _dests) { + const auto& dest_fragment_instance_id = _dest.fragment_instance_id; + _sink_buffer->construct_request(dest_fragment_instance_id); + } +} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 63d50290005470..bb7e0f6dcbda20 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -89,6 +89,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } + [[nodiscard]] int be_number() const { return _state->be_number(); } std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; @@ -112,7 +113,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { friend class vectorized::Channel; friend class vectorized::BlockSerializer; - std::unique_ptr _sink_buffer = nullptr; + std::shared_ptr _sink_buffer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; RuntimeProfile::Counter* _bytes_sent_counter = nullptr; @@ -224,6 +225,9 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX>& channel2rows, vectorized::Block* block, bool eos); + + void create_buffer(); + std::shared_ptr _sink_buffer = nullptr; RuntimeState* _state = nullptr; const std::vector _texprs; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d14a0d0c3cd4a7..4682ad7858c77c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -260,7 +260,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state = RuntimeState::create_unique( request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - + _runtime_state->set_task_execution_context(shared_from_this()); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker()); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 5fe35e4da119d0..a84d0451d71e42 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -168,7 +168,9 @@ class Channel { void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; - _buffer->register_sink(_fragment_instance_id); + if (!_is_local) { + _buffer->register_sink(_fragment_instance_id.lo); + } } std::shared_ptr> get_send_callback( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index e4588844422057..7a017ae00ccbc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1483,7 +1483,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { public boolean enableCommonExprPushdown = true; @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED) - public boolean enableLocalExchange = true; + public boolean enableLocalExchange = false; /** * For debug purpose, don't merge unique key and agg key when reading data. @@ -2354,7 +2354,7 @@ public void initFuzzyModeVariables() { this.parallelPipelineTaskNum = random.nextInt(8); this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); - this.enableLocalExchange = random.nextBoolean(); + this.enableLocalExchange = false; this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean();