[test](enable back off in exchange) #44745

Closed · wants to merge 1 commit
51 changes: 30 additions & 21 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -36,6 +36,7 @@
#include <ostream>
#include <utility>

#include "common/logging.h"
#include "common/status.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/pipeline_fragment_context.h"
@@ -169,7 +170,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
}
}
if (send_now) {
-RETURN_IF_ERROR(_send_rpc(ins_id));
+RETURN_IF_ERROR(_send_rpc(ins_id, []() {}));
}

return Status::OK();
@@ -202,15 +203,15 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
_instance_to_broadcast_package_queue[ins_id].emplace(request);
}
if (send_now) {
-RETURN_IF_ERROR(_send_rpc(ins_id));
+RETURN_IF_ERROR(_send_rpc(ins_id, []() {}));
}

return Status::OK();
}

-Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
+Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id, std::function<void()> pre_do) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

+pre_do();
DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
@@ -227,9 +228,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
-brpc_request->set_packet_seq(_instance_to_seq[id]++);
+brpc_request->set_packet_seq(_instance_to_seq[id]);
if (request.block && !request.block->column_metas().empty()) {
-brpc_request->set_allocated_block(request.block.get());
+brpc_request->mutable_block()->CopyFrom(*request.block);
}
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
@@ -249,7 +250,13 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
-_failed(id, err);
+
+LOG_WARNING("exchange send error").tag("err", err);
+auto s = _send_rpc(id, []() {});
+if (!s) {
+    _failed(id,
+            fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
+}
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
@@ -276,7 +283,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (eos) {
_ended(id);
} else {
-s = _send_rpc(id);
+s = _send_rpc(id, [&, id]() {
+    _instance_to_seq[id]++;
+    _instance_to_package_queue[id].pop();
+});
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
@@ -297,11 +307,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::move(send_remote_block_closure));
}
}
-if (request.block) {
-    COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
-    static_cast<void>(brpc_request->release_block());
-}
-q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
@@ -311,10 +316,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
-brpc_request->set_packet_seq(_instance_to_seq[id]++);
+brpc_request->set_packet_seq(_instance_to_seq[id]);
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
-brpc_request->set_allocated_block(request.block_holder->get_block());
+brpc_request->mutable_block()->CopyFrom(*request.block_holder->get_block());
}
auto send_callback = request.channel->get_send_callback(id, request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
@@ -330,7 +335,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
-_failed(id, err);
+LOG_WARNING("exchange send error").tag("err", err);
+auto s = _send_rpc(id, []() {});
+if (!s) {
+    _failed(id,
+            fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
+}
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
@@ -357,7 +367,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (eos) {
_ended(id);
} else {
-s = _send_rpc(id);
+s = _send_rpc(id, [&, id]() {
+    _instance_to_seq[id]++;
+    _instance_to_broadcast_package_queue[id].pop();
+});
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
@@ -378,10 +391,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::move(send_remote_block_closure));
}
}
-if (request.block_holder->get_block()) {
-    static_cast<void>(brpc_request->release_block());
-}
-broadcast_q.pop();
} else {
_rpc_channel_is_idle[id] = true;
}
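The net effect of the .cpp hunks above: issuing the RPC no longer consumes the packet. set_packet_seq now reads the sequence without post-incrementing it, the block is deep-copied into the request via mutable_block()->CopyFrom() instead of handing over ownership with set_allocated_block()/release_block(), and the q.pop() plus the sequence increment move into the pre_do lambda that only the success handler passes to _send_rpc (where it runs under the queue mutex). The failure handler logs and re-issues _send_rpc(id, [](){}), so the front packet is resent with the same packet_seq and the same payload. A standalone, synchronous model of that ack-gated loop (a sketch with invented names RetrySender/Packet; the real code is asynchronous and callback-driven):

// retry_model.cpp -- minimal model of the ack-gated send loop.
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>

struct Packet {
    int64_t seq;
    std::string payload;
};

class RetrySender {
public:
    // send_fn models the brpc call: true on success, false on failure.
    explicit RetrySender(std::function<bool(const Packet&)> send_fn)
            : _send_fn(std::move(send_fn)) {}

    void add(std::string payload) {
        _queue.push(std::move(payload));
        drain();
    }

private:
    void drain() {
        while (!_queue.empty()) {
            // The sequence is read, not incremented, at send time
            // (mirrors set_packet_seq(_instance_to_seq[id])).
            Packet p{_next_seq, _queue.front()};
            if (_send_fn(p)) {
                // Success handler: only now advance the sequence and drop
                // the packet (mirrors the pre_do lambda used on success).
                ++_next_seq;
                _queue.pop();
            }
            // Failure handler: state untouched; the loop resends the same
            // packet with the same seq (mirrors _send_rpc(id, [](){})).
        }
    }

    std::queue<std::string> _queue;
    int64_t _next_seq = 0;
    std::function<bool(const Packet&)> _send_fn;
};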
44 changes: 42 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -25,6 +25,7 @@

#include <atomic>
#include <cstdint>
+#include <functional>
#include <list>
#include <memory>
#include <mutex>
@@ -162,13 +163,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
}
int64_t start_rpc_time;

-private:
+protected:
std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
InstanceLoId _id;
bool _eos;
};

+class MockExchangeSendCallback : public ExchangeSendCallback<PTransmitDataResult> {
+    ENABLE_FACTORY_CREATOR(MockExchangeSendCallback);
+
+public:
+    MockExchangeSendCallback() = default;
+    ~MockExchangeSendCallback() override = default;
+    MockExchangeSendCallback(const MockExchangeSendCallback& other) = delete;
+    MockExchangeSendCallback& operator=(const MockExchangeSendCallback& other) = delete;
+
+    void call() noexcept override {
+        try {
+            if (fake_fail()) {
+                _fail_fn(_id, "MockExchangeSendCallback fake error");
+                return;
+            }
+            if (::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->Failed()) {
+                std::string err = fmt::format(
+                        "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
+                        "latency = {}",
+                        berror(::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->ErrorCode()),
+                        ::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->ErrorText(),
+                        BackendOptions::get_localhost(),
+                        ::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->latency_us());
+                _fail_fn(_id, err);
+            } else {
+                _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback<PTransmitDataResult>::response_),
+                        start_rpc_time);
+            }
+        } catch (const std::exception& exp) {
+            LOG(FATAL) << "brpc callback error: " << exp.what();
+        } catch (...) {
+            LOG(FATAL) << "brpc callback error.";
+            __builtin_unreachable();
+        }
+    }
+    inline static bool fake_fail() { return (flag++) % 5 == 0; }
+    inline static std::atomic_int64_t flag = 0;
+};

// Each ExchangeSinkOperator has one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
@@ -236,7 +276,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;

-Status _send_rpc(InstanceLoId);
+Status _send_rpc(InstanceLoId, std::function<void()> pre_do);
// must hold the _instance_to_package_queue_mutex[id] mutex to operate
void _construct_request(InstanceLoId id, PUniqueId);
inline void _ended(InstanceLoId id);
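MockExchangeSendCallback injects failures deterministically: the shared atomic counter starts at 0, so calls 0, 5, 10, ... short-circuit into _fail_fn before the brpc controller is even inspected, which exercises the retry path added in exchange_sink_buffer.cpp. A self-contained check of that cadence (illustration only, not part of the patch):

// cadence_check.cpp -- verifies the 1-in-5 failure injection rate.
#include <atomic>
#include <cassert>

static std::atomic_int64_t flag = 0;
static bool fake_fail() { return (flag++) % 5 == 0; }

int main() {
    int failures = 0;
    for (int i = 0; i < 500; ++i) {
        if (fake_fail()) {
            ++failures;
        }
    }
    assert(failures == 100); // every fifth call reports a failure
    return 0;
}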
26 changes: 15 additions & 11 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -150,19 +150,22 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
if (_is_cancelled) {
return Status::OK();
}
-auto iter = _packet_seq_map.find(be_number);
-if (iter != _packet_seq_map.end()) {
-    if (iter->second >= packet_seq) {
-        LOG(WARNING) << fmt::format(
-                "packet already exist [cur_packet_id= {} receive_packet_id={}]",
-                iter->second, packet_seq);
-        return Status::OK();
-    }
-    iter->second = packet_seq;
-} else {
-    _packet_seq_map.emplace(be_number, packet_seq);
-}
+// _packet_seq_map is initialized to -1, and each packet_seq must be exactly 1 greater than the value in _packet_seq_map.
+if (!_packet_seq_map.contains(be_number)) {
+    _packet_seq_map.emplace(be_number, -1);
+}
+if (packet_seq != _packet_seq_map[be_number] + 1) {
+    // A block might be processed by the current queue but encounter an error when calling back the sender.
+    // In this case, the sender will resend the block.
+    COUNTER_UPDATE(_recvr->_duplicate_block_sender_counter, 1);
+    return Status::OK();
+}
+
+// LOG_WARNING("VDataStreamRecvr get block from")
+//         .tag("be_number", be_number)
+//         .tag("packet_seq", packet_seq);
+_packet_seq_map[be_number]++;

DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
Expand Down Expand Up @@ -361,6 +364,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
_max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
+_duplicate_block_sender_counter = ADD_COUNTER(_profile, "DuplicateBlockSender", TUnit::UNIT);
_max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
_max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT);
}
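The receiver now enforces strict per-sender ordering: for each be_number the only acceptable packet_seq is last + 1, starting from -1 so the first packet must carry seq 0. Anything else is treated as a sender-side resend of an already-processed block, counted under DuplicateBlockSender, and acknowledged with Status::OK() so the retrying sender can advance. A simplified sketch of the rule (SeqTracker is an invented name; the real check lives in VDataStreamRecvr::SenderQueue::add_block):

// seq_tracker.cpp -- simplified model of the duplicate-drop rule.
#include <cassert>
#include <cstdint>
#include <unordered_map>

class SeqTracker {
public:
    // True if the packet is new and should be processed; false if it is a
    // resend (the caller still acks it so the sender can move on).
    bool accept(int be_number, int64_t packet_seq) {
        auto it = _last_seq.try_emplace(be_number, -1).first;
        if (packet_seq != it->second + 1) {
            return false; // duplicate from a sender-side retry: drop it
        }
        ++it->second;
        return true;
    }

private:
    std::unordered_map<int, int64_t> _last_seq; // be_number -> last accepted seq
};

int main() {
    SeqTracker t;
    assert(t.accept(1, 0));  // first packet from sender 1
    assert(!t.accept(1, 0)); // a retried packet 0 is dropped
    assert(t.accept(1, 1));  // the next packet is accepted
    return 0;
}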
4 changes: 4 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
@@ -160,6 +160,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {

// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
+RuntimeProfile::Counter* _duplicate_block_sender_counter = nullptr;
RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
RuntimeProfile::Counter* _max_wait_to_process_time = nullptr;
RuntimeProfile::Counter* _max_find_recvr_time = nullptr;
@@ -296,6 +297,9 @@ class VDataStreamRecvr::SenderQueue {
// sender_id
std::unordered_set<int> _sender_eos_set;
// be_number => packet_seq
+// Used to record the last packet_seq received from each sender.
+// Note that be_number uniquely identifies each sender.
+// The packet_seq starts at 0 and increments sequentially (0, 1, 2, 3, 4, ...).
std::unordered_map<int, int64_t> _packet_seq_map;
std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> _pending_closures;

3 changes: 2 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.h
@@ -174,7 +174,8 @@ class Channel {
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
InstanceLoId id, bool eos) {
if (!_send_callback) {
-_send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
+_send_callback = pipeline::MockExchangeSendCallback::create_shared();
+//_send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
} else {
_send_callback->cntl_->Reset();
}
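Note that get_send_callback now builds the mock unconditionally, with the real ExchangeSendCallback left commented out, so the fault injection fires on every channel. A sketch of how a compile-time guard could keep this test-only (the ENABLE_EXCHANGE_FAULT_INJECTION macro is invented for illustration, not part of the patch):

#ifdef ENABLE_EXCHANGE_FAULT_INJECTION
    _send_callback = pipeline::MockExchangeSendCallback::create_shared();
#else
    _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
#endif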