[Enhancement] support phased schedule #47868
Conversation
```
// params.fragment_instance_id = fragment_instance_id();
const auto& fe_addr = state->fragment_ctx()->fe_addr();

class RpcRunnable : public Runnable {
```
Suggest moving RpcRunnable outside of this function, or using a lambda instead. Although it is legal syntax, it looks a bit odd.
I will refactor this after #48233 is merged.
```
final int fid = captureVersionFragmentId.asInt();
// Just get a tuple id.
final ArrayList<TupleId> tupleIds = fragments.get(0).getPlanFragment().getPlanRoot().getTupleIds();
PlanNode dummyNode = new CaptureVersionNode(PlanNodeId.DUMMY_PLAN_NODE_ID, tupleIds);
```
Why not put the construction of CaptureVersionNode->BlockHoleTableSink into the PlanFragmentBuilder?
Would the visible versions of all involved tablets of the OlapScanNodes be captured first via this CaptureVersionNode->BlockHoleTableSink fragment?
Are there optimizable scenarios where some OlapScanNodes should capture versions while others should not?
For example, the left side of a left-semi/anti join seems to need version capture, while the right side does not. If we put the construction of CaptureVersionNode->BlockHoleTableSink into PlanFragmentBuilder, we could control version capturing very precisely, and we could omit this PlanFragment from the EXPLAIN output.
> Why not put the construction of CaptureVersionNode->BlockHoleTableSink into the PlanFragmentBuilder?

I will try it.

> Would the visible versions of all involved tablets of the OlapScanNodes be captured first via this CaptureVersionNode->BlockHoleTableSink fragment?

Yes.

> Are there optimizable scenarios where some OlapScanNodes should capture versions while others should not?

The first scan node to be delivered does not have to capture the version. The all-at-once implementation assumes that delivering a batch of scan ranges is fast. Under a phased schedule, however, even a simple join's build side can be very slow, so the probe side must capture the rowset version. One possible optimization is to capture the versions of the tablets involved in the still-undelivered scan nodes if delivery has not finished after more than N seconds.

> For example, the left side of a left-semi/anti join seems to need version capture, while the right side does not. If we put the construction of CaptureVersionNode->BlockHoleTableSink into PlanFragmentBuilder, we could control version capturing very precisely, and we could omit this PlanFragment from the EXPLAIN output.

The PlanFragmentBuilder stage doesn't yet know which fragments will be the first to be delivered.
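To make the rule in this thread concrete, here is a minimal, self-contained sketch; the class and method names are illustrative stand-ins, not actual StarRocks FE types. A scan node whose fragment is delivered in the first phase can read the current visible version directly, while every later-phase scan node must use the version pinned up front by the CaptureVersionNode fragment (or pinned late, after the N-second timeout mentioned above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of the capture rule discussed above (hypothetical types, not
// actual StarRocks FE classes).
class VersionCaptureDecider {
    static class ScanInfo {
        final int scanNodeId;
        final int fragmentId;
        ScanInfo(int scanNodeId, int fragmentId) {
            this.scanNodeId = scanNodeId;
            this.fragmentId = fragmentId;
        }
    }

    // Returns the scan nodes whose tablet versions must be pinned by the
    // CaptureVersionNode fragment before their own fragments are delivered.
    static List<ScanInfo> scansNeedingCapture(List<ScanInfo> scans,
                                              Set<Integer> firstPhaseFragmentIds) {
        List<ScanInfo> needCapture = new ArrayList<>();
        for (ScanInfo scan : scans) {
            // First-phase scans start immediately and may read the current
            // visible version; later-phase scans must read the pinned version
            // so that a slow build side cannot shift their snapshot.
            if (!firstPhaseFragmentIds.contains(scan.fragmentId)) {
                needCapture.add(scan);
            }
        }
        return needCapture;
    }
}
```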
Signed-off-by: stdpain <[email protected]>
Quality Gate passed
[FE Incremental Coverage Report] ✅ pass: 434 / 484 (89.67%)
[BE Incremental Coverage Report] ✅ pass: 118 / 128 (92.19%)
fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java
fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentSequence.java
Do we also need to support cloud native tables?
Cloud native tables are already supported.
Signed-off-by: stdpain <[email protected]>
introduced by StarRocks#47868

```
*** Aborted at 1724997275 (unix time) try "date -d @1724997275" if you are using GNU date ***
PC: @ 0x5debccb starrocks::pipeline::QueryContextManager::_clean_slot_unlocked(unsigned long, std::vector<std::shared_ptr<starrocks::pipeline::QueryContext>, std::allocator<std::shared_ptr<starrocks::pipeline::QueryContext> > >&)
*** SIGSEGV (@0x68) received by PID 6669 (TID 0x7f1959fe6640) from PID 104; stack trace: ***
    @ 0x7f19793aaf68 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x99f67)
    @ 0xefd0d19 google::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*)
    @ 0x7f197a3ae916 os::Linux::chained_handler(int, siginfo_t*, void*)
    @ 0x7f197a3b460b JVM_handle_linux_signal
    @ 0x7f197a3a746c signalHandler(int, siginfo_t*, void*)
    @ 0x7f1979353520 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x4251f)
    @ 0x5debccb starrocks::pipeline::QueryContextManager::_clean_slot_unlocked(unsigned long, std::vector<std::shared_ptr<starrocks::pipeline::QueryContext>, std::allocator<std::shared_ptr<starrocks::pipeline::QueryContext> > >&)
    @ 0x5debf27 starrocks::pipeline::QueryContextManager::_clean_query_contexts()
    @ 0x5dec191 starrocks::pipeline::QueryContextManager::_clean_func(starrocks::pipeline::QueryContextManager*)
    @ 0x5df245b std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(starrocks::pipeline::QueryContextManager*), starrocks::pipeline::QueryContextManager*> > >::_M_run()
    @ 0x115c1d44 execute_native_thread_routine
    @ 0x7f19793a5b43 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x94b42)
    @ 0x7f1979437a00 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x1269ff)
```

Signed-off-by: stdpain <[email protected]>
@mergify backport branch-3.3
✅ Backports have been created
Signed-off-by: stdpain <[email protected]>
(cherry picked from commit d7ad29e)
# Conflicts:
#	be/src/exec/pipeline/scan/olap_scan_context.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
Signed-off-by: silverbullet233 <[email protected]>
Co-authored-by: stdpain <[email protected]>
Co-authored-by: silverbullet233 <[email protected]>
Why I'm doing:
For particularly complex queries, the tiered schedule approach instantly consumes a lot of CPU and memory, which makes OOM very likely.
An example is a UNION ALL of 100 AGGs: if each AGG needs about 100 MB of memory, the query needs at least 10 GB.
Sending 10 such queries down to the BEs concurrently overloads the BEs' CPUs, and performance degrades instead. If we limit delivery to N fragments at a time, we can save a lot of memory.
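As a back-of-envelope check of the numbers above (the phased limit N = 2 below is an arbitrary illustration, not a product default), a sketch:

```java
public class PhasedMemoryEstimate {
    public static void main(String[] args) {
        long perAggBytes = 100L << 20; // ~100 MB per AGG fragment, as above
        int aggCount = 100;            // UNION ALL of 100 AGGs
        int phasedLimit = 2;           // hypothetical N fragments in flight

        long allAtOncePeak = perAggBytes * aggCount;   // all fragments resident: ~10 GB
        long phasedPeak = perAggBytes * phasedLimit;   // at most N resident: ~200 MB

        System.out.printf("all-at-once peak: %d MB, phased(N=%d) peak: %d MB%n",
                allAtOncePeak >> 20, phasedLimit, phasedPeak >> 20);
    }
}
```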
What I'm doing:
To solve the above problem, we support a fixed phased scheduler.
Tiered: schedule A, then schedule B, C, D.
Phased: schedule A, then schedule C; when C finishes, schedule B, and so on
(for phased max concurrency = 1).
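For illustration, here is a minimal sketch of the phased idea; it is not the real PhasedExecutionSchedule, and Fragment, onSubtreeFinished, and deliverRpc are assumed stand-ins for the FE's actual types and deploy RPC.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal sketch of the idea, not the real PhasedExecutionSchedule: the result
// fragment is deployed first, and each of its input subtrees is then deployed
// as a unit, with at most maxConcurrency subtrees in flight at a time.
class Fragment {
    final String id;
    final List<Fragment> inputs = new ArrayList<>(); // fragments feeding this one
    Fragment(String id) { this.id = id; }
}

class PhasedScheduler {
    private final int maxConcurrency;
    private final Queue<Fragment> pendingSubtrees = new ArrayDeque<>();
    private int inFlight = 0;

    PhasedScheduler(int maxConcurrency) { this.maxConcurrency = maxConcurrency; }

    void start(Fragment root) {
        deliverRpc(root);                     // consumer must be running first
        pendingSubtrees.addAll(root.inputs);  // its inputs are phased in
        drain();
    }

    // Called when a deployed subtree reports completion back to the FE.
    void onSubtreeFinished() {
        inFlight--;
        drain();
    }

    // Deploy queued subtrees while we are under the phase limit; the tiered
    // (all-at-once) schedule is the special case of an unbounded limit.
    private void drain() {
        while (inFlight < maxConcurrency && !pendingSubtrees.isEmpty()) {
            inFlight++;
            deliverSubtree(pendingSubtrees.poll());
        }
    }

    // Deploy a subtree consumer-first, so each fragment is running and ready
    // to receive before its producers start sending data.
    private void deliverSubtree(Fragment f) {
        deliverRpc(f);
        for (Fragment input : f.inputs) {
            deliverSubtree(input);
        }
    }

    private void deliverRpc(Fragment f) {
        System.out.println("deliver fragment " + f.id); // stand-in for deploy RPC
    }
}
```

With maxConcurrency = 1 and a root A whose inputs are C and B (in that order), this deploys A, then the C subtree, and deploys the B subtree only after C reports completion, matching the A → C → B order above.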
Benchmark: UNION ALL of 100 small AGGs, at 100 and 200 concurrency (result tables omitted).
TODO list:
support adaptive phased schedule
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: