Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Nov 28, 2024
1 parent 51ad321 commit 05e59cc
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 131 deletions.
162 changes: 73 additions & 89 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
} // namespace vectorized

namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_is_failed(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
_parent(parent) {}
_context(state->get_query_ctx()) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
Expand All @@ -110,8 +106,8 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}

void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
if (_is_failed) {
return;
}
auto low_id = fragment_instance_id.lo;
Expand All @@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
_rpc_channel_is_turn_off[low_id] = false;
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
_instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[low_id]->set_node_id(_dest_node_id);
}

Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
Expand All @@ -158,14 +156,17 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
auto* parent = request.channel->_parent;
COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(parent->peak_memory_usage_counter(),
parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
if (_total_queue_size > _queue_capacity) {
for (auto dep : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
Expand All @@ -176,17 +177,14 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
}

Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
Expand All @@ -211,23 +209,27 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];

if (_is_finishing) {
if (_is_failed) {
_turn_off_channel(id, lock);
return Status::OK();
}
if (_instance_to_receiver_eof[id]) {
DCHECK(_rpc_channel_is_turn_off[id]);
return Status::OK();
}

if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
Expand Down Expand Up @@ -273,14 +275,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
Expand All @@ -298,20 +302,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
if (_total_queue_size <= _queue_capacity) {
for (auto dep : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
Expand Down Expand Up @@ -354,14 +363,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}

// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
Expand Down Expand Up @@ -389,16 +401,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}

void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[id]->set_node_id(_dest_node_id);
_instance_to_request[id]->set_sender_id(_sender_id);
_instance_to_request[id]->set_be_number(_be_number);
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
Expand All @@ -413,48 +415,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, lock);
_running_sink_count[id]--;
if (_running_sink_count[id] == 0) {
_turn_off_channel(id, lock);
}
}
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_is_failed = true;
_context->cancel(Status::Cancelled(err));
}

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
// When the receiving side reaches eof, it means the receiver has finished early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
}
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}

// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
Expand All @@ -463,11 +442,16 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
}
_instance_to_receiver_eof[id] = true;

// Ensure that each RPC is turned off only once.
if (_rpc_channel_is_turn_off[id]) {
return;
}
_rpc_channel_is_turn_off[id] = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->on_channel_finished(id);
for (auto* parent : _parents) {
parent->on_channel_finished(id);
}
}
}

Expand Down Expand Up @@ -511,7 +495,7 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

int64_t max_rpc_time = 0, min_rpc_time = 0;
Expand Down
Loading

0 comments on commit 05e59cc

Please sign in to comment.