Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](exchange) enable shared exchange sink buffer to reduce RPC concurrency. #44850

Merged
merged 3 commits into from
Dec 9, 2024

Conversation

Mryange
Copy link
Contributor

@Mryange Mryange commented Dec 2, 2024

What problem does this PR solve?

In the past, each exchange sink had its own sink buffer.
If the query concurrency is n, there would be n * n RPCs running concurrently
in a typical shuffle scenario (each sender instance can send data to all downstream instances).
Here, we introduce support for shared sink buffers.
This does not reduce the total number of RPCs but can limit the number of concurrent RPCs.

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@doris-robot
Copy link

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@Mryange
Copy link
Contributor Author

Mryange commented Dec 2, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.39% (9978/25992)
Line Coverage: 29.46% (83688/284085)
Region Coverage: 28.59% (43068/150628)
Branch Coverage: 25.20% (21890/86852)
Coverage Report: http://coverage.selectdb-in.cc/coverage/33eb7399ce6330553fcaeeacc1e1a6ad8139540c_33eb7399ce6330553fcaeeacc1e1a6ad8139540c/report/index.html

}

std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
if (_child) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is _child always true?

be/src/pipeline/exec/exchange_sink_operator.cpp Outdated Show resolved Hide resolved
_buffer = buffer;
_buffer->register_sink(_fragment_instance_id);
}
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change register_exchange_buffer to set_exchange_buffer

}
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }

InstanceLoId ins_id() const { return _fragment_instance_id.lo; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
InstanceLoId ins_id() const { return _fragment_instance_id.lo; }
InstanceLoId ins_lo_id() const { return _fragment_instance_id.lo; }

be/src/pipeline/exec/exchange_sink_buffer.cpp Show resolved Hide resolved
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);
void register_sink(InstanceLoId id) {
std::lock_guard lc(_init_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this lock.

void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
ExchangeSinkLocalState* local_state) {
std::lock_guard lc(_init_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to reduce locking scope

be/src/pipeline/exec/exchange_sink_buffer.cpp Show resolved Hide resolved
be/src/pipeline/exec/exchange_sink_buffer.h Show resolved Hide resolved
@Mryange Mryange force-pushed the shared-sink-buffer-dev-12-2 branch from 33eb739 to 35209a5 Compare December 4, 2024 11:23
@Mryange
Copy link
Contributor Author

Mryange commented Dec 4, 2024

run buildall

1 similar comment
@Mryange
Copy link
Contributor Author

Mryange commented Dec 4, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.47% (10008/26015)
Line Coverage: 29.51% (83893/284295)
Region Coverage: 28.62% (43125/150683)
Branch Coverage: 25.22% (21923/86942)
Coverage Report: http://coverage.selectdb-in.cc/coverage/03d7c1b4f4dbfe8f48eb0bdf014b071e53ec7d1e_03d7c1b4f4dbfe8f48eb0bdf014b071e53ec7d1e/report/index.html

@Mryange
Copy link
Contributor Author

Mryange commented Dec 4, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.45% (10005/26018)
Line Coverage: 29.50% (83864/284315)
Region Coverage: 28.61% (43108/150692)
Branch Coverage: 25.20% (21913/86946)
Coverage Report: http://coverage.selectdb-in.cc/coverage/36ad1deccaa2d9d7bd738ec1986f3cb5f9cc0ec1_36ad1deccaa2d9d7bd738ec1986f3cb5f9cc0ec1/report/index.html

@@ -317,6 +317,12 @@ class RuntimeState {

int per_fragment_instance_idx() const { return _per_fragment_instance_idx; }

const pipeline::PipelineFragmentContext* fragment_context() const { return _fragment_context; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get this from pipeline task

@Mryange Mryange force-pushed the shared-sink-buffer-dev-12-2 branch from 36ad1de to db66a83 Compare December 5, 2024 03:36
@Mryange
Copy link
Contributor Author

Mryange commented Dec 5, 2024

run buildall

@Mryange Mryange force-pushed the shared-sink-buffer-dev-12-2 branch from db66a83 to c783418 Compare December 5, 2024 03:41
@Mryange
Copy link
Contributor Author

Mryange commented Dec 5, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.47% (10008/26015)
Line Coverage: 29.52% (83922/284311)
Region Coverage: 28.62% (43122/150692)
Branch Coverage: 25.21% (21919/86948)
Coverage Report: http://coverage.selectdb-in.cc/coverage/c7834184fd82bef01d7ed9b81c6b68bce1a311ff_c7834184fd82bef01d7ed9b81c6b68bce1a311ff/report/index.html

@Mryange Mryange force-pushed the shared-sink-buffer-dev-12-2 branch from c783418 to fcb52aa Compare December 5, 2024 06:55
@Mryange
Copy link
Contributor Author

Mryange commented Dec 5, 2024

run buildall

@Mryange
Copy link
Contributor Author

Mryange commented Dec 5, 2024

run beut

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.48% (10006/26005)
Line Coverage: 29.51% (83920/284349)
Region Coverage: 28.61% (43128/150722)
Branch Coverage: 25.20% (21920/86978)
Coverage Report: http://coverage.selectdb-in.cc/coverage/fcb52aae9d02224a6f5c133c505b7ab11353e21f_fcb52aae9d02224a6f5c133c505b7ab11353e21f/report/index.html

Gabriel39
Gabriel39 previously approved these changes Dec 5, 2024
@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Dec 5, 2024
Copy link
Contributor

github-actions bot commented Dec 5, 2024

PR approved by at least one committer and no changes requested.

Copy link
Contributor

github-actions bot commented Dec 5, 2024

PR approved by anyone and no changes requested.

@Mryange
Copy link
Contributor Author

Mryange commented Dec 6, 2024

run buildall

@github-actions github-actions bot removed the approved Indicates a PR has been approved by one committer. label Dec 6, 2024
@Mryange
Copy link
Contributor Author

Mryange commented Dec 7, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.69% (10062/26006)
Line Coverage: 29.61% (84227/284438)
Region Coverage: 28.70% (43276/150771)
Branch Coverage: 25.26% (21980/87006)
Coverage Report: http://coverage.selectdb-in.cc/coverage/d02232ccfaf22044d6ecc62dbc8440861819aaa1_d02232ccfaf22044d6ecc62dbc8440861819aaa1/report/index.html

throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"ExchangeSinkOperatorX did not correctly set the child.");
}
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better if (_state->enable_shared_exchange_sink_buffer() && ! std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) && !std::dynamic_pointer_cast<SortSourceOperatorX>(_child))

And add enough comment for the code


// _running_sink_count is used to track how many sinks have not finished yet.
// It is only decremented when eos is reached.
phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
Copy link
Contributor

@HappenLee HappenLee Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rethink should we union the maps. seems all maps same key?

@@ -271,14 +285,16 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!s.ok()) {
_failed(id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should union the failed check?

@Mryange
Copy link
Contributor Author

Mryange commented Dec 8, 2024

run buildall

@doris-robot
Copy link

TeamCity be ut coverage result:
Function Coverage: 38.69% (10061/26006)
Line Coverage: 29.61% (84210/284437)
Region Coverage: 28.69% (43265/150777)
Branch Coverage: 25.25% (21973/87010)
Coverage Report: http://coverage.selectdb-in.cc/coverage/682969d3e6e502c8f35e63bfa1c23cc639d800e8_682969d3e6e502c8f35e63bfa1c23cc639d800e8/report/index.html

Copy link
Contributor

github-actions bot commented Dec 9, 2024

PR approved by at least one committer and no changes requested.

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Dec 9, 2024
@BiteTheDDDDt BiteTheDDDDt merged commit 2bc011f into apache:master Dec 9, 2024
23 of 25 checks passed
Mryange added a commit to Mryange/doris that referenced this pull request Jan 10, 2025
…concurrency. (apache#44850)

In the past, each exchange sink had its own sink buffer.
If the query concurrency is n, there would be n * n RPCs running
concurrently
in a typical shuffle scenario (each sender instance can send data to all
downstream instances).
Here, we introduce support for shared sink buffers.
This does not reduce the total number of RPCs but can limit the number
of concurrent RPCs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by one committer. reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants