upd
Mryange committed Dec 5, 2024
1 parent f1f9758 commit fcb52aa
Showing 9 changed files with 262 additions and 117 deletions.
144 changes: 82 additions & 62 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,19 +87,22 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
} // namespace vectorized

namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state,
const std::vector<InstanceLoId>& sender_ins_ids)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_is_failed(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
_parent(parent) {}
_exchange_sink_num(sender_ins_ids.size()) {
for (auto sender_ins_id : sender_ins_ids) {
_queue_deps.emplace(sender_ins_id, nullptr);
_parents.emplace(sender_ins_id, nullptr);
}
}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
@@ -110,8 +113,8 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}

void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
if (_is_failed) {
return;
}
auto low_id = fragment_instance_id.lo;
@@ -129,22 +132,27 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
_instance_to_receiver_eof[low_id] = false;
_rpc_channel_is_turn_off[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
_instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[low_id]->set_node_id(_dest_node_id);
_running_sink_count[low_id] = _exchange_sink_num;
}

Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->dest_ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
if (_rpc_channel_is_turn_off[ins_id]) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
@@ -158,12 +166,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
request.block->ByteSizeLong());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
if (_total_queue_size > _queue_capacity) {
for (auto& [_, dep] : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
@@ -174,15 +185,15 @@ }
}

Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->dest_ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
if (_rpc_channel_is_turn_off[ins_id]) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
@@ -209,23 +220,26 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];

if (_is_finishing) {
if (_is_failed) {
_turn_off_channel(id, lock);
return Status::OK();
}
if (_rpc_channel_is_turn_off[id]) {
return Status::OK();
}

if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -271,14 +285,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
@@ -296,20 +312,25 @@ }
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
if (_total_queue_size <= _queue_capacity) {
for (auto& [_, dep] : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
@@ -352,14 +373,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}

// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
@@ -387,16 +411,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}

void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[id]->set_node_id(_dest_node_id);
_instance_to_request[id]->set_sender_id(_sender_id);
_instance_to_request[id]->set_be_number(_be_number);
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
@@ -411,24 +425,29 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, lock);
_running_sink_count[id]--;
if (_running_sink_count[id] == 0) {
_turn_off_channel(id, lock);
}
}
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_is_failed = true;
_context->cancel(Status::Cancelled(err));
}

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
// When the receiving side reaches eof, it means the receiver has finished early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
@@ -440,7 +459,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
-q.front().block->ByteSizeLong());
}
}

@@ -450,22 +470,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
}
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}

// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
std::unique_lock<std::mutex>& /*with_lock*/) {
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
}
_instance_to_receiver_eof[id] = true;

// Ensure that each RPC is turned off only once.
if (_rpc_channel_is_turn_off[id]) {
return;
}
_rpc_channel_is_turn_off[id] = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->on_channel_finished(id);
for (auto& [_, parent] : _parents) {
parent->on_channel_finished(id);
}
}
}

@@ -509,7 +529,7 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

int64_t max_rpc_time = 0, min_rpc_time = 0;
Expand Down
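
The same sharing motivates the back-pressure change: the single _queue_dependency becomes the map _queue_deps with one dependency per sender, and crossing _queue_capacity blocks (or releases) all of them, since any sender can feed the shared queue. A hypothetical sketch of that pattern, again with invented names rather than the real pipeline types:

#include <cstdint>
#include <map>
#include <memory>

// Stand-in for the pipeline dependency object: block() parks the sender,
// set_ready() lets it run again.
struct Dependency {
    bool blocked = false;
    void block() { blocked = true; }
    void set_ready() { blocked = false; }
};

class QueueBackPressureSketch {
public:
    void add_sender(int64_t sender_ins_id) {
        _queue_deps.emplace(sender_ins_id, std::make_shared<Dependency>());
    }

    // Mirrors add_block(): past capacity, every sender must block, because
    // they all feed the same shared queue.
    void on_enqueue() {
        if (++_total_queue_size > _queue_capacity) {
            for (auto& [id, dep] : _queue_deps) {
                dep->block();
            }
        }
    }

    // Mirrors the drain path in _send_rpc(): once below capacity, release all.
    void on_dequeue() {
        if (--_total_queue_size <= _queue_capacity) {
            for (auto& [id, dep] : _queue_deps) {
                dep->set_ready();
            }
        }
    }

private:
    int _total_queue_size = 0;
    int _queue_capacity = 8;
    std::map<int64_t, std::shared_ptr<Dependency>> _queue_deps;
};

int main() {
    QueueBackPressureSketch q;
    q.add_sender(1);
    q.add_sender(2);
    for (int i = 0; i < 10; ++i) q.on_enqueue(); // exceeds capacity -> both senders blocked
    for (int i = 0; i < 10; ++i) q.on_dequeue(); // drains -> both senders released
}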