Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Nov 22, 2024
1 parent 127e597 commit 5cda382
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 135 deletions.
117 changes: 48 additions & 69 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
} // namespace vectorized

namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
_parent(parent) {}
_context(state->get_query_ctx()) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
Expand All @@ -110,7 +106,7 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}

void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
if (_is_finishing) {
return;
}
Expand All @@ -132,7 +128,11 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
_instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[low_id]->set_node_id(_dest_node_id);
}

Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
Expand All @@ -144,9 +144,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
Expand All @@ -158,14 +155,17 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
auto* parent = request.channel->_parent;
COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(parent->peak_memory_usage_counter(),
parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
if (_total_queue_size > _queue_capacity) {
for (auto dep : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
Expand All @@ -184,9 +184,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
Expand All @@ -211,8 +208,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
Expand All @@ -221,13 +216,18 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_turn_off_channel(id, lock);
return Status::OK();
}
if (_instance_to_receiver_eof[id]) {
return Status::OK();
}

if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
Expand Down Expand Up @@ -273,9 +273,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
}
{
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
Expand All @@ -298,20 +300,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
if (_total_queue_size <= _queue_capacity) {
for (auto dep : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
Expand Down Expand Up @@ -354,9 +361,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
}
{
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
Expand Down Expand Up @@ -389,16 +398,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}

// Creates the PTransmitDataParams stored for instance `id` and fills in the
// fields that are set once here: the destination fragment instance id, the
// query id, the destination node id, the sender id and the BE number.
void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
    auto params = std::make_shared<PTransmitDataParams>();

    // Identity of the destination and of the owning query.
    params->mutable_finst_id()->CopyFrom(finst_id);
    params->mutable_query_id()->CopyFrom(_query_id);
    params->set_node_id(_dest_node_id);

    // Identity of the sender.
    params->set_sender_id(_sender_id);
    params->set_be_number(_be_number);

    // Publish the fully populated request under its instance id.
    _instance_to_request[id] = std::move(params);
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
Expand All @@ -413,7 +412,13 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, lock);
_turn_off_count[id]--;
if (_instance_to_receiver_eof[id]) {
return;
}
if (_turn_off_count[id] == 0) {
_turn_off_channel(id, lock);
}
}
}

Expand All @@ -425,49 +430,23 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
if (_turn_off_count[id] > 0) {
_turn_off_channel(id, lock);
}
}

// Returns the receiver-EOF flag recorded for instance `id`. The flag is read
// while holding that instance's package-queue mutex.
bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
    std::lock_guard<std::mutex> guard(*_instance_to_package_queue_mutex[id]);
    const bool receiver_eof = _instance_to_receiver_eof[id];
    return receiver_eof;
}

// Turns off the rpc channel of instance `id`: marks the channel idle, records
// receiver EOF for the instance and notifies the registered sink local states.
// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
std::unique_lock<std::mutex>& /*with_lock*/) {
// Flag the channel as idle if it is not already.
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
}
// Record that the receiver side of this instance is finished.
_instance_to_receiver_eof[id] = true;

// Only notify while the task execution context can still be locked.
// NOTE(review): presumably this guards against calling into sink state that
// has already been torn down; confirm against HasTaskExecutionCtx.
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->on_channel_finished(id);
for (auto* parent : _parents) {
parent->on_channel_finished(id);
}
}
}

Expand Down
34 changes: 14 additions & 20 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,13 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
// Each ExchangeSinkOperator has one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
RuntimeState* state, ExchangeSinkLocalState* parent);
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state);
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);
// Bumps the per-instance counter `_turn_off_count[id]` by one while holding
// `_lock`.
// NOTE(review): the counter appears to track how many sinks still use the
// channel to `id` before it may be turned off; confirm against `_ended`.
void register_sink(InstanceLoId id) {
    std::lock_guard guard(_lock);
    ++_turn_off_count[id];
}
void construct_request(TUniqueId);

Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
Expand All @@ -184,17 +187,15 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<Dependency> queue_dependency,
std::shared_ptr<Dependency> finish_dependency) {
_queue_dependency = queue_dependency;
_finish_dependency = finish_dependency;
}

void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
ExchangeSinkLocalState* local_state) {
std::lock_guard lc(_lock);
_queue_deps.push_back(queue_dependency);
_parents.push_back(local_state);
}

private:
friend class ExchangeSinkLocalState;
phmap::flat_hash_map<InstanceLoId, int64_t> _turn_off_count;

phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
Expand Down Expand Up @@ -229,29 +230,22 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
PlanNodeId _dest_node_id;
// Sender instance id, unique within a fragment. StreamSender saves this variable
int _sender_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;

Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to operate
void _construct_request(InstanceLoId id, PUniqueId);
inline void _ended(InstanceLoId id);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();

std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
ExchangeSinkLocalState* _parent = nullptr;
std::vector<std::shared_ptr<Dependency>> _queue_deps;
std::vector<ExchangeSinkLocalState*> _parents;
std::mutex _lock;
};

} // namespace pipeline
Expand Down
Loading

0 comments on commit 5cda382

Please sign in to comment.