diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 6a0a797d4ded..65cc08cfb275 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::common::collect;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use rand::Rng;
 use test_utils::AccessLogGenerator;
@@ -493,6 +494,36 @@ async fn test_in_mem_buffer_almost_full() {
     let _ = df.collect().await.unwrap();
 }
 
+/// External sort should be able to run if there is very little pre-reserved memory
+/// for merge (i.e. the configuration `sort_spill_reservation_bytes` is set to 0).
+#[tokio::test]
+async fn test_external_sort_zero_merge_reservation() {
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(0)
+        .with_target_partitions(14);
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+        .build_arc()
+        .unwrap();
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
+    let df = ctx.sql(query).await.unwrap();
+
+    let physical_plan = df.create_physical_plan().await.unwrap();
+    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+    let stream = physical_plan.execute(0, task_ctx).unwrap();
+
+    // Ensures execution succeeds
+    let _result = collect(stream).await;
+
+    // Ensures the query spilled during execution
+    let metrics = physical_plan.metrics().unwrap();
+    let spill_count = metrics.spill_count().unwrap();
+    assert!(spill_count > 0);
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index ed35492041be..adcf4cac43f1 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -216,10 +216,8 @@ struct ExternalSorter {
     // STATE BUFFERS:
     // Fields that hold intermediate data during sorting
     // ========================================================================
-    /// Potentially unsorted in memory buffer
+    /// Unsorted input batches stored in the memory buffer
    in_mem_batches: Vec<RecordBatch>,
-    /// if `Self::in_mem_batches` are sorted
-    in_mem_batches_sorted: bool,
 
     /// During external sorting, in-memory intermediate data will be appended to
     /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
@@ -284,7 +282,6 @@ impl ExternalSorter {
         Self {
             schema,
             in_mem_batches: vec![],
-            in_mem_batches_sorted: false,
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
@@ -320,7 +317,6 @@ impl ExternalSorter {
         }
 
         self.in_mem_batches.push(input);
-        self.in_mem_batches_sorted = false;
         Ok(())
     }
 
@@ -397,16 +393,13 @@ impl ExternalSorter {
         self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
-    /// be appended to the in-progress spill file.
-    ///
-    /// (*) 'Sorted' here means globally sorted for all buffered batches when the
-    /// memory limit is reached, instead of partially sorted within the batch.
-    async fn spill_append(&mut self) -> Result<()> {
-        assert!(self.in_mem_batches_sorted);
-
-        // we could always get a chance to free some memory as long as we are holding some
-        if self.in_mem_batches.is_empty() {
+    /// Appends globally sorted batches to the in-progress spill file, and clears
+    /// `globally_sorted_batches` (along with its memory reservation) afterwards.
+    async fn consume_and_spill_append(
+        &mut self,
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        if globally_sorted_batches.is_empty() {
             return Ok(());
         }
 
@@ -416,21 +409,25 @@ impl ExternalSorter {
                 Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
-        self.organize_stringview_arrays()?;
+        Self::organize_stringview_arrays(globally_sorted_batches)?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let batches = std::mem::take(&mut self.in_mem_batches);
+        let batches_to_spill = std::mem::take(globally_sorted_batches);
         self.reservation.free();
 
         let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
             internal_datafusion_err!("In-progress spill file should be initialized")
         })?;
 
-        for batch in batches {
+        for batch in batches_to_spill {
             in_progress_file.append_batch(&batch)?;
         }
 
+        if !globally_sorted_batches.is_empty() {
+            return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
+        }
+
         Ok(())
     }
 
@@ -449,7 +446,7 @@ impl ExternalSorter {
         Ok(())
     }
 
-    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
+    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
     /// `StringViewArray` in sequential order by calling `gc()` on them.
     ///
     /// Note this is a workaround until is
@@ -478,10 +475,12 @@ impl ExternalSorter {
     ///
     /// Then when spilling each batch, the writer has to write all referenced buffers
     /// repeatedly.
-    fn organize_stringview_arrays(&mut self) -> Result<()> {
-        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
+    fn organize_stringview_arrays(
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
 
-        for batch in self.in_mem_batches.drain(..) {
+        for batch in globally_sorted_batches.drain(..) {
             let mut new_columns: Vec<Arc<dyn Array>> =
                 Vec::with_capacity(batch.num_columns());
 
@@ -507,20 +506,17 @@ impl ExternalSorter {
             organized_batches.push(organized_batch);
         }
 
-        self.in_mem_batches = organized_batches;
+        *globally_sorted_batches = organized_batches;
 
         Ok(())
     }
 
-    /// Sorts the in_mem_batches in place
+    /// Sorts the in_mem_batches and potentially spills the sorted batches.
     ///
-    /// Sorting may have freed memory, especially if fetch is `Some`. If
-    /// the memory usage has dropped by a factor of 2, then we don't have
-    /// to spill. Otherwise, we spill to free up memory for inserting
-    /// more batches.
-    /// The factor of 2 aims to avoid a degenerate case where the
-    /// memory required for `fetch` is just under the memory available,
-    /// causing repeated re-sorting of data
+    /// If the memory usage has dropped by a factor of 2, it might be a sort with a
+    /// fetch (e.g. sorting 1M rows but only keeping the top 100), so we keep the
+    /// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
+    /// Otherwise, we spill the sorted run to free up memory for inserting more batches.
     ///
     /// # Arguments
     ///
@@ -539,10 +535,18 @@ impl ExternalSorter {
         let mut sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
 
+        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are taken
+        // to construct a globally sorted stream.
+        if !self.in_mem_batches.is_empty() {
+            return internal_err!(
+                "in_mem_batches should be empty after constructing sorted stream"
+            );
+        }
+
+        // 'global' here refers to all buffered batches when the memory limit is
+        // reached. This variable buffers the sorted batches after the
+        // sort-preserving merge, and they are incrementally appended to spill files.
+        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
-
-        // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
-        // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
-        // write sorted batches to disk when the memory is insufficient.
         let mut spilled = false;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
@@ -551,12 +555,12 @@ impl ExternalSorter {
                 // Although the reservation is not enough, the batch is
                 // already in memory, so it's okay to combine it with previously
                 // sorted batches, and spill together.
-                self.in_mem_batches.push(batch);
-                self.spill_append().await?; // reservation is freed in spill()
+                globally_sorted_batches.push(batch);
+                self.consume_and_spill_append(&mut globally_sorted_batches)
+                    .await?; // reservation is freed in spill()
                 spilled = true;
             } else {
-                self.in_mem_batches.push(batch);
-                self.in_mem_batches_sorted = true;
+                globally_sorted_batches.push(batch);
             }
         }
 
@@ -570,12 +574,27 @@ impl ExternalSorter {
         if (self.reservation.size() > before / 2) || force_spill {
             // We have not freed more than 50% of the memory, so we have to spill to
             // free up more memory
-            self.spill_append().await?;
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             spilled = true;
         }
 
         if spilled {
+            // There might be some buffered batches that haven't triggered a spill yet.
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             self.spill_finish().await?;
+        } else {
+            // If the memory limit was reached before calling this function, and it
+            // didn't spill anything, it means this is a sort with a fetch of the top K
+            // elements: after sorting, only the top K elements are kept in memory.
+            // For simplicity, those sorted top K entries are put back into the unsorted
+            // `in_mem_batches` to be consumed by the next sort/merge.
+            if !self.in_mem_batches.is_empty() {
+                return internal_err!("in_mem_batches should be cleared before");
+            }
+
+            self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
        }
 
         // Reserve headroom for next sort/merge
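
For reference, the behavior exercised by the new `test_external_sort_zero_merge_reservation` test can also be reproduced end to end through the public `datafusion` crate. The following is a minimal sketch mirroring the test configuration (`sort_spill_reservation_bytes = 0` plus a small `FairSpillPool`); it is not part of this patch, and the import paths, pool size, and the `tokio` main wrapper are assumptions that may differ across DataFusion versions.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // No memory is pre-reserved for the merge phase (the case this PR exercises),
    // and the whole query runs under a 10 MiB FairSpillPool.
    let config = SessionConfig::new().with_sort_spill_reservation_bytes(0);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(config, runtime);

    // The sort cannot fit in 10 MiB, so the external sorter is expected to
    // spill to disk and still return the full sorted result.
    let df = ctx
        .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
        .await?;
    let batches = df.collect().await?;
    println!("sorted {} batches", batches.len());
    Ok(())
}
```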