
[Enhancement] support phased schedule #47868

Merged: 1 commit into StarRocks:main on Jul 25, 2024

Conversation

@stdpain (Contributor) commented Jul 4, 2024

Why I'm doing:

For particularly complex queries, the tiered schedule approach instantly consumes a lot of CPU and memory, which makes OOM very likely. For example, a union all of 100 AGGs, where each AGG needs about 100 MB of memory, means the query needs at least 10 GB of memory.

If 10 such queries are sent down to the BE concurrently, the BE's CPU will be overloaded and performance will degrade instead. If we limit the number of fragments sent down at a time to N, memory usage drops considerably.

What I'm doing:

To solve the above problem, we support a fixed phased scheduler:

```
      │
      │
      ▼
      A
      │
      │
┌─────┴────┐
▼     ▼    ▼
D     B    C
```

Tiered: schedule A, then schedule B, C, D.
Phased: schedule A, then schedule C; when C finishes, schedule B, and so on.
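
To make the dispatch order concrete, here is a minimal Java sketch of the phased idea, with hypothetical names and simplified bookkeeping (not the actual `PhasedExecutionSchedule` implementation): the consumer fragment is deployed first so it can receive data, and its producer fragments are released in waves of at most `maxConcurrency`, each new one deployed only when a previous one finishes.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of phased scheduling. The real coordinator tracks
// fragment sequences across BEs; here a Fragment just knows its producers.
class PhasedScheduleSketch {
    static class Fragment {
        final String name;
        final List<Fragment> producers; // child fragments feeding this one
        Fragment(String name, List<Fragment> producers) {
            this.name = name;
            this.producers = producers;
        }
    }

    private final Queue<Fragment> ready = new ArrayDeque<>();
    private final int maxConcurrency;
    private int inFlight = 0;

    PhasedScheduleSketch(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
    }

    void start(Fragment root) {
        deploy(root);                  // A in the diagram above
        ready.addAll(root.producers);  // B, C, D wait for a slot
        deployNextBatch();
    }

    private void deployNextBatch() {
        while (inFlight < maxConcurrency && !ready.isEmpty()) {
            inFlight++;
            deploy(ready.poll());
        }
    }

    // Invoked when a BE reports that a producer fragment has finished.
    void onFragmentFinished(Fragment f) {
        inFlight--;
        ready.addAll(f.producers);     // release the next level of the DAG
        deployNextBatch();
    }

    private void deploy(Fragment f) {
        // Stub: the real scheduler sends the fragment's instances to BEs over RPC.
        System.out.println("deploying fragment " + f.name);
    }
}
```

With `maxConcurrency = 1` this deploys A, then one of B/C/D at a time, trading some latency for a much lower peak memory footprint, which matches the benchmark below.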

```sql
select count(lo_orderkey),count(sp), count(k) from (
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder partition (p1) group by lo_orderkey
    union all
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k2' k from lineorder partition (p1) group by lo_orderkey
    union all
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k3' k from lineorder partition (p1) group by lo_orderkey
    union all
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k4' k from lineorder partition (p1) group by lo_orderkey
) tb;
```

For phased schedule with max concurrency = 1:

| dop=4 | Memory | Time cost |
| --- | --- | --- |
| single AGG | 1.690 GB | 2.00s |
| Tiered schedule | 6.822 GB | 2.51s |
| Phased schedule | 2.630 GB | 8.54s |

For a 100-concurrency test of union all over 100 small AGGs:

```sql
select count(lo_orderkey),count(sp), count(k) from (
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder tablet (11068) group by lo_orderkey
    union all
    ......
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder tablet (11068) group by lo_orderkey
) tb;
```

| | Time (s) |
| --- | --- |
| Tiered schedule | 43.442 |
| Phased schedule | 22.918 |

For 200 concurrency:

| | Time (s) |
| --- | --- |
| Tiered schedule | OOM |
| Phased schedule | 36.402 |

TODO list:

  • support adaptive phased schedule

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

```cpp
// params.fragment_instance_id = fragment_instance_id();
const auto& fe_addr = state->fragment_ctx()->fe_addr();

class RpcRunnable : public Runnable {
```
A contributor commented:

Suggest moving RpcRunnable outside of this function, or using a lambda instead? Although it is legal syntax, it looks somewhat weird.

stdpain (author) replied:

I will refactor this after #48233 is merged.

```java
final int fid = captureVersionFragmentId.asInt();
// Just get a tuple id.
final ArrayList<TupleId> tupleIds = fragments.get(0).getPlanFragment().getPlanRoot().getTupleIds();
PlanNode dummyNode = new CaptureVersionNode(PlanNodeId.DUMMY_PLAN_NODE_ID, tupleIds);
```
A contributor commented:

Why not put the construction of CaptureVersionNode->BlockHoleTableSink into the PlanFragmentBuilder?

Would the visible versions of all tablets involved in the OlapScanNodes be captured up front via this CaptureVersionNode->BlockHoleTableSink fragment?

Are there scenarios to optimize where some OlapScanNodes should capture versions while others should not?

For example, the left side of a left-semi/anti-join seemingly should capture the versions, while the right side need not. If we put the construction of CaptureVersionNode->BlockHoleTableSink into PlanFragmentBuilder, we can control version capturing very precisely, and we can omit this PlanFragment from explain's output.

stdpain (author) replied:

> Why not put the construction of CaptureVersionNode->BlockHoleTableSink into the PlanFragmentBuilder?

I will try it.

> Would the visible versions of all tablets involved in the OlapScanNodes be captured up front via this CaptureVersionNode->BlockHoleTableSink fragment?

Yes.

> Are there scenarios to optimize where some OlapScanNodes should capture versions while others should not?

The first scan node to be delivered does not have to capture the version.
In the all-at-once implementation we assume that delivering a batch of scan ranges is fast.
However, with a phased schedule the build side of even a simple join can be very slow, so the probe side must capture the rowset version. One possible optimization is to capture the tablets involved in the undelivered scan nodes if delivery has not finished after more than N seconds.
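
As a hedged illustration of the snapshot concern, the following Java sketch shows what pinning versions for late-deployed scan nodes amounts to; the names are hypothetical, and this is not the actual CaptureVersionNode code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: scan nodes deployed after the first wave must pin the
// tablet versions visible at query start, otherwise compaction or new imports
// happening between deployment waves could give them a different snapshot.
// Assumes every tablet has a known visible version at query start.
class CaptureVersionSketch {
    // tabletId -> visible version pinned when the query starts
    private final Map<Long, Long> pinnedVersions = new HashMap<>();

    // Called before the first deployment wave for every tablet referenced
    // by a scan node that is NOT part of that first wave.
    void capture(List<Long> tabletIds, Map<Long, Long> currentVisibleVersions) {
        for (long tabletId : tabletIds) {
            pinnedVersions.put(tabletId, currentVisibleVersions.get(tabletId));
        }
    }

    // When the scan node is finally deployed, it reads at the pinned version
    // rather than whatever is visible at deployment time.
    long versionToRead(long tabletId, long latestVisibleVersion) {
        return pinnedVersions.getOrDefault(tabletId, latestVisibleVersion);
    }
}
```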

> For example, the left side of a left-semi/anti-join seemingly should capture the versions, while the right side need not. If we put the construction of CaptureVersionNode->BlockHoleTableSink into PlanFragmentBuilder, we can control version capturing very precisely, and we can omit this PlanFragment from explain's output.

The PlanFragmentBuilder stage doesn't yet know which fragments will be delivered first.

sonarcloud bot commented Jul 17, 2024

[FE Incremental Coverage Report]

pass : 434 / 484 (89.67%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
| --- | --- | --- | --- | --- |
| 🔵 com/starrocks/planner/ScanNode.java | 0 | 1 | 00.00% | [94] |
| 🔵 com/starrocks/qe/scheduler/Coordinator.java | 0 | 1 | 00.00% | [109] |
| 🔵 com/starrocks/qe/scheduler/dag/ExecutionDAG.java | 13 | 34 | 38.24% | [557, 558, 559, 562, 563, 564, 565, 566, 568, 570, 571, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582] |
| 🔵 com/starrocks/qe/QeProcessorImpl.java | 7 | 11 | 63.64% | [298, 299, 300, 301] |
| 🔵 com/starrocks/qe/DefaultCoordinator.java | 21 | 28 | 75.00% | [543, 544, 545, 546, 909, 927, 928] |
| 🔵 com/starrocks/qe/scheduler/dag/FragmentInstance.java | 5 | 6 | 83.33% | [138] |
| 🔵 com/starrocks/qe/scheduler/dag/FragmentSequence.java | 41 | 48 | 85.42% | [72, 120, 121, 122, 123, 125, 126] |
| 🔵 com/starrocks/common/util/RuntimeProfile.java | 7 | 8 | 87.50% | [455] |
| 🔵 com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java | 9 | 10 | 90.00% | [45] |
| 🔵 com/starrocks/qe/scheduler/QueryRuntimeProfile.java | 16 | 17 | 94.12% | [230] |
| 🔵 com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java | 216 | 221 | 97.74% | [214, 245, 293, 294, 438] |
| 🔵 com/starrocks/qe/scheduler/TFragmentInstanceFactory.java | 1 | 1 | 100.00% | [] |
| 🔵 com/starrocks/qe/SessionVariable.java | 11 | 11 | 100.00% | [] |
| 🔵 com/starrocks/service/FrontendServiceImpl.java | 1 | 1 | 100.00% | [] |
| 🔵 com/starrocks/qe/scheduler/Deployer.java | 8 | 8 | 100.00% | [] |
| 🔵 com/starrocks/planner/OlapScanNode.java | 1 | 1 | 100.00% | [] |
| 🔵 com/starrocks/sql/plan/PlanFragmentBuilder.java | 5 | 5 | 100.00% | [] |
| 🔵 com/starrocks/qe/scheduler/slot/DeployState.java | 4 | 4 | 100.00% | [] |
| 🔵 com/starrocks/planner/PlanNodeId.java | 2 | 2 | 100.00% | [] |
| 🔵 com/starrocks/qe/scheduler/dag/ExecutionFragment.java | 8 | 8 | 100.00% | [] |
| 🔵 com/starrocks/scheduler/mv/MVMaintenanceJob.java | 1 | 1 | 100.00% | [] |
| 🔵 com/starrocks/planner/CaptureVersionNode.java | 5 | 5 | 100.00% | [] |
| 🔵 com/starrocks/qe/StmtExecutor.java | 1 | 1 | 100.00% | [] |
| 🔵 com/starrocks/qe/CoordinatorPreprocessor.java | 4 | 4 | 100.00% | [] |
| 🔵 com/starrocks/qe/scheduler/dag/CaptureVersionFragmentBuilder.java | 47 | 47 | 100.00% | [] |

[BE Incremental Coverage Report]

pass : 118 / 128 (92.19%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
| --- | --- | --- | --- | --- |
| 🔵 be/src/exec/pipeline/query_context.h | 3 | 7 | 42.86% | [67, 68, 69, 70] |
| 🔵 be/src/exec/capture_version_node.h | 1 | 2 | 50.00% | [25] |
| 🔵 be/src/service/internal_service.cpp | 7 | 10 | 70.00% | [515, 540, 545] |
| 🔵 be/src/exec/pipeline/query_context.cpp | 3 | 4 | 75.00% | [372] |
| 🔵 be/src/exec/pipeline/fragment_executor.cpp | 12 | 13 | 92.31% | [108] |
| 🔵 be/src/exec/pipeline/scan/olap_scan_context.cpp | 6 | 6 | 100.00% | [] |
| 🔵 be/src/exec/capture_version_node.cpp | 8 | 8 | 100.00% | [] |
| 🔵 be/src/exec/pipeline/scan/olap_scan_context.h | 1 | 1 | 100.00% | [] |
| 🔵 be/src/exec/pipeline/fragment_context.cpp | 17 | 17 | 100.00% | [] |
| 🔵 be/src/exec/pipeline/capture_version_operator.h | 12 | 12 | 100.00% | [] |
| 🔵 be/src/exec/exec_node.cpp | 3 | 3 | 100.00% | [] |
| 🔵 be/src/exec/pipeline/fragment_context.h | 1 | 1 | 100.00% | [] |
| 🔵 be/src/exec/olap_scan_node.cpp | 11 | 11 | 100.00% | [] |
| 🔵 be/src/storage/rowset/rowset.h | 15 | 15 | 100.00% | [] |
| 🔵 be/src/exec/pipeline/capture_version_operator.cpp | 18 | 18 | 100.00% | [] |

@wyb (Contributor) left a comment:

Do we also need to support cloud native tables?

@stdpain (author) commented Jul 25, 2024

> Do we also need to support cloud native tables?

Cloud native tables are already supported.

@stdpain stdpain merged commit d7ad29e into StarRocks:main Jul 25, 2024
74 of 78 checks passed
dujijun007 pushed a commit to dujijun007/starrocks that referenced this pull request Jul 29, 2024
stdpain added a commit to stdpain/starrocks-2 that referenced this pull request Aug 30, 2024
introduced by StarRocks#47868

```
*** Aborted at 1724997275 (unix time) try "date -d @1724997275" if you are using GNU date ***
PC: @          0x5debccb starrocks::pipeline::QueryContextManager::_clean_slot_unlocked(unsigned long, std::vector<std::shared_ptr<starrocks::pipeline::QueryContext>, std::allocator<std::shared_ptr<starrocks::pipeline::QueryContext> > >&)
*** SIGSEGV (@0x68) received by PID 6669 (TID 0x7f1959fe6640) from PID 104; stack trace: ***
    @     0x7f19793aaf68 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x99f67)
    @          0xefd0d19 google::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*)
    @     0x7f197a3ae916 os::Linux::chained_handler(int, siginfo_t*, void*)
    @     0x7f197a3b460b JVM_handle_linux_signal
    @     0x7f197a3a746c signalHandler(int, siginfo_t*, void*)
    @     0x7f1979353520 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x4251f)
    @          0x5debccb starrocks::pipeline::QueryContextManager::_clean_slot_unlocked(unsigned long, std::vector<std::shared_ptr<starrocks::pipeline::QueryContext>, std::allocator<std::shared_ptr<starrocks::pipeline::QueryContext> > >&)
    @          0x5debf27 starrocks::pipeline::QueryContextManager::_clean_query_contexts()
    @          0x5dec191 starrocks::pipeline::QueryContextManager::_clean_func(starrocks::pipeline::QueryContextManager*)
    @          0x5df245b std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(starrocks::pipeline::QueryContextManager*), starrocks::pipeline::QueryContextManager*> > >::_M_run()
    @         0x115c1d44 execute_native_thread_routine
    @     0x7f19793a5b43 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x94b42)
    @     0x7f1979437a00 (/usr/lib/x86_64-linux-gnu/libc.so.6 (deleted)+0x1269ff)
```

Signed-off-by: stdpain <[email protected]>
@silverbullet233 (Contributor) commented:
@mergify backport branch-3.3

mergify bot commented Sep 14, 2024

> backport branch-3.3

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Sep 14, 2024
Signed-off-by: stdpain <[email protected]>
(cherry picked from commit d7ad29e)

# Conflicts:
#	be/src/exec/pipeline/scan/olap_scan_context.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
wanpengfei-git pushed a commit that referenced this pull request Sep 23, 2024
Signed-off-by: silverbullet233 <[email protected]>
Co-authored-by: stdpain <[email protected]>
Co-authored-by: silverbullet233 <[email protected]>