Skip to content

fix: Redundant files spilled during external sort + introduce SpillManager #15355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions datafusion/physical-plan/src/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,31 @@ impl Drop for BaselineMetrics {
}
}

/// Helper for creating and tracking spill-related metrics for
/// each operator.
///
/// Groups the three standard spill counters so operators can register
/// them together via [`SpillMetrics::new`] instead of building each
/// metric individually.
#[derive(Debug, Clone)]
pub struct SpillMetrics {
    /// count of spill files created during the execution of the operator
    pub spill_file_count: Count,

    /// total spilled bytes during the execution of the operator
    pub spilled_bytes: Count,

    /// total spilled rows during the execution of the operator
    pub spilled_rows: Count,
}

impl SpillMetrics {
    /// Create a new `SpillMetrics`, registering all three spill counters
    /// (file count, bytes, rows) for `partition` in the given metrics set.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Register each counter separately; they share the same metrics
        // set and partition but track independent quantities.
        let spill_file_count = MetricBuilder::new(metrics).spill_count(partition);
        let spilled_bytes = MetricBuilder::new(metrics).spilled_bytes(partition);
        let spilled_rows = MetricBuilder::new(metrics).spilled_rows(partition);

        Self {
            spill_file_count,
            spilled_bytes,
            spilled_rows,
        }
    }
}

/// Trait for things that produce output rows as a result of execution.
pub trait RecordOutput {
/// Record that some number of output rows have been produced
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
use datafusion_common::HashMap;

// public exports
pub use baseline::{BaselineMetrics, RecordOutput};
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics};
pub use builder::MetricBuilder;
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};

Expand Down
132 changes: 86 additions & 46 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
};
use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
};
use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
Expand All @@ -50,7 +48,7 @@ use arrow::array::{
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::{internal_err, Result};
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
Expand All @@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
/// metrics
baseline: BaselineMetrics,

/// count of spills during the execution of the operator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

spill_count: Count,

/// total spilled bytes during the execution of the operator
spilled_bytes: Count,

/// total spilled rows during the execution of the operator
spilled_rows: Count,
spill_metrics: SpillMetrics,
}

impl ExternalSorterMetrics {
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
spill_metrics: SpillMetrics::new(metrics, partition),
}
}
}
Expand Down Expand Up @@ -230,9 +219,14 @@ struct ExternalSorter {
/// if `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,

/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
spills: Vec<RefCountedTempFile>,
/// During external sorting, in-memory intermediate data will be appended to
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
in_progress_spill_file: Option<InProgressSpillFile>,
/// If data has previously been spilled, the locations of the spill files (in
/// Arrow IPC format)
/// Within the same spill file, the data might be chunked into multiple batches,
/// and ordered by sort keys.
finished_spill_files: Vec<RefCountedTempFile>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make more sense to have the SpillManager own these files so there can't be different sets of references

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be hard to define the semantics of those temp files if we put them inside SpillManager, because different operators will interpret those files differently:

  • For SortExec, vec<RefCountedTempFile> is representing multiple sorted runs on sort keys.
  • For ShuffleWriterExec in datafusion-comet, since Spark's shuffle operator is blocking (due to Spark's staged execution design), it might want to keep vec<InProgressSpillFile> instead.
  • Similarly, if we want to spill Rows to accelerate SortExec, or we want to implement spilling hash join, the temp files will have very different logical meanings.

Overall, the SpillManager is designed only to do RecordBatch <-> raw file with different configurations and stat accounting. Operators have more flexibility to implement specific utilities for managing raw files, which have diverse semantics.

Do you see any potential issues or improvements?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The different semantics for different operations makes sense to me

I was thinking more mechanically, like just storing the `Vec` as a field on `SortManager` and allowing Sort and Hash, etc. to access / manipulate it as required. I think it is fine to consider this in a future PR as well


// ========================================================================
// EXECUTION RESOURCES:
Expand All @@ -244,6 +238,7 @@ struct ExternalSorter {
runtime: Arc<RuntimeEnv>,
/// Reservation for in_mem_batches
reservation: MemoryReservation,
spill_manager: SpillManager,

/// Reservation for the merging of in-memory batches. If the sort
/// might spill, `sort_spill_reservation_bytes` will be
Expand Down Expand Up @@ -278,15 +273,23 @@ impl ExternalSorter {
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
.register(&runtime.memory_pool);

let spill_manager = SpillManager::new(
Arc::clone(&runtime),
metrics.spill_metrics.clone(),
Arc::clone(&schema),
);

Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: false,
spills: vec![],
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
metrics,
fetch,
reservation,
spill_manager,
merge_reservation,
runtime,
batch_size,
Expand Down Expand Up @@ -320,7 +323,7 @@ impl ExternalSorter {
}

fn spilled_before(&self) -> bool {
!self.spills.is_empty()
!self.finished_spill_files.is_empty()
}

/// Returns the final sorted output of all batches inserted via
Expand Down Expand Up @@ -348,11 +351,11 @@ impl ExternalSorter {
self.sort_or_spill_in_mem_batches(true).await?;
}

for spill in self.spills.drain(..) {
for spill in self.finished_spill_files.drain(..) {
if !spill.path().exists() {
return internal_err!("Spill file {:?} does not exist", spill.path());
}
let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?;
let stream = self.spill_manager.read_spill_as_stream(spill)?;
streams.push(stream);
}

Expand All @@ -379,46 +382,69 @@ impl ExternalSorter {

/// How many bytes have been spilled to disk?
fn spilled_bytes(&self) -> usize {
self.metrics.spilled_bytes.value()
self.metrics.spill_metrics.spilled_bytes.value()
}

/// How many rows have been spilled to disk?
fn spilled_rows(&self) -> usize {
self.metrics.spilled_rows.value()
self.metrics.spill_metrics.spilled_rows.value()
}

/// How many spill files have been created?
fn spill_count(&self) -> usize {
self.metrics.spill_count.value()
self.metrics.spill_metrics.spill_file_count.value()
}

/// Writes any `in_memory_batches` to a spill file and clears
/// the batches. The contents of the spill file are sorted.
/// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
/// be appended to the in-progress spill file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they must all be sorted, then maybe you can put an assert/check that self.in_mem_batches_sorted is true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in bf4ab62

///
/// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
/// (*) 'Sorted' here means globally sorted for all buffered batches when the
/// memory limit is reached, instead of partially sorted within the batch.
async fn spill_append(&mut self) -> Result<()> {
assert!(self.in_mem_batches_sorted);

// we could always get a chance to free some memory as long as we are holding some
if self.in_mem_batches.is_empty() {
return Ok(0);
return Ok(());
}

// Lazily initialize the in-progress spill file
if self.in_progress_spill_file.is_none() {
self.in_progress_spill_file =
Some(self.spill_manager.create_in_progress_file("Sorting")?);
}

self.organize_stringview_arrays()?;

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
let (spilled_rows, spilled_bytes) = spill_record_batches(
&batches,
spill_file.path().into(),
Arc::clone(&self.schema),
)?;
let used = self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(spilled_bytes);
self.metrics.spilled_rows.add(spilled_rows);
self.spills.push(spill_file);
Ok(used)
self.reservation.free();

let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
internal_datafusion_err!("In-progress spill file should be initialized")
})?;

for batch in batches {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this logic -- I thought that each individual self.in_mem_batches was sorted but they aren't sorted overall

Thus if we write them back to back to the same spill file, the spill file itself won't be sorted

Like if the two in memory batches are

A B
1 10
2 10
2 10
A B
1 10
2 10
2 10

I think this code would produce a single spill file like

A B
1 10
2 10
2 10
1 10
2 10
2 10

Which is not sorted 🤔

On the other hand all the tests are passing so maybe I misunderstand what this is doing (or we have a testing gap)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, they are globally sorted. In different stages, in_mem_batches can either represent unordered input, or globally sorted run (but chunked into smaller batches)
I agree this approach has poor understandability and is error-prone, I'll try to improve it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks -- maybe for this PR we could just add some comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #15372

in_progress_file.append_batch(&batch)?;
}

Ok(())
}

/// Finishes the in-progress spill file and moves it to the finished spill files.
async fn spill_finish(&mut self) -> Result<()> {
let mut in_progress_file =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am finding the various states of the ExternalSorter hard to track (specifically what are the valid combinations of in_mem_batches, in_progress_spill_file, spill, and sorted_in_mem

I wonder if we could move to some sort of state enum that would make this easier to understand

Like

struct SortState
  AllInMemory {...}
  InProgressSpill { ... }
  AllOnDisk {...}
...
}

Copy link
Contributor Author

@2010YOUY01 2010YOUY01 Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #15372

self.in_progress_spill_file.take().ok_or_else(|| {
internal_datafusion_err!("Should be called after `spill_append`")
})?;
let spill_file = in_progress_file.finish()?;

if let Some(spill_file) = spill_file {
self.finished_spill_files.push(spill_file);
}

Ok(())
}

/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
Expand Down Expand Up @@ -515,6 +541,7 @@ impl ExternalSorter {
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spilled = false;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
let sorted_size = get_reserved_byte_for_record_batch(&batch);
Expand All @@ -523,7 +550,8 @@ impl ExternalSorter {
// already in memory, so it's okay to combine it with previously
// sorted batches, and spill together.
self.in_mem_batches.push(batch);
self.spill().await?; // reservation is freed in spill()
self.spill_append().await?; // reservation is freed in spill()
spilled = true;
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
Expand All @@ -540,7 +568,12 @@ impl ExternalSorter {
if (self.reservation.size() > before / 2) || force_spill {
// We have not freed more than 50% of the memory, so we have to spill to
// free up more memory
self.spill().await?;
self.spill_append().await?;
spilled = true;
}

if spilled {
self.spill_finish().await?;
}

// Reserve headroom for next sort/merge
Expand Down Expand Up @@ -1489,7 +1522,14 @@ mod tests {
// batches.
// The number of spills is roughly calculated as:
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
assert!((12..=18).contains(&spill_count));

// If this assertion fail with large spill count, make sure the following
// case does not happen:
// During external sorting, one sorted run should be spilled to disk in a
// single file, due to memory limit we might need to append to the file
// multiple times to spill all the data. Make sure we're not writing each
// appending as a separate file.
assert!((4..=8).contains(&spill_count));
assert!((15000..=20000).contains(&spilled_rows));
assert!((900000..=1000000).contains(&spilled_bytes));

Expand Down
Loading