[Shared sink buffer test] #44728

Closed · wants to merge 1 commit
158 changes: 71 additions & 87 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
} // namespace vectorized

namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_is_failed(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
_parent(parent) {}
_context(state->get_query_ctx()) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
@@ -110,8 +106,8 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}

void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
if (_is_failed) {
return;
}
auto low_id = fragment_instance_id.lo;
@@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
_rpc_channel_is_turn_off[low_id] = false;
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
_instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[low_id]->set_node_id(_dest_node_id);
}

Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
@@ -158,12 +156,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
auto* parent = request.channel->_parent;
COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
if (_total_queue_size > _queue_capacity) {
for (auto dep : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
@@ -174,17 +175,14 @@ }
}

Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_is_finishing) {
if (_is_failed) {
return Status::OK();
}
auto ins_id = request.channel->_fragment_instance_id.lo;
auto ins_id = request.channel->ins_id();
if (!_instance_to_package_queue_mutex.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
@@ -209,23 +207,27 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];

if (_is_finishing) {
if (_is_failed) {
_turn_off_channel(id, lock);
return Status::OK();
}
if (_instance_to_receiver_eof[id]) {
DCHECK(_rpc_channel_is_turn_off[id]);
return Status::OK();
}

if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -271,14 +273,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
@@ -296,20 +300,25 @@ }
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
if (_total_queue_size <= _queue_capacity) {
for (auto dep : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
@@ -352,14 +361,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
s.to_string()));
}
}

// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
s = _send_rpc(id);
if (!s) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
}
});
{
@@ -387,16 +399,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}

void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[id]->set_node_id(_dest_node_id);
_instance_to_request[id]->set_sender_id(_sender_id);
_instance_to_request[id]->set_be_number(_be_number);
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
@@ -411,48 +413,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, lock);
_running_sink_count[id]--;
if (_running_sink_count[id] == 0) {
_turn_off_channel(id, lock);
}
}
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_is_failed = true;
_context->cancel(Status::Cancelled(err));
}

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
// When the receiving side reaches eof, it means the receiver has finished early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
}
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}

// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
@@ -461,11 +440,16 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
}
_instance_to_receiver_eof[id] = true;

// Ensure that each RPC is turned off only once.
if (_rpc_channel_is_turn_off[id]) {
return;
}
_rpc_channel_is_turn_off[id] = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->on_channel_finished(id);
for (auto* parent : _parents) {
parent->on_channel_finished(id);
}
}
}

@@ -509,7 +493,7 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount", TUnit::UNIT, 1);
auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

int64_t max_rpc_time = 0, min_rpc_time = 0;
Expand Down
Loading
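
For context, a minimal self-contained sketch of the shared-buffer idea this patch moves toward: several exchange sinks register with one buffer, each destination channel keeps a running-sink count, and the channel is turned off exactly once, when the last sink writing to it reports eos. The names here (`SharedSinkBuffer`, `register_sink`, `finish_sink`) are hypothetical illustrations, not the actual Doris API; the real code guards each channel with its own mutex and also notifies the parent local states via `on_channel_finished`.

```cpp
#include <cstdint>
#include <iostream>
#include <mutex>
#include <unordered_map>

// Hypothetical, simplified illustration of the shared sink buffer idea:
// several exchange sinks feed one buffer, each destination channel keeps a
// running-sink count, and the channel is turned off exactly once, when the
// last sink writing to it reports eos. Not the Doris implementation.
class SharedSinkBuffer {
public:
    // Each sink that will send to `channel_id` registers once.
    void register_sink(int64_t channel_id) {
        std::lock_guard<std::mutex> lock(_mutex);
        ++_running_sink_count[channel_id];
        _turned_off[channel_id] = false;
    }

    // A sink reporting eos only decrements the count; the channel stays open
    // while other sinks still have data for it.
    void finish_sink(int64_t channel_id) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (--_running_sink_count[channel_id] == 0) {
            _turn_off_channel(channel_id);
        }
    }

private:
    // Called with _mutex held; ensures each channel is turned off only once.
    void _turn_off_channel(int64_t channel_id) {
        if (_turned_off[channel_id]) {
            return;
        }
        _turned_off[channel_id] = true;
        std::cout << "channel " << channel_id << " turned off\n";
    }

    std::mutex _mutex;
    std::unordered_map<int64_t, int> _running_sink_count;
    std::unordered_map<int64_t, bool> _turned_off;
};

int main() {
    SharedSinkBuffer buffer;
    buffer.register_sink(1);
    buffer.register_sink(1); // two sinks share the channel to instance 1
    buffer.finish_sink(1);   // first sink finishes; channel stays open
    buffer.finish_sink(1);   // last sink finishes; channel is turned off once
    return 0;
}
```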