Cascaded spill merge and re-spill #15610
Status: Open
2010YOUY01 wants to merge 9 commits into apache:main from 2010YOUY01:cascade-merge-spill
Commits (all by 2010YOUY01):
ab6a3a3  Cascaded spill merge and re-spill
64288ec  typo
7abef7a  Review: better naming
0cd98d3  Reviews for naming, comment, and cleanup prints
348bbf5  Include a configuration option for max merge degree
6dbd7b0  Tests
76bbd0c  Merge branch 'main' into cascade-merge-spill
c367886  update submodule parquet-testing
1895b0e  CI
The diff below adds tests for the new `sort_max_spill_merge_degree` configuration option:
@@ -54,6 +54,7 @@ use datafusion_physical_plan::collect as collect_batches;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use rstest::rstest;
use test_utils::AccessLogGenerator;

use async_trait::async_trait;

@@ -615,6 +616,104 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
    Ok(())
}

// Test configuration `sort_max_spill_merge_degree` in external sorting
// -------------------------------------------------------------------

// Ensure an invalid config value of `sort_max_spill_merge_degree` returns an error
#[rstest]
#[case(0)]
#[case(1)]
#[tokio::test]
async fn test_invalid_sort_max_spill_merge_degree(
    #[case] sort_max_spill_merge_degree: usize,
    [Inline review comment: this #[case] syntax looks so elegant for writing repetitive tests]
) -> Result<()> {
    let config = SessionConfig::new()
        .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(20 * 1024 * 1024)))
        .build_arc()
        .unwrap();
    let ctx = SessionContext::new_with_config_rt(config, runtime);
    let df = ctx
        .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
        .await
        .unwrap();

    let err = df.collect().await.unwrap_err();
    assert_contains!(
        err.to_string(),
        "sort_max_spill_merge_degree must be >= 2 in order to continue external sorting"
    );
    Ok(())
}

// Create a `SessionContext` with a 1MB memory pool and the provided max merge degree.
//
// A 1MB memory pool is used so the test cases run faster and more efficiently.
// To let queries succeed under such a small memory limit, the related configs
// are adjusted as follows.
fn create_ctx_with_1mb_mem_pool(
    sort_max_spill_merge_degree: usize,
) -> Result<SessionContext> {
    let config = SessionConfig::new()
        .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree)
        .with_sort_spill_reservation_bytes(64 * 1024) // 64KB
        .with_sort_in_place_threshold_bytes(0)
        .with_batch_size(128) // To reduce test memory usage
        .with_target_partitions(1);

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024))) // 1MB memory limit
        .build_arc()
        .unwrap();

    let ctx = SessionContext::new_with_config_rt(config, runtime);

    Ok(ctx)
}

// Test different values of `sort_max_spill_merge_degree`.
// The test setup and query will spill over 10 temporary files; the max merge
// degree is varied to cover different numbers of passes in the multi-pass
// spill merge.
#[rstest]
#[case(2)]
#[case(3)]
#[case(4)]
#[case(7)]
#[case(32)]
#[case(310104)]
#[tokio::test]
async fn test_fuzz_sort_max_spill_merge_degree(
    #[case] sort_max_spill_merge_degree: usize,
) -> Result<()> {
    let ctx = create_ctx_with_1mb_mem_pool(sort_max_spill_merge_degree)?;

    let dataset_size = 1000000;
    let sql = format!(
        "select * from generate_series(1, {}) as t1(v1) order by v1",
        dataset_size
    );
    let df = ctx.sql(&sql).await.unwrap();

    let plan = df.create_physical_plan().await.unwrap();

    let task_ctx = ctx.task_ctx();

    // Ensure the query succeeds
    let batches = collect_batches(Arc::clone(&plan), task_ctx)
        .await
        .expect("Query execution failed");

    // Quick check. More extensive tests are covered by the sort fuzz tests.
    let result_size = batches.iter().map(|b| b.num_rows()).sum::<usize>();
    assert_eq!(result_size, dataset_size);

    let spill_count = plan.metrics().unwrap().spill_count().unwrap();
    assert!(spill_count > 10);

    Ok(())
}

/// Run the query with the specified memory limit,
/// and verify the expected errors are returned
#[derive(Clone, Debug)]
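For context, here is a minimal sketch of the cascaded (multi-pass) spill merge these tests exercise. This is not the PR's actual code; `SpillFile`, `merge_and_respill`, and `cascade_merge` are hypothetical names. The idea: while more spill files exist than the maximum merge degree allows, merge them in groups of at most `max_degree` files and re-spill each merged run to disk; the final pass can then stream the remaining files directly.

```rust
/// Hypothetical stand-in for one sorted run spilled to disk.
struct SpillFile {}

/// Merge up to `max_degree` sorted spill files into one new sorted spill file.
/// Sketch only: the real operator would stream record batches through a
/// k-way merge and write the merged run back to disk.
fn merge_and_respill(group: &[SpillFile]) -> SpillFile {
    let _ = group;
    SpillFile {}
}

/// Cascaded merge: repeatedly reduce the number of spill files until at most
/// `max_degree` remain, so a single final merge can produce the output.
fn cascade_merge(mut files: Vec<SpillFile>, max_degree: usize) -> Vec<SpillFile> {
    assert!(max_degree >= 2, "merge degree must be >= 2 to make progress");
    while files.len() > max_degree {
        // One pass: merge each group of up to `max_degree` files and
        // re-spill the results, shrinking the file count by up to max_degree x.
        files = files.chunks(max_degree).map(merge_and_respill).collect();
    }
    files
}
```

With `max_degree = 2` and more than 10 spill files, for example, several passes are needed, which is why the test varies the degree from 2 up to very large values where a single pass suffices.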
Review discussion:
I have a concern about this: there can still be a memory issue if the batches from all the streams together exceed the memory limit.
I have an implementation of this that is completely memory safe, and I will try to create a PR for it as inspiration.
The way to decide on the degree is to store, for each spill file, the largest amount of memory a single record batch has taken, and then, when deciding on the degree, simply grow it until you can no longer.
The reason I'm picky about this is that it is a new configuration option that will be hard to deprecate or change.
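A minimal sketch of the degree-selection idea described above (my reading of this comment, not code from #15700; `SpillFile` and `max_batch_memory` are hypothetical names):

```rust
/// Hypothetical per-spill-file metadata: the largest memory footprint of any
/// single record batch written to that file.
struct SpillFile {
    max_batch_memory: usize,
}

/// Grow the merge degree while the worst-case cost of holding one batch from
/// each chosen stream still fits in the memory budget.
fn pick_merge_degree(files: &[SpillFile], memory_budget: usize) -> usize {
    let mut reserved = 0usize;
    let mut degree = 0usize;
    for file in files {
        // Merging one more stream means keeping at least one of its
        // batches in memory for the duration of the merge.
        if reserved + file.max_batch_memory > memory_budget {
            break;
        }
        reserved += file.max_batch_memory;
        degree += 1;
    }
    degree
}
```

Because the degree adapts to the actual spill files, no user-facing configuration would be needed.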
@2010YOUY01 and @alamb, I hope you will look at #15700 before you merge this PR, to see what I mean.
This is a solid point. This option is intended to be set manually, and it has to ensure

(max_batch_size * per_partition_merge_degree * partition_count) < total_memory_limit

If it's set correctly for a query, then the query should succeed. The problem is the ever-growing number of configuration options in DataFusion; it seems impossible to set them all correctly. Enabling the parallel merging optimization would require introducing yet another configuration option, which I'm also trying to avoid (though the too-many-configs problem might be a harsh reality we have to accept).
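As a worked example of this constraint (illustrative numbers and a hypothetical helper, not a DataFusion API):

```rust
/// Check whether a chosen merge degree fits the memory limit.
fn merge_degree_fits(
    max_batch_size: usize,     // largest single batch, in bytes
    merge_degree: usize,       // sort_max_spill_merge_degree
    partition_count: usize,    // target_partitions
    total_memory_limit: usize, // memory pool size, in bytes
) -> bool {
    max_batch_size * merge_degree * partition_count < total_memory_limit
}

fn main() {
    // 8 MB batches * degree 8 * 4 partitions = 256 MB, which fits a 512 MB pool.
    assert!(merge_degree_fits(8 << 20, 8, 4, 512 << 20));
    // The same settings do not fit a 128 MB pool.
    assert!(!merge_degree_fits(8 << 20, 8, 4, 128 << 20));
}
```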
I'm keeping my alternative approach open, as it seems to work in limited-memory environments as well (I tested it locally with more data).