
[Enhancement] support phased schedule (backport #47868) #51033

Merged
merged 2 commits into branch-3.3 from mergify/bp/branch-3.3/pr-47868 on Sep 23, 2024

Conversation


@mergify mergify bot commented Sep 14, 2024

Why I'm doing:

For particularly complex queries, the tiered schedule approach instantly consumes a lot of CPU and memory, which makes OOM very likely.
For example, a query that UNION ALLs 100 aggregations, where each aggregation needs about 100 MB of memory, requires at least 10 GB of memory.

If 10 such queries are sent down to the BE concurrently, the BE's CPU is overloaded and performance degrades instead of improving. If we limit the number of fragments dispatched at a time to N, memory usage drops considerably.

What I'm doing:

To solve the above problem, we support a fixed phased scheduler.

      │     
      │     
      ▼     
      A     
      │     
      │     
┌─────┴────┐
▼     ▼    ▼
D     B    C

Tiered: schedule A, then schedule B, C, D.
Phased: schedule A, then schedule C; when C finishes, schedule B, and so on.
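
The sketch below illustrates the idea with a minimal, self-contained Java model. It is only an illustration under assumed names: `Fragment` and `FixedPhasedScheduler` are hypothetical classes, not the actual `PhasedExecutionSchedule` added in the FE, and the order in which siblings are picked here is simply insertion order, which need not match the real scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a plan fragment tree: the parent is deployed first, and its
// child subtrees are released in phases instead of all at once.
class Fragment {
    final String name;
    final List<Fragment> children = new ArrayList<>();

    Fragment(String name) {
        this.name = name;
    }

    Fragment addChild(Fragment child) {
        children.add(child);
        return this;
    }
}

// Fixed phased scheduler: at most `maxConcurrency` sibling subtrees are in
// flight at the same time; the next one is released only when a running
// subtree reports completion.
class FixedPhasedScheduler {
    private final int maxConcurrency;
    private final Deque<Fragment> pending = new ArrayDeque<>();
    private int running = 0;

    FixedPhasedScheduler(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
    }

    // Deploy the root fragment immediately, then release its children in phases.
    void schedule(Fragment root) {
        deploy(root);
        pending.addAll(root.children);
        dispatch();
    }

    // Callback for a child subtree finishing; frees a slot and releases the next one.
    void onFragmentFinished(Fragment finished) {
        System.out.println("finished  " + finished.name);
        if (running > 0) {
            running--;
        }
        dispatch();
    }

    private void dispatch() {
        while (running < maxConcurrency && !pending.isEmpty()) {
            deploy(pending.poll());
            running++;
        }
    }

    private void deploy(Fragment f) {
        // The real coordinator would send fragment instances to the BEs here;
        // this sketch only records the order in which subtrees are released.
        System.out.println("deploy    " + f.name);
    }
}
```

The point is that the parent is still deployed top-down as before, but a sibling subtree is only dispatched once a running one finishes, so at most `maxConcurrency` memory-heavy subtrees are resident on the BEs at any moment.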

select count(lo_orderkey),count(sp), count(k) from (
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder partition (p1) group by lo_orderkey 
    union all 
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k2' k from lineorder partition (p1) group by lo_orderkey 
    union all 
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k3' k from lineorder partition (p1) group by lo_orderkey 
    union all
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k4' k from lineorder partition (p1) group by lo_orderkey 
) tb;

For phased max concurrency = 1 (dop = 4):

| Schedule | Memory | Time cost |
| --- | --- | --- |
| single AGG | 1.690 GB | 2.00 s |
| Tiered schedule | 6.822 GB | 2.51 s |
| Phased schedule | 2.630 GB | 8.54 s |
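
For intuition on the numbers above, here is a hypothetical driver for the sketch from the previous section, modeling the four-way UNION ALL query with max concurrency 1: only one aggregation subtree is in flight at a time, which is why peak memory drops (2.630 GB vs. 6.822 GB) while wall-clock time grows (8.54 s vs. 2.51 s). This is a toy walkthrough, not the coordinator's real deployment path.

```java
public class PhasedScheduleDemo {
    public static void main(String[] args) {
        Fragment union = new Fragment("UNION");
        Fragment aggK1 = new Fragment("AGG k1");
        Fragment aggK2 = new Fragment("AGG k2");
        Fragment aggK3 = new Fragment("AGG k3");
        Fragment aggK4 = new Fragment("AGG k4");
        union.addChild(aggK1).addChild(aggK2).addChild(aggK3).addChild(aggK4);

        // Tiered behaviour would release all four AGG subtrees at once;
        // with maxConcurrency = 1 only one AGG subtree is resident at a time.
        FixedPhasedScheduler scheduler = new FixedPhasedScheduler(1);
        scheduler.schedule(union);            // deploys UNION, then the first AGG only
        scheduler.onFragmentFinished(aggK1);  // second AGG starts when the first finishes
        scheduler.onFragmentFinished(aggK2);
        scheduler.onFragmentFinished(aggK3);
        scheduler.onFragmentFinished(aggK4);
    }
}
```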

For a 100-concurrency test that UNION ALLs 100 small AGGs:

select count(lo_orderkey),count(sp), count(k) from (
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder tablet (11068) group by lo_orderkey 
    union all
    ......
    select lo_orderkey, sum(lo_ordtotalprice) sp, 'k1' k from lineorder tablet (11068) group by lo_orderkey 
) tb;
| Schedule | Time |
| --- | --- |
| Tiered schedule | 43.442 s |
| Phased schedule | 22.918 s |

For 200 concurrency:

| Schedule | Time |
| --- | --- |
| Tiered schedule | OOM |
| Phased schedule | 36.402 s |

TODO list:

  • support adaptive phased schedule

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

This is an automatic backport of pull request #47868 done by [Mergify](https://mergify.com).
Signed-off-by: stdpain <[email protected]>
(cherry picked from commit d7ad29e)

# Conflicts:
#	be/src/exec/pipeline/scan/olap_scan_context.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
@mergify mergify bot added the conflicts label Sep 14, 2024

mergify bot commented Sep 14, 2024

Cherry-pick of d7ad29e has failed:

On branch mergify/bp/branch-3.3/pr-47868
Your branch is up to date with 'origin/branch-3.3'.

You are currently cherry-picking commit d7ad29e758.
  (fix conflicts and run "git cherry-pick --continue")
  (use "git cherry-pick --skip" to skip this patch)
  (use "git cherry-pick --abort" to cancel the cherry-pick operation)

Changes to be committed:
	modified:   be/src/exec/CMakeLists.txt
	new file:   be/src/exec/capture_version_node.cpp
	new file:   be/src/exec/capture_version_node.h
	modified:   be/src/exec/exec_node.cpp
	modified:   be/src/exec/olap_scan_node.cpp
	modified:   be/src/exec/olap_scan_node.h
	new file:   be/src/exec/pipeline/capture_version_operator.cpp
	new file:   be/src/exec/pipeline/capture_version_operator.h
	modified:   be/src/exec/pipeline/fragment_context.cpp
	modified:   be/src/exec/pipeline/fragment_context.h
	modified:   be/src/exec/pipeline/fragment_executor.cpp
	modified:   be/src/exec/pipeline/query_context.cpp
	modified:   be/src/exec/pipeline/query_context.h
	modified:   be/src/exec/pipeline/scan/olap_scan_context.h
	modified:   be/src/service/internal_service.cpp
	modified:   be/src/storage/rowset/rowset.h
	modified:   fe/fe-core/src/main/java/com/starrocks/common/util/RuntimeProfile.java
	new file:   fe/fe-core/src/main/java/com/starrocks/planner/CaptureVersionNode.java
	modified:   fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
	modified:   fe/fe-core/src/main/java/com/starrocks/planner/PlanNodeId.java
	modified:   fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/QeProcessor.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/TFragmentInstanceFactory.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/CaptureVersionFragmentBuilder.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionDAG.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionSchedule.java
	modified:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstance.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentSequence.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ScheduleNextTurnRunner.java
	new file:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/slot/DeployState.java
	modified:   fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java
	modified:   fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
	modified:   fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java
	modified:   fe/fe-core/src/test/java/com/starrocks/common/util/ProfilingExecPlanTest.java
	new file:   fe/fe-core/src/test/java/com/starrocks/qe/scheduler/PhasedScheduleTest.java
	modified:   fe/fe-core/src/test/java/com/starrocks/qe/scheduler/QueryRuntimeProfileTest.java
	modified:   gensrc/thrift/FrontendService.thrift
	modified:   gensrc/thrift/InternalService.thrift
	modified:   gensrc/thrift/PlanNodes.thrift
	new file:   test/sql/test_phased_schedule/R/test_phased_schedule
	new file:   test/sql/test_phased_schedule/T/test_phased_schedule

Unmerged paths:
  (use "git add <file>..." to mark resolution)
	both modified:   be/src/exec/pipeline/scan/olap_scan_context.cpp
	both modified:   fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java

To fix up this pull request, you can check it out locally. See documentation: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/checking-out-pull-requests-locally

@mergify mergify bot mentioned this pull request Sep 14, 2024
24 tasks
@mergify mergify bot closed this Sep 14, 2024

mergify bot commented Sep 14, 2024

@mergify[bot]: Backport conflict, please resolve the conflict and resubmit the PR

@mergify mergify bot deleted the mergify/bp/branch-3.3/pr-47868 branch September 14, 2024 02:20
@silverbullet233 silverbullet233 restored the mergify/bp/branch-3.3/pr-47868 branch September 14, 2024 02:36
@wanpengfei-git wanpengfei-git enabled auto-merge (squash) September 14, 2024 02:43
Signed-off-by: silverbullet233 <[email protected]>

sonarcloud bot commented Sep 20, 2024

@wanpengfei-git wanpengfei-git merged commit 703feb2 into branch-3.3 Sep 23, 2024
30 of 32 checks passed
@wanpengfei-git wanpengfei-git deleted the mergify/bp/branch-3.3/pr-47868 branch September 23, 2024 01:14