
Commit

test
Mryange committed Nov 13, 2024
1 parent 3d7785d commit 1c65114
Showing 7 changed files with 82 additions and 87 deletions.
101 changes: 43 additions & 58 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,19 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
} // namespace vectorized

namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
RuntimeState* state)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
_parent(parent) {}
_context(state->get_query_ctx()) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
@@ -110,7 +106,7 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}

void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
if (_is_finishing) {
return;
}
@@ -132,7 +128,11 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
_instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
_instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[low_id]->set_node_id(_dest_node_id);
}

Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -158,14 +158,17 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
if (request.block) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
auto* parent = request.channel->_parent;
COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(parent->peak_memory_usage_counter(),
parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
if (_total_queue_size > _queue_capacity) {
for (auto dep : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
@@ -211,8 +214,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_rpc_channel_is_idle[id] == false);

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
@@ -228,6 +229,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -273,9 +276,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
}
{
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
@@ -298,20 +303,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
}
if (request.block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -request.block->ByteSizeLong());
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-request.block->ByteSizeLong());
static_cast<void>(brpc_request->release_block());
}
q.pop();
_total_queue_size--;
if (_queue_dependency && _total_queue_size <= _queue_capacity) {
_queue_dependency->set_ready();
if (_total_queue_size <= _queue_capacity) {
for (auto dep : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
@@ -354,9 +364,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
return;
} else if (eos) {
_ended(id);
} else {
}
{
s = _send_rpc(id);
if (!s) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}",
@@ -389,16 +401,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}

void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
_instance_to_request[id] = std::make_shared<PTransmitDataParams>();
_instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->mutable_query_id()->CopyFrom(_query_id);

_instance_to_request[id]->set_node_id(_dest_node_id);
_instance_to_request[id]->set_sender_id(_sender_id);
_instance_to_request[id]->set_be_number(_be_number);
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
if (!_instance_to_package_queue_mutex.template contains(id)) {
std::stringstream ss;
@@ -413,7 +415,10 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, lock);
_turn_off_count[id]--;
if (_turn_off_count[id] == 0) {
_turn_off_channel(id, lock);
}
}
}

@@ -424,31 +429,9 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
_turn_off_channel(id, lock);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(_parent->memory_used_counter(),
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
}
}
{
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, broadcast_q);
}

std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
if (q.front().block) {
COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
}
}

{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
_turn_off_count[id]--;
if (_turn_off_count[id] == 0) {
_turn_off_channel(id, lock);
}
}

@@ -467,7 +450,9 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,

auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->on_channel_finished(id);
for (auto* parent : _parents) {
parent->on_channel_finished(id);
}
}
}

33 changes: 14 additions & 19 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -172,10 +172,13 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
// Each ExchangeSinkOperator has one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
RuntimeState* state, ExchangeSinkLocalState* parent);
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state);
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);
void register_sink(InstanceLoId id) {
std::lock_guard lc(_lock);
_turn_off_count[id]++;
}
void construct_request(TUniqueId);

Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
@@ -184,17 +187,15 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<Dependency> queue_dependency,
std::shared_ptr<Dependency> finish_dependency) {
_queue_dependency = queue_dependency;
_finish_dependency = finish_dependency;
}

void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
ExchangeSinkLocalState* local_state) {
std::lock_guard lc(_lock);
_queue_deps.push_back(queue_dependency);
_parents.push_back(local_state);
}

private:
friend class ExchangeSinkLocalState;
phmap::flat_hash_map<InstanceLoId, int64_t> _turn_off_count;

phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
@@ -229,16 +230,11 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
PlanNodeId _dest_node_id;
// Sender instance id, unique within a fragment. StreamSender save the variable
int _sender_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;

Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to opera
void _construct_request(InstanceLoId id, PUniqueId);
inline void _ended(InstanceLoId id);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
@@ -248,10 +244,9 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
int64_t get_sum_rpc_time();

std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
ExchangeSinkLocalState* _parent = nullptr;
std::vector<std::shared_ptr<Dependency>> _queue_deps;
std::vector<ExchangeSinkLocalState*> _parents;
std::mutex _lock;
};

} // namespace pipeline
19 changes: 14 additions & 5 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -149,19 +149,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_lo(_state->query_id().lo);

if (!only_local_exchange) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);
_sink_buffer = p._sink_buffer;
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_sink_buffer->set_dependency(_queue_dependency, this);
}

if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_mem_limiter =
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
} else if (local_size > 0) {
@@ -368,6 +366,7 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) {
}
RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
}
create_buffer();
return Status::OK();
}

@@ -689,7 +688,7 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
"{}, queue dep: {}), _reach_limit: {}, working channels: {}",
_sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
_sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
_sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
_reach_limit.load(), _working_channels_count.load());
}
return fmt::to_string(debug_string_buffer);
@@ -743,4 +742,14 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
}

void ExchangeSinkOperatorX::create_buffer() {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
for (const auto& _dest : _dests) {
const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
_sink_buffer->construct_request(dest_fragment_instance_id);
}
}
} // namespace doris::pipeline
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
@@ -89,6 +89,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
void set_reach_limit() { _reach_limit = true; };

[[nodiscard]] int sender_id() const { return _sender_id; }
[[nodiscard]] int be_number() const { return _state->be_number(); }

std::string name_suffix() override;
segment_v2::CompressionTypePB compression_type() const;
@@ -112,7 +113,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
friend class vectorized::Channel;
friend class vectorized::BlockSerializer;

std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
RuntimeProfile::Counter* _compress_timer = nullptr;
RuntimeProfile::Counter* _bytes_sent_counter = nullptr;
@@ -224,6 +225,9 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
size_t num_channels,
std::vector<std::vector<uint32_t>>& channel2rows,
vectorized::Block* block, bool eos);

void create_buffer();
std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeState* _state = nullptr;

const std::vector<TExpr> _texprs;
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
@@ -260,7 +260,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state = RuntimeState::create_unique(
request.query_id, request.fragment_id, request.query_options,
_query_ctx->query_globals, _exec_env, _query_ctx.get());

_runtime_state->set_task_execution_context(shared_from_this());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
4 changes: 3 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.h
@@ -168,7 +168,9 @@ class Channel {

void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer;
_buffer->register_sink(_fragment_instance_id);
if (!_is_local) {
_buffer->register_sink(_fragment_instance_id.lo);
}
}

std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(