fix: Assertion fail in external sort #15469

Merged (3 commits, Mar 30, 2025)
31 changes: 31 additions & 0 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use test_utils::AccessLogGenerator;
@@ -493,6 +494,36 @@ async fn test_in_mem_buffer_almost_full() {
let _ = df.collect().await.unwrap();
}

/// External sort should be able to run if there is very little pre-reserved memory
/// for merge (set configuration sort_spill_reservation_bytes to 0).
#[tokio::test]
async fn test_external_sort_zero_merge_reservation() {
let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(0)
.with_target_partitions(14);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(config, runtime);

let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
let df = ctx.sql(query).await.unwrap();

let physical_plan = df.create_physical_plan().await.unwrap();
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = physical_plan.execute(0, task_ctx).unwrap();

// Ensures execution succeeds
let _result = collect(stream).await;

// Ensures the query spilled during execution
let metrics = physical_plan.metrics().unwrap();
let spill_count = metrics.spill_count().unwrap();
assert!(spill_count > 0);
}

/// Runs the query with the specified memory limit,
/// and verifies that the expected errors are returned
#[derive(Clone, Debug)]
97 changes: 58 additions & 39 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -216,10 +216,8 @@ struct ExternalSorter {
// STATE BUFFERS:
// Fields that hold intermediate data during sorting
// ========================================================================
/// Potentially unsorted in memory buffer
/// Unsorted input batches stored in the memory buffer
Contributor:

Removing this field is an improvement 👍 thanks @2010YOUY01 and @comphead

Given there are still three fields whose relationship is tied, I wonder if it would improve the code if we encoded that relationship in an actual rust enum -- for example

enum SortState {
  /// All data is in memory
  Memory {
    batches: Vec<RecordBatch>,
  }, 
  /// intermediate data is spilling to disk
  Spilling {
    batches: Vec<RecordBatch>,
    in_progress_spill_file: InProgressSpillFile,
  }
...
}

🤔
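
For illustration only, one hypothetical completion of the elided variant (this is not part of the PR or the comment thread; it assumes the existing `RecordBatch`, `InProgressSpillFile`, and `RefCountedTempFile` types and adds an invented terminal state for finished spills):

// Hypothetical completion of the sketch above -- not code from this PR
enum SortState {
    /// All buffered data is in memory
    Memory {
        batches: Vec<RecordBatch>,
    },
    /// Intermediate data is spilling to disk
    Spilling {
        batches: Vec<RecordBatch>,
        in_progress_spill_file: InProgressSpillFile,
    },
    /// All spill files are finalized and waiting to be merged
    Spilled {
        finished_spill_files: Vec<RefCountedTempFile>,
    },
}

Modeled this way, invalid combinations of the current three fields (e.g. an in-progress spill file while no spill has started) become unrepresentable.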

Contributor Author:

👍🏼 will try it.

in_mem_batches: Vec<RecordBatch>,
/// if `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,

/// During external sorting, in-memory intermediate data will be appended to
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
@@ -284,7 +282,6 @@ impl ExternalSorter {
Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: false,
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
@@ -320,7 +317,6 @@ impl ExternalSorter {
}

self.in_mem_batches.push(input);
self.in_mem_batches_sorted = false;
Ok(())
}

@@ -397,16 +393,13 @@
self.metrics.spill_metrics.spill_file_count.value()
}

/// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
/// be appended to the in-progress spill file.
///
/// (*) 'Sorted' here means globally sorted for all buffered batches when the
/// memory limit is reached, instead of partially sorted within the batch.
async fn spill_append(&mut self) -> Result<()> {
assert!(self.in_mem_batches_sorted);

// we could always get a chance to free some memory as long as we are holding some
if self.in_mem_batches.is_empty() {
/// Appends globally sorted batches to the in-progress spill file, and clears
/// `globally_sorted_batches` (and frees its memory reservation) afterwards.
async fn consume_and_spill_append(
&mut self,
globally_sorted_batches: &mut Vec<RecordBatch>,
Contributor:

thank you for the name here -- much better

) -> Result<()> {
if globally_sorted_batches.is_empty() {
return Ok(());
}

@@ -416,21 +409,25 @@ impl ExternalSorter {
Some(self.spill_manager.create_in_progress_file("Sorting")?);
}

self.organize_stringview_arrays()?;
Self::organize_stringview_arrays(globally_sorted_batches)?;

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

let batches = std::mem::take(&mut self.in_mem_batches);
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();

let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
internal_datafusion_err!("In-progress spill file should be initialized")
})?;

for batch in batches {
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
}

if !globally_sorted_batches.is_empty() {
return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
}

Ok(())
}
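
As an aside, the post-condition check above works because `std::mem::take` swaps a fresh empty `Vec` into the caller's binding. A minimal standalone sketch of the same consume-by-take pattern (plain Rust, independent of the DataFusion API):

// Drain `source` into `sink`; `source` is guaranteed empty afterwards.
fn consume_all(source: &mut Vec<i32>, sink: &mut Vec<i32>) {
    let taken = std::mem::take(source); // swaps in Vec::new()
    sink.extend(taken);
    debug_assert!(source.is_empty());
}

fn main() {
    let mut pending = vec![1, 2, 3];
    let mut spilled = Vec::new();
    consume_all(&mut pending, &mut spilled);
    assert!(pending.is_empty());
    assert_eq!(spilled, [1, 2, 3]);
}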

@@ -449,7 +446,7 @@
Ok(())
}

/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
/// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
/// `StringViewArray` in sequential order by calling `gc()` on them.
///
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
@@ -478,10 +475,12 @@
///
/// Then when spilling each batch, the writer has to write all referenced buffers
/// repeatedly.
fn organize_stringview_arrays(&mut self) -> Result<()> {
let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
fn organize_stringview_arrays(
globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

for batch in self.in_mem_batches.drain(..) {
for batch in globally_sorted_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());

@@ -507,20 +506,17 @@
organized_batches.push(organized_batch);
}

self.in_mem_batches = organized_batches;
*globally_sorted_batches = organized_batches;

Ok(())
}
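
To see the workaround in isolation, here is a minimal sketch of compacting one StringView column before it is written out. The helper name is invented; it assumes arrow's `StringViewArray` and its `gc()` method, which rewrites the array so the payload buffers hold only the bytes the views actually reference:

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringViewArray};

// Hypothetical helper: compact a column if it is a StringViewArray,
// otherwise pass it through unchanged.
fn compact_view_column(column: &ArrayRef) -> ArrayRef {
    match column.as_any().downcast_ref::<StringViewArray>() {
        Some(views) => Arc::new(views.gc()),
        None => Arc::clone(column),
    }
}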

/// Sorts the in_mem_batches in place
/// Sorts the in_mem_batches and potentially spills the sorted batches.
///
/// Sorting may have freed memory, especially if fetch is `Some`. If
/// the memory usage has dropped by a factor of 2, then we don't have
/// to spill. Otherwise, we spill to free up memory for inserting
/// more batches.
/// The factor of 2 aims to avoid a degenerate case where the
/// memory required for `fetch` is just under the memory available,
/// causing repeated re-sorting of data
/// If the memory usage has dropped by a factor of 2, it might be a sort with
/// fetch (e.g. sorting 1M rows but keeping only the top 100), so we keep the
/// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
/// Otherwise, we spill the sorted run to free up memory for inserting more batches.
///
/// # Arguments
///
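
The decision rule documented above reduces to a one-line predicate, mirroring the `self.reservation.size() > before / 2 || force_spill` check in the body below. Extracted here purely for illustration (this helper does not exist in the PR):

// Spill unless sorting freed at least half of the reserved memory.
// A large drop usually indicates a sort with a small `fetch` (top-K),
// where the surviving rows are cheap to keep in memory for the next pass.
fn should_spill(before_bytes: usize, after_bytes: usize, force_spill: bool) -> bool {
    force_spill || after_bytes > before_bytes / 2
}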
@@ -539,10 +535,18 @@

let mut sorted_stream =
self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
// After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are taken
// to construct a globally sorted stream.
if !self.in_mem_batches.is_empty() {
return internal_err!(
"in_mem_batches should be empty after constructing sorted stream"
);
}
// 'global' here refers to all buffered batches when the memory limit is
// reached. This variable will buffer the sorted batches after
// sort-preserving merge and incrementally append to spill files.
let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spilled = false;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
@@ -551,12 +555,12 @@
// Although the reservation is not enough, the batch is
// already in memory, so it's okay to combine it with previously
// sorted batches, and spill together.
self.in_mem_batches.push(batch);
self.spill_append().await?; // reservation is freed in spill()
globally_sorted_batches.push(batch);
self.consume_and_spill_append(&mut globally_sorted_batches)
.await?; // reservation is freed in consume_and_spill_append()
spilled = true;
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
globally_sorted_batches.push(batch);
}
}

@@ -570,12 +574,27 @@
if (self.reservation.size() > before / 2) || force_spill {
// We have not freed more than 50% of the memory, so we have to spill to
// free up more memory
self.spill_append().await?;
self.consume_and_spill_append(&mut globally_sorted_batches)
.await?;
spilled = true;
}

if spilled {
// There might be some buffered batches that haven't triggered a spill yet.
self.consume_and_spill_append(&mut globally_sorted_batches)
.await?;
self.spill_finish().await?;
} else {
// If the memory limit was reached before calling this function, and it
// didn't spill anything, this is a sort with fetch (top-K): after sorting,
// only the top K elements are kept in memory.
// For simplicity, those sorted top-K entries are put back into the unsorted
// `in_mem_batches` to be consumed by the next sort/merge.
if !self.in_mem_batches.is_empty() {
return internal_err!("in_mem_batches should be cleared before");
}

self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
}

// Reserve headroom for next sort/merge