diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index b26a08dd0fad..a4a83b84b655 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -143,6 +143,31 @@ impl Drop for BaselineMetrics { } } +/// Helper for creating and tracking spill-related metrics for +/// each operator +#[derive(Debug, Clone)] +pub struct SpillMetrics { + /// count of spills during the execution of the operator + pub spill_file_count: Count, + + /// total spilled bytes during the execution of the operator + pub spilled_bytes: Count, + + /// total spilled rows during the execution of the operator + pub spilled_rows: Count, +} + +impl SpillMetrics { + /// Create a new SpillMetrics structure + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + spill_file_count: MetricBuilder::new(metrics).spill_count(partition), + spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), + spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), + } + } +} + /// Trait for things that produce output rows as a result of execution. pub trait RecordOutput { /// Record that some number of output rows have been produced diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 50252e8d973a..2ac7ac1299a0 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -31,7 +31,7 @@ use std::{ use datafusion_common::HashMap; // public exports -pub use baseline::{BaselineMetrics, RecordOutput}; +pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics}; pub use builder::MetricBuilder; pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3d2323eb4336..e2d665e1d814 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -29,13 +29,11 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, }; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; -use crate::spill::{ - get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, -}; +use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ @@ -50,7 +48,7 @@ use arrow::array::{ use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, SortField}; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; @@ -65,23 +63,14 @@ struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, - /// count of spills during the execution of the operator - spill_count: Count, - - /// total spilled bytes during the execution of the operator - 
spilled_bytes: Count, - - /// total spilled rows during the execution of the operator - spilled_rows: Count, + spill_metrics: SpillMetrics, } impl ExternalSorterMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), - spill_count: MetricBuilder::new(metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), - spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), + spill_metrics: SpillMetrics::new(metrics, partition), } } }
@@ -230,9 +219,14 @@ struct ExternalSorter { /// if `Self::in_mem_batches` are sorted in_mem_batches_sorted: bool, - /// If data has previously been spilled, the locations of the - /// spill files (in Arrow IPC format) - spills: Vec<RefCountedTempFile>, + /// During external sorting, in-memory intermediate data will be appended to + /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`]. + in_progress_spill_file: Option<InProgressSpillFile>, + /// If data has previously been spilled, the locations of the spill files (in + /// Arrow IPC format). + /// Within the same spill file, the data might be chunked into multiple batches, + /// and ordered by sort keys. + finished_spill_files: Vec<RefCountedTempFile>, // ======================================================================== // EXECUTION RESOURCES:
@@ -244,6 +238,7 @@ struct ExternalSorter { runtime: Arc<RuntimeEnv>, /// Reservation for in_mem_batches reservation: MemoryReservation, + spill_manager: SpillManager, /// Reservation for the merging of in-memory batches. If the sort /// might spill, `sort_spill_reservation_bytes` will be
@@ -278,15 +273,23 @@ impl ExternalSorter { MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) .register(&runtime.memory_pool); + let spill_manager = SpillManager::new( + Arc::clone(&runtime), + metrics.spill_metrics.clone(), + Arc::clone(&schema), + ); + Self { schema, in_mem_batches: vec![], in_mem_batches_sorted: false, - spills: vec![], + in_progress_spill_file: None, + finished_spill_files: vec![], expr: expr.into(), metrics, fetch, reservation, + spill_manager, merge_reservation, runtime, batch_size,
@@ -320,7 +323,7 @@ impl ExternalSorter { } fn spilled_before(&self) -> bool { - !self.spills.is_empty() + !self.finished_spill_files.is_empty() } /// Returns the final sorted output of all batches inserted via
@@ -348,11 +351,11 @@ impl ExternalSorter { self.sort_or_spill_in_mem_batches(true).await?; } - for spill in self.spills.drain(..) { + for spill in self.finished_spill_files.drain(..) { if !spill.path().exists() { return internal_err!("Spill file {:?} does not exist", spill.path()); } - let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?; + let stream = self.spill_manager.read_spill_as_stream(spill)?; streams.push(stream); }
@@ -379,46 +382,69 @@ impl ExternalSorter { /// How many bytes have been spilled to disk? fn spilled_bytes(&self) -> usize { - self.metrics.spilled_bytes.value() + self.metrics.spill_metrics.spilled_bytes.value() } /// How many rows have been spilled to disk? fn spilled_rows(&self) -> usize { - self.metrics.spilled_rows.value() + self.metrics.spill_metrics.spilled_rows.value() } /// How many spill files have been created? fn spill_count(&self) -> usize { - self.metrics.spill_count.value() + self.metrics.spill_metrics.spill_file_count.value() } - /// Writes any `in_memory_batches` to a spill file and clears - /// the batches. The contents of the spill file are sorted. + /// When called, all `in_mem_batches` must be sorted (*); they will all + /// be appended to the in-progress spill file. /// - /// Returns the amount of memory freed. - async fn spill(&mut self) -> Result<usize> { + /// (*) 'Sorted' here means globally sorted for all buffered batches when the + /// memory limit is reached, instead of partially sorted within the batch. + async fn spill_append(&mut self) -> Result<()> { + assert!(self.in_mem_batches_sorted); + // we can always free some memory as long as we are holding some batches if self.in_mem_batches.is_empty() { - return Ok(0); + return Ok(()); + } + + // Lazily initialize the in-progress spill file + if self.in_progress_spill_file.is_none() { + self.in_progress_spill_file = + Some(self.spill_manager.create_in_progress_file("Sorting")?); } self.organize_stringview_arrays()?; debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - let (spilled_rows, spilled_bytes) = spill_record_batches( - &batches, - spill_file.path().into(), - Arc::clone(&self.schema), - )?; - let used = self.reservation.free(); - self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); - self.metrics.spilled_rows.add(spilled_rows); - self.spills.push(spill_file); - Ok(used) + self.reservation.free(); + + let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| { + internal_datafusion_err!("In-progress spill file should be initialized") + })?; + + for batch in batches { + in_progress_file.append_batch(&batch)?; + } + + Ok(()) + } + + /// Finishes the in-progress spill file and moves it to the finished spill files. + async fn spill_finish(&mut self) -> Result<()> { + let mut in_progress_file = + self.in_progress_spill_file.take().ok_or_else(|| { + internal_datafusion_err!("Should be called after `spill_append`") + })?; + let spill_file = in_progress_file.finish()?; + + if let Some(spill_file) = spill_file { + self.finished_spill_files.push(spill_file); + } + + Ok(()) } /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
@@ -515,6 +541,7 @@ impl ExternalSorter { // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty. // We'll gradually collect the sorted stream into self.in_mem_batches, or directly // write sorted batches to disk when the memory is insufficient. + let mut spilled = false; while let Some(batch) = sorted_stream.next().await { let batch = batch?; let sorted_size = get_reserved_byte_for_record_batch(&batch);
@@ -523,7 +550,8 @@ impl ExternalSorter { // already in memory, so it's okay to combine it with previously // sorted batches, and spill together. self.in_mem_batches.push(batch); - self.spill().await?; // reservation is freed in spill() + self.spill_append().await?; // reservation is freed in spill_append() + spilled = true; } else { self.in_mem_batches.push(batch); self.in_mem_batches_sorted = true;
@@ -540,7 +568,12 @@ impl ExternalSorter { if (self.reservation.size() > before / 2) || force_spill { // We have not freed more than 50% of the memory, so we have to spill to // free up more memory - self.spill().await?; + self.spill_append().await?; + spilled = true; + } + + if spilled { + self.spill_finish().await?; } // Reserve headroom for next sort/merge
@@ -1489,7 +1522,14 @@ mod tests { // batches.
// The number of spills is roughly calculated as: // `number_of_batches / (sort_spill_reservation_bytes / batch_size)` - assert!((12..=18).contains(&spill_count)); + + // If this assertion fails with a large spill count, make sure the following + // case does not happen: + // During external sorting, each sorted run should be spilled to disk in a + // single file; due to the memory limit we might need to append to the file + // multiple times to spill all the data. Make sure each append is not written + // out as a separate file. + assert!((4..=8).contains(&spill_count)); assert!((15000..=20000).contains(&spilled_rows)); assert!((900000..=1000000).contains(&spilled_bytes));
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index fa1b8a91cec7..381761203786 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs
@@ -21,11 +21,13 @@ use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::ptr::NonNull; +use std::sync::Arc; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; use arrow::record_batch::RecordBatch; +use datafusion_execution::runtime_env::RuntimeEnv; use log::debug; use tokio::sync::mpsc::Sender;
@@ -34,6 +36,7 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::human_readable_size; use datafusion_execution::SendableRecordBatchStream; +use crate::metrics::SpillMetrics; use crate::stream::RecordBatchReceiverStream; /// Read spilled batches from the disk
@@ -207,14 +210,17 @@ impl IPCStreamWriter { }) } - /// Write one single batch - pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + /// Writes a single batch to the IPC stream and updates the internal counters. + /// + /// Returns a tuple containing the change in the number of rows and bytes written. + pub fn write(&mut self, batch: &RecordBatch) -> Result<(usize, usize)> { self.writer.write(batch)?; self.num_batches += 1; - self.num_rows += batch.num_rows(); - let num_bytes: usize = batch.get_array_memory_size(); - self.num_bytes += num_bytes; - Ok(()) + let delta_num_rows = batch.num_rows(); + self.num_rows += delta_num_rows; + let delta_num_bytes: usize = batch.get_array_memory_size(); + self.num_bytes += delta_num_bytes; + Ok((delta_num_rows, delta_num_bytes)) } /// Finish the writer
@@ -223,25 +229,187 @@ impl IPCStreamWriter { } } +/// The `SpillManager` is responsible for the following tasks: +/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. +/// - Updating the associated metrics. +/// +/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files. +/// For example, all records within the same spill file are ordered according to a specific sort order. +#[derive(Debug, Clone)] +pub(crate) struct SpillManager { + env: Arc<RuntimeEnv>, + metrics: SpillMetrics, + schema: SchemaRef, + /// Number of batches to buffer in memory during disk reads + batch_read_buffer_capacity: usize, + // TODO: Add general-purpose compression options +} +
+impl SpillManager { + pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self { + Self { + env, + metrics, + schema, + batch_read_buffer_capacity: 2, + } + } + + /// Creates a temporary file for in-progress operations, returning an error + /// if file creation fails. The file can be used to append batches + /// incrementally and then finish the file when done. + pub fn create_in_progress_file( + &self, + request_msg: &str, + ) -> Result<InProgressSpillFile> { + let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?; + Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file)) + } + + /// Spills input `batches` into a single file in an atomic operation. If it is + /// intended to incrementally write in-memory batches into the same spill file, + /// use [`Self::create_in_progress_file`] instead. + /// `None` is returned if no batches are spilled. + #[allow(dead_code)] // TODO: remove after changing SMJ to use SpillManager + pub fn spill_record_batch_and_finish( + &self, + batches: &[RecordBatch], + request_msg: &str, + ) -> Result<Option<RefCountedTempFile>> { + let mut in_progress_file = self.create_in_progress_file(request_msg)?; + + for batch in batches { + in_progress_file.append_batch(batch)?; + } + + in_progress_file.finish() + } + + /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method + /// additionally slices the `RecordBatch` into smaller batches of at most `row_limit` + /// rows before spilling. + #[allow(dead_code)] // TODO: remove after changing aggregate to use SpillManager + pub fn spill_record_batch_by_size( + &self, + batch: &RecordBatch, + request_description: &str, + row_limit: usize, + ) -> Result<Option<RefCountedTempFile>> { + let total_rows = batch.num_rows(); + let mut batches = Vec::new(); + let mut offset = 0; + + // It's ok to calculate all slices first, because slicing is zero-copy. + while offset < total_rows { + let length = std::cmp::min(total_rows - offset, row_limit); + let sliced_batch = batch.slice(offset, length); + batches.push(sliced_batch); + offset += length; + } + + // Spill the sliced batches to disk + self.spill_record_batch_and_finish(&batches, request_description) + } + + /// Reads a spill file as a stream. The file must have been created by the current `SpillManager`. + /// This method will generate output in FIFO order: the batch appended first + /// will be read first. + pub fn read_spill_as_stream( + &self, + spill_file_path: RefCountedTempFile, + ) -> Result<SendableRecordBatchStream> { + let mut builder = RecordBatchReceiverStream::builder( + Arc::clone(&self.schema), + self.batch_read_buffer_capacity, + ); + let sender = builder.tx(); + + builder.spawn_blocking(move || read_spill(sender, spill_file_path.path())); + + Ok(builder.build()) + } +} +
+/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. +/// The caller can use this struct to incrementally append in-memory batches to +/// the file, and then finalize the file by calling the `finish` method. +pub(crate) struct InProgressSpillFile { + spill_writer: Arc<SpillManager>, + /// Lazily initialized writer + writer: Option<IPCStreamWriter>, + /// Lazily initialized in-progress file; it will be moved out when the `finish` method is invoked + in_progress_file: Option<RefCountedTempFile>, +} +
+impl InProgressSpillFile { + pub fn new( + spill_writer: Arc<SpillManager>, + in_progress_file: RefCountedTempFile, + ) -> Self { + Self { + spill_writer, + in_progress_file: Some(in_progress_file), + writer: None, + } + } + + /// Appends a `RecordBatch` to the file, initializing the writer if necessary. + pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if self.in_progress_file.is_none() { + return Err(exec_datafusion_err!( + "Append operation failed: No active in-progress file. The file may have already been finalized.
+ )); + } + if self.writer.is_none() { + let schema = batch.schema(); + if let Some(ref in_progress_file) = self.in_progress_file { + self.writer = Some(IPCStreamWriter::new( + in_progress_file.path(), + schema.as_ref(), + )?); + + // Update metrics + self.spill_writer.metrics.spill_file_count.add(1); + } + } + if let Some(writer) = &mut self.writer { + let (spilled_rows, spilled_bytes) = writer.write(batch)?; + + // Update metrics + self.spill_writer.metrics.spilled_bytes.add(spilled_bytes); + self.spill_writer.metrics.spilled_rows.add(spilled_rows); + } + Ok(()) + } + + /// Finalizes the file, returning the completed file reference. + /// If there are no batches spilled before, it returns `None`. + pub fn finish(&mut self) -> Result> { + if let Some(writer) = &mut self.writer { + writer.finish()?; + } else { + return Ok(None); + } + + Ok(self.in_progress_file.take()) + } +} + #[cfg(test)] mod tests { use super::*; - use crate::spill::{spill_record_batch_by_size, spill_record_batches}; + use crate::common::collect; + use crate::metrics::ExecutionPlanMetricsSet; use crate::test::build_table_i32; - use arrow::array::{Float64Array, Int32Array, ListArray}; + use arrow::array::{Float64Array, Int32Array, ListArray, StringArray}; use arrow::compute::cast; use arrow::datatypes::{DataType, Field, Int32Type, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; - use datafusion_execution::disk_manager::DiskManagerConfig; - use datafusion_execution::DiskManager; - use itertools::Itertools; - use std::fs::File; - use std::io::BufReader; + use std::sync::Arc; - #[test] - fn test_batch_spill_and_read() -> Result<()> { + #[tokio::test] + async fn test_batch_spill_and_read() -> Result<()> { let batch1 = build_table_i32( ("a2", &vec![0, 1, 2]), ("b2", &vec![3, 4, 5]), @@ -254,31 +422,32 @@ mod tests { ("c2", &vec![14, 15, 16]), ); - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - - let spill_file = disk_manager.create_tmp_file("Test Spill")?; let schema = batch1.schema(); let num_rows = batch1.num_rows() + batch2.num_rows(); - let (spilled_rows, _) = spill_record_batches( - &[batch1, batch2], - spill_file.path().into(), - Arc::clone(&schema), - )?; - assert_eq!(spilled_rows, num_rows); - let file = BufReader::new(File::open(spill_file.path())?); - let reader = StreamReader::try_new(file, None)?; + // Construct SpillManager + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema)); - assert_eq!(reader.schema(), schema); + let spill_file = spill_manager + .spill_record_batch_and_finish(&[batch1, batch2], "Test")? 
+ .unwrap(); + assert!(spill_file.path().exists()); + let spilled_rows = spill_manager.metrics.spilled_rows.value(); + assert_eq!(spilled_rows, num_rows); + + let stream = spill_manager.read_spill_as_stream(spill_file)?; + assert_eq!(stream.schema(), schema); - let batches = reader.collect_vec(); - assert!(batches.len() == 2); + let batches = collect(stream).await?; + assert_eq!(batches.len(), 2); Ok(()) } - #[test] - fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> { + #[tokio::test] + async fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> { // See https://github.com/apache/datafusion/issues/4658 let batch1 = build_table_i32( @@ -320,54 +489,49 @@ mod tests { .collect::>()?, )?; - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + // Construct SpillManager + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = SpillManager::new(env, metrics, Arc::clone(&dict_schema)); - let spill_file = disk_manager.create_tmp_file("Test Spill")?; let num_rows = batch1.num_rows() + batch2.num_rows(); - let (spilled_rows, _) = spill_record_batches( - &[batch1, batch2], - spill_file.path().into(), - Arc::clone(&dict_schema), - )?; + let spill_file = spill_manager + .spill_record_batch_and_finish(&[batch1, batch2], "Test")? + .unwrap(); + let spilled_rows = spill_manager.metrics.spilled_rows.value(); assert_eq!(spilled_rows, num_rows); - let file = BufReader::new(File::open(spill_file.path())?); - let reader = StreamReader::try_new(file, None)?; - - assert_eq!(reader.schema(), dict_schema); - - let batches = reader.collect_vec(); - assert!(batches.len() == 2); + let stream = spill_manager.read_spill_as_stream(spill_file)?; + assert_eq!(stream.schema(), dict_schema); + let batches = collect(stream).await?; + assert_eq!(batches.len(), 2); Ok(()) } - #[test] - fn test_batch_spill_by_size() -> Result<()> { + #[tokio::test] + async fn test_batch_spill_by_size() -> Result<()> { let batch1 = build_table_i32( ("a2", &vec![0, 1, 2, 3]), ("b2", &vec![3, 4, 5, 6]), ("c2", &vec![4, 5, 6, 7]), ); - let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - - let spill_file = disk_manager.create_tmp_file("Test Spill")?; let schema = batch1.schema(); - spill_record_batch_by_size( - &batch1, - spill_file.path().into(), - Arc::clone(&schema), - 1, - )?; + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema)); - let file = BufReader::new(File::open(spill_file.path())?); - let reader = StreamReader::try_new(file, None)?; + let spill_file = spill_manager + .spill_record_batch_by_size(&batch1, "Test Spill", 1)? 
+ .unwrap(); + assert!(spill_file.path().exists()); - assert_eq!(reader.schema(), schema); + let stream = spill_manager.read_spill_as_stream(spill_file)?; + assert_eq!(stream.schema(), schema); - let batches = reader.collect_vec(); - assert!(batches.len() == 4); + let batches = collect(stream).await?; + assert_eq!(batches.len(), 4); Ok(()) } @@ -498,4 +662,130 @@ mod tests { let size = get_record_batch_memory_size(&batch); assert_eq!(size, 8320); } + + // ==== Spill manager tests ==== + + #[test] + fn test_spill_manager_spill_record_batch_and_finish() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema)); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + let temp_file = spill_manager.spill_record_batch_and_finish(&[batch], "Test")?; + assert!(temp_file.is_some()); + assert!(temp_file.unwrap().path().exists()); + Ok(()) + } + + fn verify_metrics( + in_progress_file: &InProgressSpillFile, + expected_spill_file_count: usize, + expected_spilled_bytes: usize, + expected_spilled_rows: usize, + ) -> Result<()> { + let actual_spill_file_count = in_progress_file + .spill_writer + .metrics + .spill_file_count + .value(); + let actual_spilled_bytes = + in_progress_file.spill_writer.metrics.spilled_bytes.value(); + let actual_spilled_rows = + in_progress_file.spill_writer.metrics.spilled_rows.value(); + + assert_eq!( + actual_spill_file_count, expected_spill_file_count, + "Spill file count mismatch" + ); + assert_eq!( + actual_spilled_bytes, expected_spilled_bytes, + "Spilled bytes mismatch" + ); + assert_eq!( + actual_spilled_rows, expected_spilled_rows, + "Spilled rows mismatch" + ); + + Ok(()) + } + + #[test] + fn test_in_progress_spill_file_append_and_finish() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let spill_manager = + Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema))); + let mut in_progress_file = spill_manager.create_in_progress_file("Test")?; + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(StringArray::from(vec!["d", "e", "f"])), + ], + )?; + + in_progress_file.append_batch(&batch1)?; + verify_metrics(&in_progress_file, 1, 356, 3)?; + + in_progress_file.append_batch(&batch2)?; + verify_metrics(&in_progress_file, 1, 712, 6)?; + + let completed_file = in_progress_file.finish()?; + assert!(completed_file.is_some()); + assert!(completed_file.unwrap().path().exists()); + verify_metrics(&in_progress_file, 1, 712, 6)?; + // Double finish produce error + let result = in_progress_file.finish(); + assert!(result.is_err()); + + Ok(()) + } + + // Test write no batches + #[test] + fn test_in_progress_spill_file_write_no_batches() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = 
SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let spill_manager = + Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema))); + let mut in_progress_file = spill_manager.create_in_progress_file("Test")?; + + // Attempt to finish without appending any batches + let completed_file = in_progress_file.finish()?; + assert!(completed_file.is_none()); + + Ok(()) + } }
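
Note for reviewers: the round trip through the new API looks roughly like the sketch below. It is written in the same style as the unit tests added in this diff (crate-internal, since `SpillManager` and `InProgressSpillFile` are `pub(crate)`); the test name and the `"ExampleSpill"` request message are illustrative, everything else uses only constructors and methods introduced above.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;

use crate::common::collect;
use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
use crate::spill::SpillManager;

#[tokio::test]
async fn spill_manager_incremental_round_trip() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // One SpillManager per operator/partition: it owns the runtime handle (for the
    // DiskManager), the spill metrics, and the schema of the IPC stream files.
    let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
    let spill_manager =
        SpillManager::new(Arc::new(RuntimeEnv::default()), metrics, Arc::clone(&schema));

    // Incremental write path: one file per sorted run, appended to multiple times.
    let mut in_progress = spill_manager.create_in_progress_file("ExampleSpill")?;
    in_progress.append_batch(&batch)?;
    in_progress.append_batch(&batch)?;
    let spill_file = in_progress.finish()?.expect("at least one batch was appended");

    // Metrics are updated by the manager, not by the calling operator.
    assert_eq!(spill_manager.metrics.spill_file_count.value(), 1);
    assert_eq!(spill_manager.metrics.spilled_rows.value(), 6);

    // Read path: batches come back in FIFO (append) order.
    let stream = spill_manager.read_spill_as_stream(spill_file)?;
    let batches = collect(stream).await?;
    assert_eq!(batches.len(), 2);

    Ok(())
}
```

Keeping the metric updates inside `SpillManager`/`InProgressSpillFile` is what lets `ExternalSorter` drop its hand-rolled `spill_count`/`spilled_bytes`/`spilled_rows` bookkeeping in sort.rs.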
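
On the operator side, the wiring that `ExternalSorter` now uses can be boiled down to the sketch below. `MyOperatorSpillState` and the `"MyOperator"` request message are hypothetical names for illustration only; the fields and calls mirror the `spill_manager` / `in_progress_spill_file` / `finished_spill_files` changes in sort.rs.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::runtime_env::RuntimeEnv;

use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
use crate::spill::{InProgressSpillFile, SpillManager};

#[allow(dead_code)] // illustrative only
struct MyOperatorSpillState {
    spill_manager: SpillManager,
    /// The sorted run currently being written, if any
    in_progress_spill_file: Option<InProgressSpillFile>,
    /// One finished file per completed sorted run
    finished_spill_files: Vec<RefCountedTempFile>,
}

#[allow(dead_code)] // illustrative only
impl MyOperatorSpillState {
    fn new(
        runtime: Arc<RuntimeEnv>,
        metrics: &ExecutionPlanMetricsSet,
        partition: usize,
        schema: SchemaRef,
    ) -> Self {
        // SpillMetrics is registered once per partition and handed to the manager,
        // which updates it on every append.
        let spill_metrics = SpillMetrics::new(metrics, partition);
        Self {
            spill_manager: SpillManager::new(runtime, spill_metrics, schema),
            in_progress_spill_file: None,
            finished_spill_files: vec![],
        }
    }

    /// Append one sorted batch to the current run, creating the file lazily.
    fn spill_append(&mut self, batch: &RecordBatch) -> Result<()> {
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some(self.spill_manager.create_in_progress_file("MyOperator")?);
        }
        self.in_progress_spill_file
            .as_mut()
            .expect("initialized above")
            .append_batch(batch)
    }

    /// Close the current run; runs with no appended batches produce no file.
    fn spill_finish(&mut self) -> Result<()> {
        if let Some(mut file) = self.in_progress_spill_file.take() {
            if let Some(finished) = file.finish()? {
                self.finished_spill_files.push(finished);
            }
        }
        Ok(())
    }
}
```

Splitting the old `spill()` into `spill_append` plus `spill_finish` is what allows several memory-pressure events within one sorted run to land in the same file, which is also why the expected `spill_count` range in the updated sort test drops from 12..=18 to 4..=8.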