From f79f138bd11e548df3c422e483a35122fffe86d6 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 28 Nov 2024 19:08:17 +0800 Subject: [PATCH] test --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 51 +++++++++++-------- be/src/pipeline/exec/exchange_sink_buffer.h | 44 +++++++++++++++- be/src/vec/runtime/vdata_stream_recvr.cpp | 26 ++++++---- be/src/vec/runtime/vdata_stream_recvr.h | 4 ++ be/src/vec/sink/vdata_stream_sender.h | 3 +- 5 files changed, 93 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0f02ffc2b9a4b1..515dcac148b76b 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -36,6 +36,7 @@ #include #include +#include "common/logging.h" #include "common/status.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/pipeline_fragment_context.h" @@ -169,7 +170,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { } } if (send_now) { - RETURN_IF_ERROR(_send_rpc(ins_id)); + RETURN_IF_ERROR(_send_rpc(ins_id, []() {})); } return Status::OK(); @@ -202,15 +203,15 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { _instance_to_broadcast_package_queue[ins_id].emplace(request); } if (send_now) { - RETURN_IF_ERROR(_send_rpc(ins_id)); + RETURN_IF_ERROR(_send_rpc(ins_id, []() {})); } return Status::OK(); } -Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { +Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id, std::function pre_do) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - + pre_do(); DCHECK(_rpc_channel_is_idle[id] == false); std::queue>& q = _instance_to_package_queue[id]; @@ -227,9 +228,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& request = q.front(); auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_packet_seq(_instance_to_seq[id]); if (request.block && !request.block->column_metas().empty()) { - brpc_request->set_allocated_block(request.block.get()); + brpc_request->mutable_block()->CopyFrom(*request.block); } if (!request.exec_status.ok()) { request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); @@ -249,7 +250,13 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(id, err); + + LOG_WARNING("exchange send error").tag("err", err); + auto s = _send_rpc(id, []() {}); + if (!s) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + } }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -276,7 +283,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (eos) { _ended(id); } else { - s = _send_rpc(id); + s = _send_rpc(id, [&, id]() { + _instance_to_seq[id]++; + _instance_to_package_queue[id].pop(); + }); if (!s) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); @@ -297,11 +307,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::move(send_remote_block_closure)); } } - if (request.block) { - COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong()); - static_cast(brpc_request->release_block()); - } - q.pop(); _total_queue_size--; if (_queue_dependency && _total_queue_size <= _queue_capacity) { _queue_dependency->set_ready(); @@ -311,10 +316,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& request = broadcast_q.front(); auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_packet_seq(_instance_to_seq[id]); if (request.block_holder->get_block() && !request.block_holder->get_block()->column_metas().empty()) { - brpc_request->set_allocated_block(request.block_holder->get_block()); + brpc_request->mutable_block()->CopyFrom(*request.block_holder->get_block()); } auto send_callback = request.channel->get_send_callback(id, request.eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); @@ -330,7 +335,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(id, err); + LOG_WARNING("exchange send error").tag("err", err); + auto s = _send_rpc(id, []() {}); + if (!s) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + } }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -357,7 +367,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (eos) { _ended(id); } else { - s = _send_rpc(id); + s = _send_rpc(id, [&, id]() { + _instance_to_seq[id]++; + _instance_to_broadcast_package_queue[id].pop(); + }); if (!s) { _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); @@ -378,10 +391,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::move(send_remote_block_closure)); } } - if (request.block_holder->get_block()) { - static_cast(brpc_request->release_block()); - } - broadcast_q.pop(); } else { _rpc_channel_is_idle[id] = true; } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 22a1452f8d545c..ae4afda8231d97 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -162,13 +163,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback { } int64_t start_rpc_time; -private: +protected: std::function _fail_fn; std::function _suc_fn; InstanceLoId _id; bool _eos; }; +class MockExchangeSendCallback : public ExchangeSendCallback { + ENABLE_FACTORY_CREATOR(MockExchangeSendCallback); + +public: + MockExchangeSendCallback() = default; + ~MockExchangeSendCallback() override = default; + MockExchangeSendCallback(const MockExchangeSendCallback& other) = delete; + MockExchangeSendCallback& operator=(const MockExchangeSendCallback& other) = delete; + + void call() noexcept override { + try { + if (fake_fail()) { + _fail_fn(_id, "MockExchangeSendCallback fake error"); + return; + } + if (::doris::DummyBrpcCallback::cntl_->Failed()) { + std::string err = fmt::format( + "failed to send brpc when exchange, error={}, error_text={}, client: {}, " + "latency = {}", + berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()), + ::doris::DummyBrpcCallback::cntl_->ErrorText(), + BackendOptions::get_localhost(), + ::doris::DummyBrpcCallback::cntl_->latency_us()); + _fail_fn(_id, err); + } else { + _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback::response_), + start_rpc_time); + } + } catch (const std::exception& exp) { + LOG(FATAL) << "brpc callback error: " << exp.what(); + } catch (...) { + LOG(FATAL) << "brpc callback error."; + __builtin_unreachable(); + } + } + inline static bool fake_fail() { return (flag++) % 5 == 0; } + inline static std::atomic_int64_t flag = 0; +}; + // Each ExchangeSinkOperator have one ExchangeSinkBuffer class ExchangeSinkBuffer final : public HasTaskExecutionCtx { public: @@ -236,7 +276,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { RuntimeState* _state = nullptr; QueryContext* _context = nullptr; - Status _send_rpc(InstanceLoId); + Status _send_rpc(InstanceLoId, std::function pre_do); // must hold the _instance_to_package_queue_mutex[id] mutex to opera void _construct_request(InstanceLoId id, PUniqueId); inline void _ended(InstanceLoId id); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 81e4e1cd5f037e..e293b910169fa3 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -150,19 +150,22 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr pblock, if (_is_cancelled) { return Status::OK(); } - auto iter = _packet_seq_map.find(be_number); - if (iter != _packet_seq_map.end()) { - if (iter->second >= packet_seq) { - LOG(WARNING) << fmt::format( - "packet already exist [cur_packet_id= {} receive_packet_id={}]", - iter->second, packet_seq); - return Status::OK(); - } - iter->second = packet_seq; - } else { - _packet_seq_map.emplace(be_number, packet_seq); + // _packet_seq_map is initialized to -1, and each packet_seq must be exactly 1 greater than the value in _packet_seq_map. + if (!_packet_seq_map.contains(be_number)) { + _packet_seq_map.emplace(be_number, -1); + } + if (packet_seq != _packet_seq_map[be_number] + 1) { + // A block might be processed by the current queue but encounter an error when calling back the sender. + // In this case, the sender will resend the block. + COUNTER_UPDATE(_recvr->_duplicate_block_sender_counter, 1); + return Status::OK(); } + // LOG_WARNING("VDataStreamRecvr get block from") + // .tag("be_number", be_number) + // .tag("packet_seq", packet_seq); + _packet_seq_map[be_number]++; + DCHECK(_num_remaining_senders >= 0); if (_num_remaining_senders == 0) { DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number)); @@ -361,6 +364,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT); _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT); + _duplicate_block_sender_counter = ADD_COUNTER(_profile, "DuplicateBlockSender", TUnit::UNIT); _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT); _max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 1639366c8b83d6..7c0bb811f0b075 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -160,6 +160,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { // Number of blocks received RuntimeProfile::Counter* _blocks_produced_counter = nullptr; + RuntimeProfile::Counter* _duplicate_block_sender_counter = nullptr; RuntimeProfile::Counter* _max_wait_worker_time = nullptr; RuntimeProfile::Counter* _max_wait_to_process_time = nullptr; RuntimeProfile::Counter* _max_find_recvr_time = nullptr; @@ -296,6 +297,9 @@ class VDataStreamRecvr::SenderQueue { // sender_id std::unordered_set _sender_eos_set; // be_number => packet_seq + // Used to record the packet_seq sent by each sender. + // Note that be_number can distinguish each sender. + // The packet_seq starts at 0 and increments sequentially (0, 1, 2, 3, 4, ...). std::unordered_map _packet_seq_map; std::deque> _pending_closures; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 5fe35e4da119d0..c607da9df49f27 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -174,7 +174,8 @@ class Channel { std::shared_ptr> get_send_callback( InstanceLoId id, bool eos) { if (!_send_callback) { - _send_callback = pipeline::ExchangeSendCallback::create_shared(); + _send_callback = pipeline::MockExchangeSendCallback::create_shared(); + //_send_callback = pipeline::ExchangeSendCallback::create_shared(); } else { _send_callback->cntl_->Reset(); }