Skip to content

Commit 0c2aa0c

Browse files
authored
fix: Redundant files spilled during external sort + introduce SpillManager (#15355)
* implement SpillManager * more comments
1 parent 45ed5aa commit 0c2aa0c

File tree

4 files changed

+463
-108
lines changed

4 files changed

+463
-108
lines changed

datafusion/physical-plan/src/metrics/baseline.rs

+25
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,31 @@ impl Drop for BaselineMetrics {
143143
}
144144
}
145145

146+
/// Helper for creating and tracking spill-related metrics for
147+
/// each operator
148+
#[derive(Debug, Clone)]
149+
pub struct SpillMetrics {
150+
/// count of spills during the execution of the operator
151+
pub spill_file_count: Count,
152+
153+
/// total spilled bytes during the execution of the operator
154+
pub spilled_bytes: Count,
155+
156+
/// total spilled rows during the execution of the operator
157+
pub spilled_rows: Count,
158+
}
159+
160+
impl SpillMetrics {
161+
/// Create a new SpillMetrics structure
162+
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
163+
Self {
164+
spill_file_count: MetricBuilder::new(metrics).spill_count(partition),
165+
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
166+
spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
167+
}
168+
}
169+
}
170+
146171
/// Trait for things that produce output rows as a result of execution.
147172
pub trait RecordOutput {
148173
/// Record that some number of output rows have been produced

datafusion/physical-plan/src/metrics/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use std::{
3131
use datafusion_common::HashMap;
3232

3333
// public exports
34-
pub use baseline::{BaselineMetrics, RecordOutput};
34+
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics};
3535
pub use builder::MetricBuilder;
3636
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
3737

datafusion/physical-plan/src/sorts/sort.rs

+86-46
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2929
use crate::expressions::PhysicalSortExpr;
3030
use crate::limit::LimitStream;
3131
use crate::metrics::{
32-
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
32+
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
3333
};
3434
use crate::projection::{make_with_child, update_expr, ProjectionExec};
3535
use crate::sorts::streaming_merge::StreamingMergeBuilder;
36-
use crate::spill::{
37-
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
38-
};
36+
use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager};
3937
use crate::stream::RecordBatchStreamAdapter;
4038
use crate::topk::TopK;
4139
use crate::{
@@ -50,7 +48,7 @@ use arrow::array::{
5048
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
5149
use arrow::datatypes::{DataType, SchemaRef};
5250
use arrow::row::{RowConverter, SortField};
53-
use datafusion_common::{internal_err, Result};
51+
use datafusion_common::{internal_datafusion_err, internal_err, Result};
5452
use datafusion_execution::disk_manager::RefCountedTempFile;
5553
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
5654
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
6563
/// metrics
6664
baseline: BaselineMetrics,
6765

68-
/// count of spills during the execution of the operator
69-
spill_count: Count,
70-
71-
/// total spilled bytes during the execution of the operator
72-
spilled_bytes: Count,
73-
74-
/// total spilled rows during the execution of the operator
75-
spilled_rows: Count,
66+
spill_metrics: SpillMetrics,
7667
}
7768

7869
impl ExternalSorterMetrics {
7970
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
8071
Self {
8172
baseline: BaselineMetrics::new(metrics, partition),
82-
spill_count: MetricBuilder::new(metrics).spill_count(partition),
83-
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
84-
spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
73+
spill_metrics: SpillMetrics::new(metrics, partition),
8574
}
8675
}
8776
}
@@ -230,9 +219,14 @@ struct ExternalSorter {
230219
/// if `Self::in_mem_batches` are sorted
231220
in_mem_batches_sorted: bool,
232221

233-
/// If data has previously been spilled, the locations of the
234-
/// spill files (in Arrow IPC format)
235-
spills: Vec<RefCountedTempFile>,
222+
/// During external sorting, in-memory intermediate data will be appended to
223+
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
224+
in_progress_spill_file: Option<InProgressSpillFile>,
225+
/// If data has previously been spilled, the locations of the spill files (in
226+
/// Arrow IPC format)
227+
/// Within the same spill file, the data might be chunked into multiple batches,
228+
/// and ordered by sort keys.
229+
finished_spill_files: Vec<RefCountedTempFile>,
236230

237231
// ========================================================================
238232
// EXECUTION RESOURCES:
@@ -244,6 +238,7 @@ struct ExternalSorter {
244238
runtime: Arc<RuntimeEnv>,
245239
/// Reservation for in_mem_batches
246240
reservation: MemoryReservation,
241+
spill_manager: SpillManager,
247242

248243
/// Reservation for the merging of in-memory batches. If the sort
249244
/// might spill, `sort_spill_reservation_bytes` will be
@@ -278,15 +273,23 @@ impl ExternalSorter {
278273
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
279274
.register(&runtime.memory_pool);
280275

276+
let spill_manager = SpillManager::new(
277+
Arc::clone(&runtime),
278+
metrics.spill_metrics.clone(),
279+
Arc::clone(&schema),
280+
);
281+
281282
Self {
282283
schema,
283284
in_mem_batches: vec![],
284285
in_mem_batches_sorted: false,
285-
spills: vec![],
286+
in_progress_spill_file: None,
287+
finished_spill_files: vec![],
286288
expr: expr.into(),
287289
metrics,
288290
fetch,
289291
reservation,
292+
spill_manager,
290293
merge_reservation,
291294
runtime,
292295
batch_size,
@@ -320,7 +323,7 @@ impl ExternalSorter {
320323
}
321324

322325
fn spilled_before(&self) -> bool {
323-
!self.spills.is_empty()
326+
!self.finished_spill_files.is_empty()
324327
}
325328

326329
/// Returns the final sorted output of all batches inserted via
@@ -348,11 +351,11 @@ impl ExternalSorter {
348351
self.sort_or_spill_in_mem_batches(true).await?;
349352
}
350353

351-
for spill in self.spills.drain(..) {
354+
for spill in self.finished_spill_files.drain(..) {
352355
if !spill.path().exists() {
353356
return internal_err!("Spill file {:?} does not exist", spill.path());
354357
}
355-
let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?;
358+
let stream = self.spill_manager.read_spill_as_stream(spill)?;
356359
streams.push(stream);
357360
}
358361

@@ -379,46 +382,69 @@ impl ExternalSorter {
379382

380383
/// How many bytes have been spilled to disk?
381384
fn spilled_bytes(&self) -> usize {
382-
self.metrics.spilled_bytes.value()
385+
self.metrics.spill_metrics.spilled_bytes.value()
383386
}
384387

385388
/// How many rows have been spilled to disk?
386389
fn spilled_rows(&self) -> usize {
387-
self.metrics.spilled_rows.value()
390+
self.metrics.spill_metrics.spilled_rows.value()
388391
}
389392

390393
/// How many spill files have been created?
391394
fn spill_count(&self) -> usize {
392-
self.metrics.spill_count.value()
395+
self.metrics.spill_metrics.spill_file_count.value()
393396
}
394397

395-
/// Writes any `in_memory_batches` to a spill file and clears
396-
/// the batches. The contents of the spill file are sorted.
398+
/// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
399+
/// be appended to the in-progress spill file.
397400
///
398-
/// Returns the amount of memory freed.
399-
async fn spill(&mut self) -> Result<usize> {
401+
/// (*) 'Sorted' here means globally sorted for all buffered batches when the
402+
/// memory limit is reached, instead of partially sorted within the batch.
403+
async fn spill_append(&mut self) -> Result<()> {
404+
assert!(self.in_mem_batches_sorted);
405+
400406
// we could always get a chance to free some memory as long as we are holding some
401407
if self.in_mem_batches.is_empty() {
402-
return Ok(0);
408+
return Ok(());
409+
}
410+
411+
// Lazily initialize the in-progress spill file
412+
if self.in_progress_spill_file.is_none() {
413+
self.in_progress_spill_file =
414+
Some(self.spill_manager.create_in_progress_file("Sorting")?);
403415
}
404416

405417
self.organize_stringview_arrays()?;
406418

407419
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
408420

409-
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
410421
let batches = std::mem::take(&mut self.in_mem_batches);
411-
let (spilled_rows, spilled_bytes) = spill_record_batches(
412-
&batches,
413-
spill_file.path().into(),
414-
Arc::clone(&self.schema),
415-
)?;
416-
let used = self.reservation.free();
417-
self.metrics.spill_count.add(1);
418-
self.metrics.spilled_bytes.add(spilled_bytes);
419-
self.metrics.spilled_rows.add(spilled_rows);
420-
self.spills.push(spill_file);
421-
Ok(used)
422+
self.reservation.free();
423+
424+
let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
425+
internal_datafusion_err!("In-progress spill file should be initialized")
426+
})?;
427+
428+
for batch in batches {
429+
in_progress_file.append_batch(&batch)?;
430+
}
431+
432+
Ok(())
433+
}
434+
435+
/// Finishes the in-progress spill file and moves it to the finished spill files.
436+
async fn spill_finish(&mut self) -> Result<()> {
437+
let mut in_progress_file =
438+
self.in_progress_spill_file.take().ok_or_else(|| {
439+
internal_datafusion_err!("Should be called after `spill_append`")
440+
})?;
441+
let spill_file = in_progress_file.finish()?;
442+
443+
if let Some(spill_file) = spill_file {
444+
self.finished_spill_files.push(spill_file);
445+
}
446+
447+
Ok(())
422448
}
423449

424450
/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
@@ -515,6 +541,7 @@ impl ExternalSorter {
515541
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
516542
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
517543
// write sorted batches to disk when the memory is insufficient.
544+
let mut spilled = false;
518545
while let Some(batch) = sorted_stream.next().await {
519546
let batch = batch?;
520547
let sorted_size = get_reserved_byte_for_record_batch(&batch);
@@ -523,7 +550,8 @@ impl ExternalSorter {
523550
// already in memory, so it's okay to combine it with previously
524551
// sorted batches, and spill together.
525552
self.in_mem_batches.push(batch);
526-
self.spill().await?; // reservation is freed in spill()
553+
self.spill_append().await?; // reservation is freed in spill()
554+
spilled = true;
527555
} else {
528556
self.in_mem_batches.push(batch);
529557
self.in_mem_batches_sorted = true;
@@ -540,7 +568,12 @@ impl ExternalSorter {
540568
if (self.reservation.size() > before / 2) || force_spill {
541569
// We have not freed more than 50% of the memory, so we have to spill to
542570
// free up more memory
543-
self.spill().await?;
571+
self.spill_append().await?;
572+
spilled = true;
573+
}
574+
575+
if spilled {
576+
self.spill_finish().await?;
544577
}
545578

546579
// Reserve headroom for next sort/merge
@@ -1489,7 +1522,14 @@ mod tests {
14891522
// batches.
14901523
// The number of spills is roughly calculated as:
14911524
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
1492-
assert!((12..=18).contains(&spill_count));
1525+
1526+
// If this assertion fail with large spill count, make sure the following
1527+
// case does not happen:
1528+
// During external sorting, one sorted run should be spilled to disk in a
1529+
// single file, due to memory limit we might need to append to the file
1530+
// multiple times to spill all the data. Make sure we're not writing each
1531+
// appending as a separate file.
1532+
assert!((4..=8).contains(&spill_count));
14931533
assert!((15000..=20000).contains(&spilled_rows));
14941534
assert!((900000..=1000000).contains(&spilled_bytes));
14951535

0 commit comments

Comments
 (0)