Cascaded spill merge and re-spill #15610

Open · wants to merge 9 commits into main
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
@@ -345,6 +345,13 @@ config_namespace! {
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// When doing external sorting, the maximum number of spilled files to
/// read back at once. The files read back in the same merge step are
/// sort-preserving-merged and re-spilled, and this step is repeated over
/// multiple passes to reduce the number of spilled files, until a final
/// sorted run can be produced.
pub sort_max_spill_merge_degree: usize, default = 16
@rluvaton (Contributor), Apr 13, 2025

I have a concern that there can still be a memory issue: the batches from all the merged streams together can exceed the memory limit.

I have an implementation of this that is completely memory safe and will try to open a PR with it for inspiration.

The way to decide on the degree is to store, for each spill file, the largest amount of memory a single record batch has taken, and then, when choosing the degree, simply grow it until you no longer can.

The reason I'm picky about this is that it is a new configuration that will be hard to deprecate or change later.
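
A minimal sketch of that degree-selection idea, assuming a hypothetical `SpillFileInfo` record and a per-partition memory budget; this is not the implementation from #15700:

```rust
/// Hypothetical per-spill-file bookkeeping: the largest memory a single
/// record batch written to this file has taken.
struct SpillFileInfo {
    max_batch_memory: usize,
}

/// Grow the merge degree file by file until admitting one more in-flight
/// batch would exceed the memory budget.
fn choose_merge_degree(spill_files: &[SpillFileInfo], memory_budget: usize) -> usize {
    let mut reserved = 0;
    let mut degree = 0;
    for file in spill_files {
        // During a merge, at most one batch per input stream is resident.
        if reserved + file.max_batch_memory > memory_budget {
            break;
        }
        reserved += file.max_batch_memory;
        degree += 1;
    }
    // A sort-preserving merge needs at least two inputs to make progress.
    degree.max(2)
}
```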

Contributor

@2010YOUY01 and @alamb, I hope you will look at #15700 before merging this PR, to see what I mean.

Contributor Author

> The reason I'm picky about this is that it is a new configuration that will be hard to deprecate or change later.

This is a solid point. This option is intended to be set manually, and it has to ensure that (max_batch_size * per_partition_merge_degree * partition_count) < total_memory_limit. If it's set correctly for a query, the query should succeed.
The problem is the ever-growing number of configurations in DataFusion; it seems impossible to set them all correctly. Enabling the parallel-merge optimization would require introducing yet another configuration, which I'm also trying to avoid (though the too-many-configs problem might be a harsh reality we have to accept).
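
As a back-of-the-envelope illustration of that constraint (all numbers made up, not taken from the PR):

```rust
fn main() {
    // Hypothetical values, purely to illustrate the inequality above.
    let max_batch_size: usize = 8 * 1024 * 1024; // ~8 MiB held per in-flight batch
    let per_partition_merge_degree: usize = 16; // sort_max_spill_merge_degree
    let partition_count: usize = 8; // target_partitions
    let total_memory_limit: usize = 2 * 1024 * 1024 * 1024; // 2 GiB pool

    // Each partition's merge can hold one batch per merged stream at once.
    let merge_phase_need = max_batch_size * per_partition_merge_degree * partition_count;
    assert!(merge_phase_need < total_memory_limit); // 1 GiB < 2 GiB, so this setting fits
}
```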

Contributor

I'm keeping my alternative approach open, since it also seems to work in limited-memory environments (I tested it locally with more data).


/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

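The doc comment above describes a multi-pass merge-and-re-spill loop; a simplified sketch of that cascade (placeholder types and helpers, not the actual `SortExec` internals):

```rust
/// Placeholder for a sorted spill file on disk.
struct SpillFile;

/// Stand-in for a sort-preserving merge of several sorted spill files whose
/// output is re-spilled to a new file.
fn merge_and_respill(_inputs: &[SpillFile]) -> SpillFile {
    SpillFile
}

/// Repeatedly merge at most `max_merge_degree` files per step until the
/// remaining files can be merged in one final pass.
fn cascade_merge(mut files: Vec<SpillFile>, max_merge_degree: usize) -> Vec<SpillFile> {
    assert!(max_merge_degree >= 2, "degree must be >= 2 to make progress");
    while files.len() > max_merge_degree {
        let mut next_pass = Vec::new();
        for chunk in files.chunks(max_merge_degree) {
            next_pass.push(merge_and_respill(chunk));
        }
        files = next_pass;
    }
    // At most `max_merge_degree` files remain; the final merge can stream
    // them directly to the operator's output without another re-spill.
    files
}
```
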
121 changes: 77 additions & 44 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -17,7 +17,7 @@

//! Fuzz test for various corner cases where sorting RecordBatches exceeds available memory and should spill

use std::sync::Arc;
use std::{num::NonZeroUsize, sync::Arc};

use arrow::{
array::{as_string_array, ArrayRef, Int32Array, StringArray},
@@ -31,7 +31,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::cast::as_int32_array;
use datafusion_execution::memory_pool::GreedyMemoryPool;
use datafusion_execution::memory_pool::{FairSpillPool, TrackConsumersPool};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

@@ -43,17 +43,25 @@ const KB: usize = 1 << 10;
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_10k_mem() {
for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] {
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(10 * KB)
.with_should_spill(should_spill)
.run()
.await;
for sort_max_spill_merge_degree in [2, 5, 20, 1024] {
if batch_size > 20000 && sort_max_spill_merge_degree < 16 {
// takes too long to complete, skip it
continue;
}

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(10 * KB)
.with_should_spill(should_spill)
.with_sort_max_spill_merge_degree(sort_max_spill_merge_degree)
.run()
.await;

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
}
}
}

@@ -118,39 +126,43 @@ async fn test_sort_strings_100k_mem() {
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_multi_columns_100k_mem() {
fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> {
let mut rows: Vec<_> = Vec::new();
let i32_array = as_int32_array(b.column(0)).unwrap();
let string_array = as_string_array(b.column(1));
for i in 0..b.num_rows() {
let str = string_array.value(i).to_string();
let i32 = i32_array.value(i);
rows.push((i32, str));
}
rows
}

for (batch_size, should_spill) in
[(5, false), (1000, false), (10000, true), (20000, true)]
{
let (input, collected) = SortTest::new()
.with_int32_utf8_batches(batch_size)
.with_sort_columns(vec!["x", "y"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;

fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> {
let mut rows: Vec<_> = Vec::new();
let i32_array = as_int32_array(b.column(0)).unwrap();
let string_array = as_string_array(b.column(1));
for i in 0..b.num_rows() {
let str = string_array.value(i).to_string();
let i32 = i32_array.value(i);
rows.push((i32, str));
}
rows
for sort_max_spill_merge_degree in [2, 5, 2048] {
let (input, collected) = SortTest::new()
.with_int32_utf8_batches(batch_size)
.with_sort_columns(vec!["x", "y"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.with_sort_max_spill_merge_degree(sort_max_spill_merge_degree)
.run()
.await;

let mut input = input
.iter()
.flat_map(|p| p.iter())
.flat_map(record_batch_to_vec)
.collect::<Vec<(i32, String)>>();
input.sort_unstable();
let actual = collected
.iter()
.flat_map(record_batch_to_vec)
.collect::<Vec<(i32, String)>>();
assert_eq!(input, actual);
}
let mut input = input
.iter()
.flat_map(|p| p.iter())
.flat_map(record_batch_to_vec)
.collect::<Vec<(i32, String)>>();
input.sort_unstable();
let actual = collected
.iter()
.flat_map(record_batch_to_vec)
.collect::<Vec<(i32, String)>>();
assert_eq!(input, actual);
}
}

@@ -180,11 +192,17 @@ struct SortTest {
pool_size: Option<usize>,
/// If true, expect the sort to spill
should_spill: bool,
/// Configuration `ExecutionOptions::sort_max_spill_merge_degree` to be used
/// in the test case run
sort_max_spill_merge_degree: usize,
}

impl SortTest {
fn new() -> Self {
Default::default()
Self {
sort_max_spill_merge_degree: 16, // Default::default() would be 0, which is invalid for this config
..Default::default()
}
}

fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self {
@@ -221,6 +239,14 @@ impl SortTest {
self
}

fn with_sort_max_spill_merge_degree(
mut self,
sort_max_spill_merge_degree: usize,
) -> Self {
self.sort_max_spill_merge_degree = sort_max_spill_merge_degree;
self
}

/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) -> (Vec<Vec<RecordBatch>>, Vec<RecordBatch>) {
@@ -248,7 +274,8 @@ impl SortTest {
let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap();
let sort = Arc::new(SortExec::new(sort_ordering, exec));

let session_config = SessionConfig::new();
let session_config = SessionConfig::new()
.with_sort_max_spill_merge_degree(self.sort_max_spill_merge_degree);
let session_ctx = if let Some(pool_size) = self.pool_size {
// Make sure there is enough space for the initial spill
// reservation
@@ -258,9 +285,15 @@
.execution
.sort_spill_reservation_bytes,
);
println!("Pool size: {}", pool_size);

let inner_pool = FairSpillPool::new(pool_size);
let pool = Arc::new(TrackConsumersPool::new(
inner_pool,
NonZeroUsize::new(5).unwrap(),
));
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.with_memory_pool(pool)
.build_arc()
.unwrap();
SessionContext::new_with_config_rt(session_config, runtime)
99 changes: 99 additions & 0 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -54,6 +54,7 @@ use datafusion_physical_plan::collect as collect_batches;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use rstest::rstest;
use test_utils::AccessLogGenerator;

use async_trait::async_trait;
@@ -615,6 +616,104 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
Ok(())
}

// Test configuration `sort_max_spill_merge_degree` in external sorting
// -------------------------------------------------------------------

// Ensure an invalid value of `sort_max_spill_merge_degree` returns an error
#[rstest]
#[case(0)]
#[case(1)]
#[tokio::test]
async fn test_invalid_sort_max_spill_merge_degree(
#[case] sort_max_spill_merge_degree: usize,
Contributor

this #[case] syntax looks so elegant for writing repetitive tests

) -> Result<()> {
let config = SessionConfig::new()
.with_sort_max_spill_merge_degree(sort_max_spill_merge_degree);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(20 * 1024 * 1024)))
.build_arc()
.unwrap();
let ctx = SessionContext::new_with_config_rt(config, runtime);
let df = ctx
.sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
.await
.unwrap();

let err = df.collect().await.unwrap_err();
assert_contains!(
err.to_string(),
"sort_max_spill_merge_degree must be >= 2 in order to continue external sorting"
);
Ok(())
}

// Create a `SessionContext` with a 1MB memory pool and the provided max merge degree.
//
// A 1MB memory pool is used to keep the test cases fast and efficient.
// To let queries succeed under such a small memory limit, the related configs are
// adjusted as follows.
fn create_ctx_with_1mb_mem_pool(
sort_max_spill_merge_degree: usize,
) -> Result<SessionContext> {
let config = SessionConfig::new()
.with_sort_max_spill_merge_degree(sort_max_spill_merge_degree)
.with_sort_spill_reservation_bytes(64 * 1024) // 64KB
.with_sort_in_place_threshold_bytes(0)
.with_batch_size(128) // To reduce test memory usage
.with_target_partitions(1);

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024))) // 1MB memory limit
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(config, runtime);

Ok(ctx)
}

// Test different values of `sort_max_spill_merge_degree`.
// The test setup and query spill more than 10 temporary files; the max merge degree is
// varied to cover different numbers of passes in the multi-pass spill merge.
#[rstest]
#[case(2)]
#[case(3)]
#[case(4)]
#[case(7)]
#[case(32)]
#[case(310104)]
#[tokio::test]
async fn test_fuzz_sort_max_spill_merge_degree(
#[case] sort_max_spill_merge_degree: usize,
) -> Result<()> {
let ctx = create_ctx_with_1mb_mem_pool(sort_max_spill_merge_degree)?;

let dataset_size = 1000000;
let sql = format!(
"select * from generate_series(1, {}) as t1(v1) order by v1",
dataset_size
);
let df = ctx.sql(&sql).await.unwrap();

let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();

// Ensure the query succeeds
let batches = collect_batches(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

// Quick check. More extensive tests will be covered in sort fuzz tests.
let result_size = batches.iter().map(|b| b.num_rows()).sum::<usize>();
assert_eq!(result_size, dataset_size);

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
assert!(spill_count > 10);

Ok(())
}
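
For intuition about how the degrees tested above map to merge passes, a small illustrative helper (not part of the test) that counts the re-spill passes needed before the final merge:

```rust
/// Number of merge-and-re-spill passes needed to reduce `n` sorted spill
/// files to at most `degree` files, assuming each pass merges up to `degree`
/// files at a time (the final merge then streams directly to the output).
fn merge_passes(mut n: usize, degree: usize) -> usize {
    assert!(degree >= 2);
    let mut passes = 0;
    while n > degree {
        n = (n + degree - 1) / degree; // ceiling division
        passes += 1;
    }
    passes
}

// e.g. for 11 spill files: merge_passes(11, 2) == 3 (11 -> 6 -> 3 -> 2),
// merge_passes(11, 4) == 1 (11 -> 3), and merge_passes(11, 32) == 0.
```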

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
11 changes: 11 additions & 0 deletions datafusion/execution/src/config.rs
@@ -434,6 +434,17 @@ impl SessionConfig {
self
}

/// Set the `ExecutionOptions::sort_max_spill_merge_degree`
pub fn with_sort_max_spill_merge_degree(
mut self,
sort_max_spill_merge_degree: usize,
) -> Self {
// Validation (must be >= 2) is done during execution, because there are
// other ways to configure this option.
self.options.execution.sort_max_spill_merge_degree = sort_max_spill_merge_degree;
self
}

/// Enables or disables the enforcement of batch size in joins
pub fn with_enforce_batch_size_in_joins(
mut self,
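
For completeness, a hedged usage sketch of the new option outside the builder API; the SQL key name is assumed to follow the usual `datafusion.execution.*` convention and is not confirmed by this diff:

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Programmatic configuration via the builder method added in this PR.
    let config = SessionConfig::new().with_sort_max_spill_merge_degree(8);
    let ctx = SessionContext::new_with_config(config);

    // Assumed alternative: setting the option through a SQL `SET` statement,
    // which is why validation is deferred to execution time.
    let _ = ctx
        .sql("SET datafusion.execution.sort_max_spill_merge_degree = 8")
        .await?;

    Ok(())
}
```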