diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index a1985a1aa447..2deb8fde2da6 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -23,9 +23,10 @@ use std::sync::{Arc, LazyLock};
 #[cfg(feature = "extended_tests")]
 mod memory_limit_validation;
-use arrow::array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Int32Type, SchemaRef};
+use arrow_schema::{DataType, Field, Schema};
 use datafusion::assert_batches_eq;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::source::DataSourceExec;
@@ -41,7 +42,7 @@ use datafusion_catalog::streaming::StreamingTable;
 use datafusion_catalog::Session;
 use datafusion_common::{assert_contains, Result};
 use datafusion_execution::memory_pool::{
-    GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+    FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::{Expr, TableType};
@@ -49,6 +50,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
+use rand::Rng;
 use test_utils::AccessLogGenerator;
 
 use async_trait::async_trait;
@@ -403,6 +405,69 @@ async fn oom_with_tracked_consumer_pool() {
         .await
 }
 
+/// Regression test: if a spilled `StringViewArray`'s buffer is referenced by
+/// other batches that also need to be spilled, the spill writer will
+/// repeatedly write out the same buffer, and after reading back, each batch's
+/// size will explode.
+///
+/// This test setup causes 10 spills, and each spill sorts around 20 batches.
+/// If the spilled record batches' memory explodes, this test will fail.
+#[tokio::test]
+async fn test_stringview_external_sort() {
+    let mut rng = rand::thread_rng();
+    let array_length = 1000;
+    let num_batches = 200;
+    // Batches contain two columns: a random 100-byte string, and a random i32
+    let mut batches = Vec::with_capacity(num_batches);
+
+    for _ in 0..num_batches {
+        let strings: Vec<String> = (0..array_length)
+            .map(|_| {
+                (0..100)
+                    .map(|_| rng.gen_range(0..=u8::MAX) as char)
+                    .collect()
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+
+        let random_numbers: Vec<i32> =
+            (0..array_length).map(|_| rng.gen_range(0..=1000)).collect();
+        let int_array = Int32Array::from(random_numbers);
+        let int_array_ref: ArrayRef = Arc::new(int_array);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("strings", DataType::Utf8View, false),
+                Field::new("random_numbers", DataType::Int32, false),
+            ])),
+            vec![array_ref, int_array_ref],
+        )
+        .unwrap();
+        batches.push(batch);
+    }
+
+    // Run a SQL query that sorts the batches by the int column
+    let schema = batches[0].schema();
+    let table = MemTable::try_new(schema, vec![batches]).unwrap();
+    let builder = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
+    let runtime = builder.build_arc().unwrap();
+
+    let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024);
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+    ctx.register_table("t", Arc::new(table)).unwrap();
+
+    let df = ctx
+        .sql("explain analyze SELECT * FROM t ORDER BY random_numbers")
+        .await
+        .unwrap();
+
+    let _ = df.collect().await.expect("Query execution failed");
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 30b5abcf8897..d84068527a64 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::limit::LimitStream;
@@ -44,7 +44,9 @@ use crate::{
     Statistics,
 };
 
-use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
+use arrow::array::{
+    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, SortField};
@@ -300,6 +302,7 @@ impl ExternalSorter {
         if input.num_rows() == 0 {
             return Ok(());
         }
+
         self.reserve_memory_for_merge()?;
 
         let size = get_reserved_byte_for_record_batch(&input);
@@ -397,6 +400,8 @@ impl ExternalSorter {
             return Ok(0);
         }
 
+        self.organize_stringview_arrays()?;
+
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
@@ -414,6 +419,69 @@ impl ExternalSorter {
         Ok(used)
     }
 
+    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
+    /// `StringViewArray` in sequential order by calling `gc()` on them.
+    ///
+    /// Note this is a workaround until an upstream fix in `arrow-rs` is
+    /// available
+    ///
+    /// # Rationale
+    /// After (merge-based) sorting, all batches will be sorted into a single run,
+    /// but physically this sorted run is chunked into many small batches. For
+    /// `StringViewArray`s inside each sorted run, their inner buffers are not
+    /// reconstructed by default, leading to non-sequential payload locations
+    /// (permuted by the `interleave()` Arrow kernel). A single payload buffer
+    /// might be shared by multiple `RecordBatch`es.
+    /// When writing each batch to disk, the writer has to write all referenced buffers,
+    /// because they have to be read back one by one to reduce memory usage. This
+    /// causes extra disk reads and writes, and potentially execution failure.
+    ///
+    /// # Example
+    /// Before sorting:
+    /// batch1 -> buffer1
+    /// batch2 -> buffer2
+    ///
+    /// sorted_batch1 -> buffer1
+    ///               -> buffer2
+    /// sorted_batch2 -> buffer1
+    ///               -> buffer2
+    ///
+    /// Then when spilling each batch, the writer has to write all referenced
+    /// buffers repeatedly.
+    fn organize_stringview_arrays(&mut self) -> Result<()> {
+        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
+
+        for batch in self.in_mem_batches.drain(..) {
+            let mut new_columns: Vec<Arc<dyn Array>> =
+                Vec::with_capacity(batch.num_columns());
+
+            let mut arr_mutated = false;
+            for array in batch.columns() {
+                if let Some(string_view_array) =
+                    array.as_any().downcast_ref::<StringViewArray>()
+                {
+                    let new_array = string_view_array.gc();
+                    new_columns.push(Arc::new(new_array));
+                    arr_mutated = true;
+                } else {
+                    new_columns.push(Arc::clone(array));
+                }
+            }
+
+            let organized_batch = if arr_mutated {
+                RecordBatch::try_new(batch.schema(), new_columns)?
+            } else {
+                batch
+            };
+
+            organized_batches.push(organized_batch);
+        }
+
+        self.in_mem_batches = organized_batches;
+
+        Ok(())
+    }
+
     /// Sorts the in_mem_batches in place
     ///
     /// Sorting may have freed memory, especially if fetch is `Some`. If
@@ -439,36 +507,18 @@ impl ExternalSorter {
         // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
         // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
         // write sorted batches to disk when the memory is insufficient.
-        let mut spill_writer: Option<IPCWriter> = None;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
-            match &mut spill_writer {
-                None => {
-                    let sorted_size = get_reserved_byte_for_record_batch(&batch);
-                    if self.reservation.try_grow(sorted_size).is_err() {
-                        // Directly write in_mem_batches as well as all the remaining batches in
-                        // sorted_stream to disk. Further batches fetched from `sorted_stream` will
-                        // be handled by the `Some(writer)` matching arm.
-                        let spill_file =
-                            self.runtime.disk_manager.create_tmp_file("Sorting")?;
-                        let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
-                        // Flush everything in memory to the spill file
-                        for batch in self.in_mem_batches.drain(..) {
-                            writer.write(&batch)?;
-                        }
-                        // as well as the newly sorted batch
-                        writer.write(&batch)?;
-                        spill_writer = Some(writer);
-                        self.reservation.free();
-                        self.spills.push(spill_file);
-                    } else {
-                        self.in_mem_batches.push(batch);
-                        self.in_mem_batches_sorted = true;
-                    }
-                }
-                Some(writer) => {
-                    writer.write(&batch)?;
-                }
+            let sorted_size = get_reserved_byte_for_record_batch(&batch);
+            if self.reservation.try_grow(sorted_size).is_err() {
+                // Although the reservation is not enough, the batch is
+                // already in memory, so it's okay to combine it with previously
+                // sorted batches, and spill together.
+                self.in_mem_batches.push(batch);
+                self.spill().await?; // reservation is freed in spill()
+            } else {
+                self.in_mem_batches.push(batch);
+                self.in_mem_batches_sorted = true;
+            }
         }
 
@@ -476,17 +526,10 @@
         // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
         drop(sorted_stream);
 
-        if let Some(writer) = &mut spill_writer {
-            writer.finish()?;
-            self.metrics.spill_count.add(1);
-            self.metrics.spilled_rows.add(writer.num_rows);
-            self.metrics.spilled_bytes.add(writer.num_bytes);
-        }
-
         // Sorting may free up some memory especially when fetch is `Some`. If we have
         // not freed more than 50% of the memory, then we have to spill to free up more
         // memory for inserting more batches.
-        if spill_writer.is_none() && self.reservation.size() > before / 2 {
+        if self.reservation.size() > before / 2 {
             // We have not freed more than 50% of the memory, so we have to spill to
             // free up more memory
             self.spill().await?;
@@ -1422,10 +1465,14 @@ mod tests {
         let spill_count = metrics.spill_count().unwrap();
         let spilled_rows = metrics.spilled_rows().unwrap();
         let spilled_bytes = metrics.spilled_bytes().unwrap();
-        // Processing 840 KB of data using 400 KB of memory requires at least 2 spills
-        // It will spill roughly 18000 rows and 800 KBytes.
-        // We leave a little wiggle room for the actual numbers.
-        assert!((2..=10).contains(&spill_count));
+
+        // This test case processes 840 KB of data using 400 KB of memory. Note
+        // that buffered batches can't be dropped until all sorted batches are
+        // generated, so we can only buffer `sort_spill_reservation_bytes` of
+        // sorted batches.
+        // The number of spills is roughly calculated as:
+        // `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
+        assert!((12..=18).contains(&spill_count));
         assert!((15000..=20000).contains(&spilled_rows));
         assert!((700000..=900000).contains(&spilled_bytes));
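
Note for reviewers: the buffer-sharing behavior this patch works around is easy to reproduce outside of DataFusion. Below is a minimal standalone sketch (illustrative only, not part of the patch) using public `arrow-rs` APIs (`slice()`, `gc()`, `data_buffers()`). It shows how every small chunk of a run keeps referencing the full shared payload buffers, which is why a spill writer that serializes all referenced buffers per batch multiplies the on-disk and read-back size, and how `gc()` compacts each chunk first:

```rust
use arrow::array::StringViewArray;

/// Total bytes held by an array's payload buffers (strings longer than
/// 12 bytes are stored out-of-line in these shared buffers).
fn payload_bytes(a: &StringViewArray) -> usize {
    a.data_buffers().iter().map(|b| b.len()).sum()
}

fn main() {
    let array = StringViewArray::from(
        (0..10_000)
            .map(|i| format!("payload string {i:05}, long enough to live in a buffer"))
            .collect::<Vec<String>>(),
    );

    // Chunk the run into 10 small "batches", similar to how the sort stream
    // emits many small batches that all view into the same parent buffers.
    let chunks: Vec<StringViewArray> =
        (0..10).map(|i| array.slice(i * 1_000, 1_000)).collect();

    // Each chunk still references the full payload buffers, so serializing
    // all referenced buffers per batch writes them 10 times over, and the
    // batches read back from disk occupy ~10x the original memory.
    let shared = payload_bytes(&array);
    let serialized: usize = chunks.iter().map(payload_bytes).sum();
    assert_eq!(serialized, 10 * shared);

    // gc() copies only the referenced bytes into fresh, compact buffers,
    // which is what `organize_stringview_arrays` does before each spill.
    let compacted: usize = chunks.iter().map(|c| payload_bytes(&c.gc())).sum();
    assert!(compacted < serialized);
    println!("shared: {shared} B, naive spill: {serialized} B, after gc(): {compacted} B");
}
```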
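
A note on the design choice in `sort.rs`: the previous code special-cased the "memory ran out while collecting the sorted stream" path with an inline `IPCWriter`, which duplicated the spill bookkeeping (`spill_count`, `spilled_rows`, and `spilled_bytes` were updated by hand) and would have bypassed the new `gc()` pass. Routing that path through `self.spill()` keeps the metrics handling in one place and ensures every spilled batch first goes through `organize_stringview_arrays()`. The trade-off is that `gc()` copies string payloads once more before they hit disk, but that copy is bounded by the batch size, whereas the repeated buffer writes it prevents grow with the number of batches sharing a buffer.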