diff --git a/Cargo.lock b/Cargo.lock index 1cca34e1899e..bf0de5c6315b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3877,9 +3877,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.171" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libflate" diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e0f63d6d81c..2dc6ca2745c2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -345,6 +345,13 @@ config_namespace! { /// batches and merged. pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + /// When doing external sorting, the maximum number of spilled files to + /// read back at once. Those read files in the same merge step will be sort- + /// preserving-merged and re-spilled, and the step will be repeated to reduce + /// the number of spilled files in multiple passes, until a final sorted run + /// can be produced. + pub sort_max_spill_merge_degree: usize, default = 16 + /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 0b0f0aa2f105..ed36f2335f8b 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -17,7 +17,7 @@ //! Fuzz Test for various corner cases sorting RecordBatches exceeds available memory and should spill -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use arrow::{ array::{as_string_array, ArrayRef, Int32Array, StringArray}, @@ -31,7 +31,7 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::cast::as_int32_array; -use datafusion_execution::memory_pool::GreedyMemoryPool; +use datafusion_execution::memory_pool::{FairSpillPool, TrackConsumersPool}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -43,17 +43,25 @@ const KB: usize = 1 << 10; #[cfg_attr(tarpaulin, ignore)] async fn test_sort_10k_mem() { for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] { - let (input, collected) = SortTest::new() - .with_int32_batches(batch_size) - .with_sort_columns(vec!["x"]) - .with_pool_size(10 * KB) - .with_should_spill(should_spill) - .run() - .await; + for sort_max_spill_merge_degree in [2, 5, 20, 1024] { + if batch_size > 20000 && sort_max_spill_merge_degree < 16 { + // takes too long to complete, skip it + continue; + } - let expected = partitions_to_sorted_vec(&input); - let actual = batches_to_vec(&collected); - assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}"); + let (input, collected) = SortTest::new() + .with_int32_batches(batch_size) + .with_sort_columns(vec!["x"]) + .with_pool_size(10 * KB) + .with_should_spill(should_spill) + .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree) + .run() + .await; + + let expected = partitions_to_sorted_vec(&input); + let actual = batches_to_vec(&collected); + assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}"); + } } } @@ -118,39 +126,43 @@ async fn test_sort_strings_100k_mem() { #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn test_sort_multi_columns_100k_mem() { + fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> { + let mut rows: Vec<_> = Vec::new(); + let i32_array = as_int32_array(b.column(0)).unwrap(); + let string_array = as_string_array(b.column(1)); + for i in 0..b.num_rows() { + let str = string_array.value(i).to_string(); + let i32 = i32_array.value(i); + rows.push((i32, str)); + } + rows + } + for (batch_size, should_spill) in [(5, false), (1000, false), (10000, true), (20000, true)] { - let (input, collected) = SortTest::new() - .with_int32_utf8_batches(batch_size) - .with_sort_columns(vec!["x", "y"]) - .with_pool_size(100 * KB) - .with_should_spill(should_spill) - .run() - .await; - - fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> { - let mut rows: Vec<_> = Vec::new(); - let i32_array = as_int32_array(b.column(0)).unwrap(); - let string_array = as_string_array(b.column(1)); - for i in 0..b.num_rows() { - let str = string_array.value(i).to_string(); - let i32 = i32_array.value(i); - rows.push((i32, str)); - } - rows + for sort_max_spill_merge_degree in [2, 5, 2048] { + let (input, collected) = SortTest::new() + .with_int32_utf8_batches(batch_size) + .with_sort_columns(vec!["x", "y"]) + .with_pool_size(100 * KB) + .with_should_spill(should_spill) + .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree) + .run() + .await; + + let mut input = input + .iter() + .flat_map(|p| p.iter()) + .flat_map(record_batch_to_vec) + .collect::>(); + input.sort_unstable(); + let actual = collected + .iter() + .flat_map(record_batch_to_vec) + .collect::>(); + assert_eq!(input, actual); } - let mut input = input - .iter() - .flat_map(|p| p.iter()) - .flat_map(record_batch_to_vec) - .collect::>(); - input.sort_unstable(); - let actual = collected - .iter() - .flat_map(record_batch_to_vec) - .collect::>(); - assert_eq!(input, actual); } } @@ -180,11 +192,17 @@ struct SortTest { pool_size: Option, /// If true, expect the sort to spill should_spill: bool, + /// Configuration `ExecutionOptions::sort_max_spill_merge_degree` to be used + /// in the test case run + sort_max_spill_merge_degree: usize, } impl SortTest { fn new() -> Self { - Default::default() + Self { + sort_max_spill_merge_degree: 16, // Default::default() will be 1, which is invalid for this config + ..Default::default() + } } fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self { @@ -221,6 +239,14 @@ impl SortTest { self } + fn with_sort_max_spill_merge_degree( + mut self, + sort_max_spill_merge_degree: usize, + ) -> Self { + self.sort_max_spill_merge_degree = sort_max_spill_merge_degree; + self + } + /// Sort the input using SortExec and ensure the results are /// correct according to `Vec::sort` both with and without spilling async fn run(&self) -> (Vec>, Vec) { @@ -248,7 +274,8 @@ impl SortTest { let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap(); let sort = Arc::new(SortExec::new(sort_ordering, exec)); - let session_config = SessionConfig::new(); + let session_config = SessionConfig::new() + .with_sort_max_spill_merge_degree(self.sort_max_spill_merge_degree); let session_ctx = if let Some(pool_size) = self.pool_size { // Make sure there is enough space for the initial spill // reservation @@ -258,9 +285,15 @@ impl SortTest { .execution .sort_spill_reservation_bytes, ); + println!("Pool size: {}", pool_size); + let inner_pool = FairSpillPool::new(pool_size); + let pool = Arc::new(TrackConsumersPool::new( + inner_pool, + NonZeroUsize::new(5).unwrap(), + )); let runtime = RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) + .with_memory_pool(pool) .build_arc() .unwrap(); SessionContext::new_with_config_rt(session_config, runtime) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 01342d1604fc..0627dba1334a 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -54,6 +54,7 @@ use datafusion_physical_plan::collect as collect_batches; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; +use rstest::rstest; use test_utils::AccessLogGenerator; use async_trait::async_trait; @@ -615,6 +616,104 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> { Ok(()) } +// Test configuration `sort_max_spill_merge_degree` in external sorting +// ------------------------------------------------------------------- + +// Ensure invalid config value of `sort_max_spill_merge_degree` returns error +#[rstest] +#[case(0)] +#[case(1)] +#[tokio::test] +async fn test_invalid_sort_max_spill_merge_degree( + #[case] sort_max_spill_merge_degree: usize, +) -> Result<()> { + let config = SessionConfig::new() + .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(20 * 1024 * 1024))) + .build_arc() + .unwrap(); + let ctx = SessionContext::new_with_config_rt(config, runtime); + let df = ctx + .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1") + .await + .unwrap(); + + let err = df.collect().await.unwrap_err(); + assert_contains!( + err.to_string(), + "sort_max_spill_merge_degree must be >= 2 in order to continue external sorting" + ); + Ok(()) +} + +// Create a `SessionContext` with a 1MB memory pool, and provided max merge degree. +// +// In order to let test case run faster and efficient, a memory pool with 1MB is used. +// To let queries succeed under such a small memory limit, related configs should be +// changed as follows. +fn create_ctx_with_1mb_mem_pool( + sort_max_spill_merge_degree: usize, +) -> Result { + let config = SessionConfig::new() + .with_sort_max_spill_merge_degree(sort_max_spill_merge_degree) + .with_sort_spill_reservation_bytes(64 * 1024) // 64KB + .with_sort_in_place_threshold_bytes(0) + .with_batch_size(128) // To reduce test memory usage + .with_target_partitions(1); + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024))) // 1MB memory limit + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(config, runtime); + + Ok(ctx) +} + +// Test different values of `sort_max_spill_merge_degree`. +// The test setup and query will spill over 10 temporary files, the max merge degree is +// varied to cover different number of passes in multi-pass spill merge. +#[rstest] +#[case(2)] +#[case(3)] +#[case(4)] +#[case(7)] +#[case(32)] +#[case(310104)] +#[tokio::test] +async fn test_fuzz_sort_max_spill_merge_degree( + #[case] sort_max_spill_merge_degree: usize, +) -> Result<()> { + let ctx = create_ctx_with_1mb_mem_pool(sort_max_spill_merge_degree)?; + + let dataset_size = 1000000; + let sql = format!( + "select * from generate_series(1, {}) as t1(v1) order by v1", + dataset_size + ); + let df = ctx.sql(&sql).await.unwrap(); + + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + + // Ensure the query succeeds + let batches = collect_batches(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + // Quick check. More extensive tests will be covered in sort fuzz tests. + let result_size = batches.iter().map(|b| b.num_rows()).sum::(); + assert_eq!(result_size, dataset_size); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + assert!(spill_count > 10); + + Ok(()) +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 1e00a1ce4725..23af3c91caef 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -434,6 +434,17 @@ impl SessionConfig { self } + /// Set the `ExecutionOptions::sort_max_spill_merge_degree` + pub fn with_sort_max_spill_merge_degree( + mut self, + sort_max_spill_merge_degree: usize, + ) -> Self { + // Validation (must be >= 2) is done during execution, because there are + // other ways to configure this option. + self.options.execution.sort_max_spill_merge_degree = sort_max_spill_merge_degree; + self + } + /// Enables or disables the enforcement of batch size in joins pub fn with_enforce_batch_size_in_joins( mut self, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9d0f34cc7f0f..3ca4b952ec37 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -34,7 +34,6 @@ use crate::metrics::{ use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::get_record_batch_memory_size; -use crate::spill::in_progress_spill_file::InProgressSpillFile; use crate::spill::spill_manager::SpillManager; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; @@ -51,7 +50,8 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::{ - exec_datafusion_err, internal_datafusion_err, internal_err, DataFusionError, Result, + config_err, exec_datafusion_err, internal_datafusion_err, internal_err, + DataFusionError, Result, }; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -61,7 +61,7 @@ use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{StreamExt, TryStreamExt}; -use log::{debug, trace}; +use log::trace; struct ExternalSorterMetrics { /// metrics @@ -214,6 +214,8 @@ struct ExternalSorter { /// the data will be concatenated and sorted in place rather than /// sort/merged. sort_in_place_threshold_bytes: usize, + /// See the doc in `ExecutionOptions::sort_max_spill_merge_degree` for more details. + sort_max_spill_merge_degree: usize, // ======================================================================== // STATE BUFFERS: @@ -222,9 +224,6 @@ struct ExternalSorter { /// Unsorted input batches stored in the memory buffer in_mem_batches: Vec, - /// During external sorting, in-memory intermediate data will be appended to - /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`]. - in_progress_spill_file: Option, /// If data has previously been spilled, the locations of the spill files (in /// Arrow IPC format) /// Within the same spill file, the data might be chunked into multiple batches, @@ -263,6 +262,7 @@ impl ExternalSorter { batch_size: usize, sort_spill_reservation_bytes: usize, sort_in_place_threshold_bytes: usize, + sort_max_spill_merge_degree: usize, metrics: &ExecutionPlanMetricsSet, runtime: Arc, ) -> Result { @@ -300,7 +300,6 @@ impl ExternalSorter { Ok(Self { schema, in_mem_batches: vec![], - in_progress_spill_file: None, finished_spill_files: vec![], expr: expr.into(), sort_keys_row_converter: Arc::new(converter), @@ -312,6 +311,7 @@ impl ExternalSorter { batch_size, sort_spill_reservation_bytes, sort_in_place_threshold_bytes, + sort_max_spill_merge_degree, }) } @@ -351,8 +351,6 @@ impl ExternalSorter { self.merge_reservation.free(); if self.spilled_before() { - let mut streams = vec![]; - // Sort `in_mem_batches` and spill it first. If there are many // `in_mem_batches` and the memory limit is almost reached, merging // them with the spilled files at the same time might cause OOM. @@ -360,25 +358,7 @@ impl ExternalSorter { self.sort_and_spill_in_mem_batches().await?; } - for spill in self.finished_spill_files.drain(..) { - if !spill.path().exists() { - return internal_err!("Spill file {:?} does not exist", spill.path()); - } - let stream = self.spill_manager.read_spill_as_stream(spill)?; - streams.push(stream); - } - - let expressions: LexOrdering = self.expr.iter().cloned().collect(); - - StreamingMergeBuilder::new() - .with_streams(streams) - .with_schema(Arc::clone(&self.schema)) - .with_expressions(expressions.as_ref()) - .with_metrics(self.metrics.baseline.clone()) - .with_batch_size(self.batch_size) - .with_fetch(None) - .with_reservation(self.merge_reservation.new_empty()) - .build() + return self.merge_spilled_files_multi_pass().await; } else { self.in_mem_sort_stream(self.metrics.baseline.clone()) } @@ -404,59 +384,6 @@ impl ExternalSorter { self.metrics.spill_metrics.spill_file_count.value() } - /// Appending globally sorted batches to the in-progress spill file, and clears - /// the `globally_sorted_batches` (also its memory reservation) afterwards. - async fn consume_and_spill_append( - &mut self, - globally_sorted_batches: &mut Vec, - ) -> Result<()> { - if globally_sorted_batches.is_empty() { - return Ok(()); - } - - // Lazily initialize the in-progress spill file - if self.in_progress_spill_file.is_none() { - self.in_progress_spill_file = - Some(self.spill_manager.create_in_progress_file("Sorting")?); - } - - Self::organize_stringview_arrays(globally_sorted_batches)?; - - debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - - let batches_to_spill = std::mem::take(globally_sorted_batches); - self.reservation.free(); - - let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| { - internal_datafusion_err!("In-progress spill file should be initialized") - })?; - - for batch in batches_to_spill { - in_progress_file.append_batch(&batch)?; - } - - if !globally_sorted_batches.is_empty() { - return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking."); - } - - Ok(()) - } - - /// Finishes the in-progress spill file and moves it to the finished spill files. - async fn spill_finish(&mut self) -> Result<()> { - let mut in_progress_file = - self.in_progress_spill_file.take().ok_or_else(|| { - internal_datafusion_err!("Should be called after `spill_append`") - })?; - let spill_file = in_progress_file.finish()?; - - if let Some(spill_file) = spill_file { - self.finished_spill_files.push(spill_file); - } - - Ok(()) - } - /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each /// `StringViewArray` in sequential order by calling `gc()` on them. /// @@ -537,8 +464,9 @@ impl ExternalSorter { // reserved again for the next spill. self.merge_reservation.free(); - let mut sorted_stream = + let sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. if !self.in_mem_batches.is_empty() { @@ -546,47 +474,254 @@ impl ExternalSorter { "in_mem_batches should be empty after constructing sorted stream" ); } - // 'global' here refers to all buffered batches when the memory limit is - // reached. This variable will buffer the sorted batches after - // sort-preserving merge and incrementally append to spill files. - let mut globally_sorted_batches: Vec = vec![]; + let spill_file = self.write_stream_to_spill_file(sorted_stream).await?; + self.finished_spill_files.push(spill_file); + + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + Ok(()) + } + + /// Create a new spill file, and write all batches from the stream to the file. + /// + /// Note: After the spill is done, the memory reservation will be freed to 0, + /// because `sorted_stream` holds all buffered batches. + async fn write_stream_to_spill_file( + &mut self, + mut sorted_stream: SendableRecordBatchStream, + ) -> Result { + // Release the memory reserved for merge back to the pool so there is some + // left when the executed stream requests an allocation (now the stream to + // write are SortPreservingMergeStream, which requires memory). + // At the end of this function, memory will be reserved again for the next spill. + self.merge_reservation.free(); + + let mut in_progress_spill_file = + self.spill_manager.create_in_progress_file("Sorting")?; + + // Incrementally append globally sorted batches to the spill file, because + // there might not be enough memory to materialize all batches at once. while let Some(batch) = sorted_stream.next().await { - let batch = batch?; - let sorted_size = get_reserved_byte_for_record_batch(&batch); - if self.reservation.try_grow(sorted_size).is_err() { - // Although the reservation is not enough, the batch is - // already in memory, so it's okay to combine it with previously - // sorted batches, and spill together. - globally_sorted_batches.push(batch); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; // reservation is freed in spill() - } else { - globally_sorted_batches.push(batch); - } + let mut batch = vec![batch?]; + Self::organize_stringview_arrays(&mut batch)?; + in_progress_spill_file.append_batch(&batch[0])?; } // Drop early to free up memory reserved by the sorted stream, otherwise the // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. drop(sorted_stream); - self.consume_and_spill_append(&mut globally_sorted_batches) - .await?; - self.spill_finish().await?; + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| { + internal_datafusion_err!("Writing stream with 0 batch is not allowed") + })?; + + Ok(spill_file) + } + + /// Sort-preserving merges the spilled files into a single file. + /// + /// All of input spill files are sorted by sort keys within each file, and the + /// returned file is also sorted by sort keys. + /// + /// This method consumes the input spill files, and returns a new compacted + /// spill file. After returnning, the input files will be cleaned up (deleted). + /// + /// # Example: + /// Input spill files: + /// SpillFile1 (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + /// SpillFile2 (sorted by SortKeys): + /// [batch1(100 rows)] + /// + /// After merging, it returns a new spill file: + /// returns MergedSpillFile (sorted by SortKeys): + /// [batch1(100 rows)], [batch2(100 rows)] + async fn consume_and_merge_spill_files( + &mut self, + input_spill_files: Vec, + ) -> Result { + // ==== Convert each spill file into a stream ==== + let partially_sorted_streams = input_spill_files + .into_iter() + .map(|spill_file| { + if !spill_file.path().exists() { + return internal_err!( + "Spill file {:?} does not exist", + spill_file.path() + ); + } + + self.spill_manager.read_spill_as_stream(spill_file) + }) + .collect::>>()?; + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + // ==== Doing sort-preserving merge on input partially sorted streams ==== + let spm_stream = StreamingMergeBuilder::new() + .with_streams(partially_sorted_streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(sort_exprs.as_ref()) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build()?; + + // ==== Write to a single merged spill file ==== + let merged_spill_file = self.write_stream_to_spill_file(spm_stream).await?; - // Sanity check after spilling - let buffers_cleared_property = - self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty(); - if !buffers_cleared_property { + Ok(merged_spill_file) + } + + /// Performs a multi-pass merge of spilled files to create a globally sorted stream. The merge degree is limited by memory constraints. + /// - In each pass, existing spill files are split into groups, then sort-preserving merged, and re-spilled to a smaller number of spill files. + /// - For each combining step, up to the maximum merge degree of spill files are merged. + /// + /// # Example + /// ```text + /// Notation: batch(n) means a batch with n rows + /// + /// Max merge degree: 2 + /// Initial spill files: + /// spill_file_1: batch(100) + /// spill_file_2: batch(100) + /// spill_file_3: batch(100) + /// spill_file_4: batch(100) + /// + /// After pass 1: + /// spill_file_1: batch(100), batch(100) + /// spill_file_2: batch(100), batch(100) + /// + /// After pass 2: + /// merged_stream: batch(100), batch(100), batch(100), batch(100) + /// ``` + async fn merge_spilled_files_multi_pass( + &mut self, + ) -> Result { + // ────────────────────────────────────────────────────────────────────── + // Edge cases + // ────────────────────────────────────────────────────────────────────── + if self.finished_spill_files.is_empty() { return internal_err!( - "in_mem_batches and globally_sorted_batches should be cleared before" + "No spilled files to merge at the beginning of multi-pass merge" ); } + if self.finished_spill_files.len() == 1 { + return self + .spill_manager + .read_spill_as_stream(self.finished_spill_files.remove(0)); + } - // Reserve headroom for next sort/merge - self.reserve_memory_for_merge()?; + // ────────────────────────────────────────────────────────────────────── + // Merge spilled files in multiple pass + // ────────────────────────────────────────────────────────────────────── + let spill_files = std::mem::take(&mut self.finished_spill_files); + let spill_files = self.merge_spill_files_multi_pass(spill_files).await?; + + // ────────────────────────────────────────────────────────────────────── + // Finally, <= max merge degree spilled files are left, merge them into a + // single globally sorted stream + // ────────────────────────────────────────────────────────────────────── + let partially_sorted_streams = spill_files + .into_iter() + .map(|spill_file| self.spill_manager.read_spill_as_stream(spill_file)) + .collect::>>()?; - Ok(()) + // Edge cases + if partially_sorted_streams.is_empty() { + return internal_err!( + "No spilled files to merge at the final stage of multi-pass merge" + ); + } + if partially_sorted_streams.len() == 1 { + return Ok(partially_sorted_streams.into_iter().next().unwrap()); + } + + let sort_exprs: LexOrdering = self.expr.iter().cloned().collect(); + + let spm_stream = StreamingMergeBuilder::new() + .with_streams(partially_sorted_streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(sort_exprs.as_ref()) + .with_metrics(self.metrics.baseline.clone()) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build()?; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + spm_stream, + ))) + } + + /// Iteratively merges and re-spills files until the number of spill files is ≤ MAX_SPILL_MERGE_DEGREE + async fn merge_spill_files_multi_pass( + &mut self, + mut spill_files_cur_pass: Vec, + ) -> Result> { + let mut spill_files_next_pass: Vec = vec![]; + + // Merge spill files to the closest power of the configured max merge + // degree, until the number of spill files is <= max merge degree + // + // Example: + // initial spill files count: 30 + // max merge degree: 4 + // pass 1: merge 30 files into 16(4^2) files + // pass 2: merge 16 files into 4(4^1) files + // pass 3: now the number of spill files is <= max merge degree: merge them into a single sorted stream + while spill_files_cur_pass.len() > self.sort_max_spill_merge_degree { + let log_base = self.sort_max_spill_merge_degree as f64; + let num_files = spill_files_cur_pass.len() as f64; + let num_passes = num_files.log(log_base).ceil() as usize; + // For the example above: + // - In pass 1, there are 30 files, `num_passes` is 3, so + // `next_pass_merge_degree` is 4^(3-1) = 16 + // - In pass 2, `next_pass_merge_degree` can be calculated to 4 similarly + let next_pass_merge_degree = log_base.powi((num_passes - 1) as i32); + + // Distribute spill files into `next_pass_merge_degree` groups as evenly as possible. + // For example, when splitting 11 files into 3 groups: + // 1. Base group file count = floor(11 / 3) = 3 files per group + // Initial distribution: [3, 3, 3] files per group + // 2. Remainder = 11 % 3 = 2 files + // 3. Distribute remainder by adding 1 file to first 2 groups + // Final distribution: [4, 4, 3] files per group + let base_size = (num_files / next_pass_merge_degree) as usize; + let remainder = (num_files % next_pass_merge_degree) as usize; + + let num_files_per_group: Vec = (0..next_pass_merge_degree as usize) + .map(|i| base_size + if i < remainder { 1 } else { 0 }) + .collect(); + + for num_files_to_merge in num_files_per_group { + // take the first num_files_to_merge files from spill_files + let files_to_merge = + spill_files_cur_pass.drain(0..num_files_to_merge).collect(); + let merged_spill_file = + self.consume_and_merge_spill_files(files_to_merge).await?; + spill_files_next_pass.push(merged_spill_file); + } + + if !spill_files_cur_pass.is_empty() { + return internal_err!( + "Spill files should all be compacted, but there are {} files left", + spill_files_cur_pass.len() + ); + } + + // Prepare for the next iteration + spill_files_cur_pass = std::mem::take(&mut spill_files_next_pass); + } + + Ok(spill_files_cur_pass) } /// Consumes in_mem_batches returning a sorted stream of @@ -1267,6 +1402,16 @@ impl ExecutionPlan for SortExec { ))) } (false, None) => { + // Validate `ExecutionOptions::sort_max_spill_merge_degree` (must be >= 2) + let spill_max_merge_degree = + execution_options.sort_max_spill_merge_degree; + if spill_max_merge_degree < 2 { + return config_err!( + "sort_max_spill_merge_degree must be >= 2 in order to continue external sorting, but got {}", + spill_max_merge_degree + ); + } + let mut sorter = ExternalSorter::new( partition, input.schema(), @@ -1274,6 +1419,7 @@ impl ExecutionPlan for SortExec { context.session_config().batch_size(), execution_options.sort_spill_reservation_bytes, execution_options.sort_in_place_threshold_bytes, + spill_max_merge_degree, &self.metrics_set, context.runtime_env(), )?; diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a55ac079aa74..35bbc758141b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -6795,4 +6795,3 @@ select c2, count(*) from test WHERE 1 = 1 group by c2; 4 1 5 1 6 1 - diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 87abaadb516f..57a515993e87 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -261,6 +261,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_max_spill_merge_degree 16 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 @@ -361,6 +362,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. +datafusion.execution.sort_max_spill_merge_degree 16 When doing external sorting, the maximum number of spilled files to read back at once. Those read files in the same merge step will be sort- preserving-merged and re-spilled, and the step will be repeated to reduce the number of spilled files in multiple passes, until a final sorted run can be produced. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7a46d59d893e..56f97403853a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -85,6 +85,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.sort_max_spill_merge_degree | 16 | When doing external sorting, the maximum number of spilled files to read back at once. Those read files in the same merge step will be sort- preserving-merged and re-spilled, and the step will be repeated to reduce the number of spilled files in multiple passes, until a final sorted run can be produced. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | diff --git a/parquet-testing b/parquet-testing index 6e851ddd768d..e845e41789ec 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff +Subproject commit e845e41789ec8d3aa6317e6464cdbbca987bf91d