[feature](exchange) enable shared exchange sink buffer to reduce RPC concurrency. #44850
}

std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
    if (_child) {
Is `_child` always true?
    _buffer = buffer;
    _buffer->register_sink(_fragment_instance_id);
}
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
Rename `register_exchange_buffer` to `set_exchange_buffer`.
}
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }

InstanceLoId ins_id() const { return _fragment_instance_id.lo; }
Suggested change:
- InstanceLoId ins_id() const { return _fragment_instance_id.lo; }
+ InstanceLoId ins_lo_id() const { return _fragment_instance_id.lo; }
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);
void register_sink(InstanceLoId id) {
    std::lock_guard lc(_init_lock);
Delete this lock.
void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
    _broadcast_dependency = broadcast_dependency;
ExchangeSinkLocalState* local_state) {
    std::lock_guard lc(_init_lock);
Better to reduce the locking scope.
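One way to narrow the critical section, as a minimal sketch rather than the actual Doris code: do allocation and other per-sink setup before taking the lock, and hold the lock only while mutating the shared map. `SinkRegistry`, `sink_count()` and `_queues` are hypothetical names used purely for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a buffer shared by several sinks.
class SinkRegistry {
public:
    void register_sink(std::int64_t id, std::size_t queue_capacity) {
        // Work that does not touch shared state happens without the lock.
        std::vector<int> queue;
        queue.reserve(queue_capacity);
        {
            // The lock covers only the mutation of the shared map.
            std::lock_guard<std::mutex> guard(_lock);
            // try_emplace leaves an existing entry untouched.
            _queues.try_emplace(id, std::move(queue));
        }
    }

    std::size_t sink_count() const {
        std::lock_guard<std::mutex> guard(_lock);
        return _queues.size();
    }

private:
    mutable std::mutex _lock;
    std::unordered_map<std::int64_t, std::vector<int>> _queues;
};
```

Registering the same id twice leaves a single entry, so repeated registration stays cheap and the lock is never held across an allocation.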
33eb739 to 35209a5
be/src/runtime/runtime_state.h (outdated)
@@ -317,6 +317,12 @@ class RuntimeState {

    int per_fragment_instance_idx() const { return _per_fragment_instance_idx; }

    const pipeline::PipelineFragmentContext* fragment_context() const { return _fragment_context; }
Get this from the pipeline task.
36ad1de to db66a83
db66a83 to c783418
c783418 to fcb52aa
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
fcb52aa to d02232c
    throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                           "ExchangeSinkOperatorX did not correctly set the child.");
}
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
Better: `if (_state->enable_shared_exchange_sink_buffer() && !std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child) && !std::dynamic_pointer_cast<SortSourceOperatorX>(_child))`
Also add enough comments to this code.
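The suggested condition could be factored into a small, commented predicate. This is only a sketch: the operator hierarchy below is a hypothetical stand-in for the Doris classes, `use_shared_sink_buffer` and `OtherSourceOperatorX` are made-up names, and returning `false` for a missing child is a simplification (the diff above throws in that case).

```cpp
#include <memory>

// Hypothetical minimal operator hierarchy; the real Doris classes carry far more state.
struct OperatorXBase {
    virtual ~OperatorXBase() = default;
};
struct LocalExchangeSourceOperatorX : OperatorXBase {};
struct SortSourceOperatorX : OperatorXBase {};
struct OtherSourceOperatorX : OperatorXBase {};  // stands for any other child type

// Returns true only when sharing is enabled and the child is neither of the
// two operator types singled out in the review comment.
bool use_shared_sink_buffer(bool enable_shared, const std::shared_ptr<OperatorXBase>& child) {
    if (!enable_shared || child == nullptr) {
        // Simplification: decline instead of throwing when the child is unset.
        return false;
    }
    const bool is_local_exchange =
            std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(child) != nullptr;
    const bool is_sort = std::dynamic_pointer_cast<SortSourceOperatorX>(child) != nullptr;
    return !is_local_exchange && !is_sort;
}
```

Naming the two intermediate booleans gives each exclusion a place to hang the explanatory comment the review asks for.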
// _running_sink_count is used to track how many sinks have not finished yet.
// It is only decremented when eos is reached.
phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
Reconsider whether we should merge these maps; they all seem to use the same key.
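If the per-instance maps really do share the same key, one option is a single map whose value is a small struct. A rough sketch under stated assumptions: `std::unordered_map` stands in for `phmap::flat_hash_map`, `InstanceLoId` is assumed to be a 64-bit integer, and `PerInstanceState`, `rpc_channel_is_idle`, `InstanceStates` and `on_eos` are hypothetical names, not the fields or methods in this PR.

```cpp
#include <cstdint>
#include <unordered_map>

using InstanceLoId = std::int64_t;  // assumption for this sketch

// All per-destination bookkeeping lives in one value type.
struct PerInstanceState {
    std::int64_t running_sink_count = 0;  // sinks that have not reached eos yet
    bool rpc_channel_is_idle = true;      // hypothetical second field
};

class InstanceStates {
public:
    void register_sink(InstanceLoId id) { ++_states[id].running_sink_count; }

    // Decrements the counter on eos; returns true when the last sink finished.
    bool on_eos(InstanceLoId id) {
        auto it = _states.find(id);
        if (it == _states.end() || it->second.running_sink_count == 0) {
            return false;  // unknown id or nothing left to finish
        }
        return --it->second.running_sink_count == 0;
    }

private:
    std::unordered_map<InstanceLoId, PerInstanceState> _states;
};
```

A single lookup then reaches every field for an instance, and the maps can no longer drift out of sync on which keys they contain.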
@@ -271,14 +285,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
    } else if (!s.ok()) {
        _failed(id,
Should we unify the failure checks?
What problem does this PR solve?
Previously, each exchange sink had its own sink buffer.
With a query concurrency of n, a typical shuffle (where each sender instance can send data
to every downstream instance) could have n * n RPCs running concurrently.
This PR introduces support for shared sink buffers.
Sharing does not reduce the total number of RPCs, but it can limit how many run concurrently.
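The arithmetic can be made concrete with a small sketch. It assumes, as a simplification not stated in this description, that a sink buffer keeps at most one RPC in flight per downstream instance; both function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Upper bound on concurrent RPCs when each of `senders` sink instances owns a
// private buffer. Assumption: a buffer keeps at most one RPC in flight per receiver.
std::int64_t max_concurrent_rpcs_private(std::int64_t senders, std::int64_t receivers) {
    assert(senders >= 0 && receivers >= 0);
    return senders * receivers;
}

// Same bound when up to `senders_per_buffer` senders share one buffer.
std::int64_t max_concurrent_rpcs_shared(std::int64_t senders, std::int64_t receivers,
                                        std::int64_t senders_per_buffer) {
    assert(senders >= 0 && receivers >= 0 && senders_per_buffer > 0);
    // Number of buffers, rounded up.
    const std::int64_t buffers = (senders + senders_per_buffer - 1) / senders_per_buffer;
    return buffers * receivers;
}
```

Under that assumption, 8 senders and 8 receivers give a bound of 64 concurrent RPCs with private buffers and 8 when all senders share one buffer. The same batches are still sent; they queue in the shared buffer instead of running at once.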
Release note
None