Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Nov 28, 2024
1 parent a4d9aaa commit f79f138
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 35 deletions.
51 changes: 30 additions & 21 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <ostream>
#include <utility>

#include "common/logging.h"
#include "common/status.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/pipeline_fragment_context.h"
Expand Down Expand Up @@ -169,7 +170,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
}
}
if (send_now) {
RETURN_IF_ERROR(_send_rpc(ins_id));
RETURN_IF_ERROR(_send_rpc(ins_id, []() {}));
}

return Status::OK();
Expand Down Expand Up @@ -202,15 +203,15 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
_instance_to_broadcast_package_queue[ins_id].emplace(request);
}
if (send_now) {
RETURN_IF_ERROR(_send_rpc(ins_id));
RETURN_IF_ERROR(_send_rpc(ins_id, []() {}));
}

return Status::OK();
}

Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id, std::function<void()> pre_do) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

pre_do();
DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
Expand All @@ -227,9 +228,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_packet_seq(_instance_to_seq[id]);
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
brpc_request->mutable_block()->CopyFrom(*request.block);
}
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
Expand All @@ -249,7 +250,13 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
_failed(id, err);

LOG_WARNING("exchange send error").tag("err", err);
auto s = _send_rpc(id, []() {});
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
Expand All @@ -276,7 +283,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
s = _send_rpc(id, [&, id]() {
_instance_to_seq[id]++;
_instance_to_package_queue[id].pop();
});
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
Expand All @@ -297,11 +307,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::move(send_remote_block_closure));
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
Expand All @@ -311,10 +316,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_packet_seq(_instance_to_seq[id]);
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
brpc_request->mutable_block()->CopyFrom(*request.block_holder->get_block());
}
auto send_callback = request.channel->get_send_callback(id, request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
Expand All @@ -330,7 +335,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
_failed(id, err);
LOG_WARNING("exchange send error").tag("err", err);
auto s = _send_rpc(id, []() {});
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
Expand All @@ -357,7 +367,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
s = _send_rpc(id, [&, id]() {
_instance_to_seq[id]++;
_instance_to_broadcast_package_queue[id].pop();
});
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
Expand All @@ -378,10 +391,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::move(send_remote_block_closure));
}
}
if (request.block_holder->get_block()) {
static_cast<void>(brpc_request->release_block());
}
broadcast_q.pop();
} else {
_rpc_channel_is_idle[id] = true;
}
Expand Down
44 changes: 42 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -162,13 +163,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
}
int64_t start_rpc_time;

private:
protected:
std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
InstanceLoId _id;
bool _eos;
};

class MockExchangeSendCallback : public ExchangeSendCallback<PTransmitDataResult> {
ENABLE_FACTORY_CREATOR(MockExchangeSendCallback);

public:
MockExchangeSendCallback() = default;
~MockExchangeSendCallback() override = default;
MockExchangeSendCallback(const MockExchangeSendCallback& other) = delete;
MockExchangeSendCallback& operator=(const MockExchangeSendCallback& other) = delete;

void call() noexcept override {
try {
if (fake_fail()) {
_fail_fn(_id, "MockExchangeSendCallback fake error");
return;
}
if (::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->Failed()) {
std::string err = fmt::format(
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
"latency = {}",
berror(::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->ErrorCode()),
::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->ErrorText(),
BackendOptions::get_localhost(),
::doris::DummyBrpcCallback<PTransmitDataResult>::cntl_->latency_us());
_fail_fn(_id, err);
} else {
_suc_fn(_id, _eos, *(::doris::DummyBrpcCallback<PTransmitDataResult>::response_),
start_rpc_time);
}
} catch (const std::exception& exp) {
LOG(FATAL) << "brpc callback error: " << exp.what();
} catch (...) {
LOG(FATAL) << "brpc callback error.";
__builtin_unreachable();
}
}
inline static bool fake_fail() { return (flag++) % 5 == 0; }
inline static std::atomic_int64_t flag = 0;
};

// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
Expand Down Expand Up @@ -236,7 +276,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;

Status _send_rpc(InstanceLoId);
Status _send_rpc(InstanceLoId, std::function<void()> pre_do);
// must hold the _instance_to_package_queue_mutex[id] mutex to opera
void _construct_request(InstanceLoId id, PUniqueId);
inline void _ended(InstanceLoId id);
Expand Down
26 changes: 15 additions & 11 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,22 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
if (_is_cancelled) {
return Status::OK();
}
auto iter = _packet_seq_map.find(be_number);
if (iter != _packet_seq_map.end()) {
if (iter->second >= packet_seq) {
LOG(WARNING) << fmt::format(
"packet already exist [cur_packet_id= {} receive_packet_id={}]",
iter->second, packet_seq);
return Status::OK();
}
iter->second = packet_seq;
} else {
_packet_seq_map.emplace(be_number, packet_seq);
// _packet_seq_map is initialized to -1, and each packet_seq must be exactly 1 greater than the value in _packet_seq_map.
if (!_packet_seq_map.contains(be_number)) {
_packet_seq_map.emplace(be_number, -1);
}
if (packet_seq != _packet_seq_map[be_number] + 1) {
// A block might be processed by the current queue but encounter an error when calling back the sender.
// In this case, the sender will resend the block.
COUNTER_UPDATE(_recvr->_duplicate_block_sender_counter, 1);
return Status::OK();
}

// LOG_WARNING("VDataStreamRecvr get block from")
// .tag("be_number", be_number)
// .tag("packet_seq", packet_seq);
_packet_seq_map[be_number]++;

DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
Expand Down Expand Up @@ -361,6 +364,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
_max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
_duplicate_block_sender_counter = ADD_COUNTER(_profile, "DuplicateBlockSender", TUnit::UNIT);
_max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
_max_find_recvr_time = ADD_COUNTER(_profile, "MaxFindRecvrTime(NS)", TUnit::UNIT);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {

// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
RuntimeProfile::Counter* _duplicate_block_sender_counter = nullptr;
RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
RuntimeProfile::Counter* _max_wait_to_process_time = nullptr;
RuntimeProfile::Counter* _max_find_recvr_time = nullptr;
Expand Down Expand Up @@ -296,6 +297,9 @@ class VDataStreamRecvr::SenderQueue {
// sender_id
std::unordered_set<int> _sender_eos_set;
// be_number => packet_seq
// Used to record the packet_seq sent by each sender.
// Note that be_number can distinguish each sender.
// The packet_seq starts at 0 and increments sequentially (0, 1, 2, 3, 4, ...).
std::unordered_map<int, int64_t> _packet_seq_map;
std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> _pending_closures;

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ class Channel {
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
InstanceLoId id, bool eos) {
if (!_send_callback) {
_send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
_send_callback = pipeline::MockExchangeSendCallback::create_shared();
//_send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
} else {
_send_callback->cntl_->Reset();
}
Expand Down

0 comments on commit f79f138

Please sign in to comment.