-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix: Redundant files spilled during external sort + introduce SpillManager
#15355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -29,13 +29,11 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; | |||||||||||||||||||||||||||||||
use crate::expressions::PhysicalSortExpr; | ||||||||||||||||||||||||||||||||
use crate::limit::LimitStream; | ||||||||||||||||||||||||||||||||
use crate::metrics::{ | ||||||||||||||||||||||||||||||||
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, | ||||||||||||||||||||||||||||||||
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
use crate::projection::{make_with_child, update_expr, ProjectionExec}; | ||||||||||||||||||||||||||||||||
use crate::sorts::streaming_merge::StreamingMergeBuilder; | ||||||||||||||||||||||||||||||||
use crate::spill::{ | ||||||||||||||||||||||||||||||||
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, | ||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||
use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager}; | ||||||||||||||||||||||||||||||||
use crate::stream::RecordBatchStreamAdapter; | ||||||||||||||||||||||||||||||||
use crate::topk::TopK; | ||||||||||||||||||||||||||||||||
use crate::{ | ||||||||||||||||||||||||||||||||
|
@@ -50,7 +48,7 @@ use arrow::array::{ | |||||||||||||||||||||||||||||||
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; | ||||||||||||||||||||||||||||||||
use arrow::datatypes::{DataType, SchemaRef}; | ||||||||||||||||||||||||||||||||
use arrow::row::{RowConverter, SortField}; | ||||||||||||||||||||||||||||||||
use datafusion_common::{internal_err, Result}; | ||||||||||||||||||||||||||||||||
use datafusion_common::{internal_datafusion_err, internal_err, Result}; | ||||||||||||||||||||||||||||||||
use datafusion_execution::disk_manager::RefCountedTempFile; | ||||||||||||||||||||||||||||||||
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; | ||||||||||||||||||||||||||||||||
use datafusion_execution::runtime_env::RuntimeEnv; | ||||||||||||||||||||||||||||||||
|
@@ -65,23 +63,14 @@ struct ExternalSorterMetrics { | |||||||||||||||||||||||||||||||
/// metrics | ||||||||||||||||||||||||||||||||
baseline: BaselineMetrics, | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// count of spills during the execution of the operator | ||||||||||||||||||||||||||||||||
spill_count: Count, | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// total spilled bytes during the execution of the operator | ||||||||||||||||||||||||||||||||
spilled_bytes: Count, | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// total spilled rows during the execution of the operator | ||||||||||||||||||||||||||||||||
spilled_rows: Count, | ||||||||||||||||||||||||||||||||
spill_metrics: SpillMetrics, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
impl ExternalSorterMetrics { | ||||||||||||||||||||||||||||||||
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { | ||||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||||
baseline: BaselineMetrics::new(metrics, partition), | ||||||||||||||||||||||||||||||||
spill_count: MetricBuilder::new(metrics).spill_count(partition), | ||||||||||||||||||||||||||||||||
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), | ||||||||||||||||||||||||||||||||
spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition), | ||||||||||||||||||||||||||||||||
spill_metrics: SpillMetrics::new(metrics, partition), | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
@@ -230,9 +219,14 @@ struct ExternalSorter { | |||||||||||||||||||||||||||||||
/// if `Self::in_mem_batches` are sorted | ||||||||||||||||||||||||||||||||
in_mem_batches_sorted: bool, | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// If data has previously been spilled, the locations of the | ||||||||||||||||||||||||||||||||
/// spill files (in Arrow IPC format) | ||||||||||||||||||||||||||||||||
spills: Vec<RefCountedTempFile>, | ||||||||||||||||||||||||||||||||
/// During external sorting, in-memory intermediate data will be appended to | ||||||||||||||||||||||||||||||||
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`]. | ||||||||||||||||||||||||||||||||
in_progress_spill_file: Option<InProgressSpillFile>, | ||||||||||||||||||||||||||||||||
/// If data has previously been spilled, the locations of the spill files (in | ||||||||||||||||||||||||||||||||
/// Arrow IPC format) | ||||||||||||||||||||||||||||||||
/// Within the same spill file, the data might be chunked into multiple batches, | ||||||||||||||||||||||||||||||||
/// and ordered by sort keys. | ||||||||||||||||||||||||||||||||
finished_spill_files: Vec<RefCountedTempFile>, | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might make more sense to have the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it will be hard to define the semantics of those temp files if we put them inside
Overall, the Do you see any potential issues or improvements? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The different semantics for different operations makes sense to me I was thinking more mechnically, like just storing the Vec |
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// ======================================================================== | ||||||||||||||||||||||||||||||||
// EXECUTION RESOURCES: | ||||||||||||||||||||||||||||||||
|
@@ -244,6 +238,7 @@ struct ExternalSorter { | |||||||||||||||||||||||||||||||
runtime: Arc<RuntimeEnv>, | ||||||||||||||||||||||||||||||||
/// Reservation for in_mem_batches | ||||||||||||||||||||||||||||||||
reservation: MemoryReservation, | ||||||||||||||||||||||||||||||||
spill_manager: SpillManager, | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Reservation for the merging of in-memory batches. If the sort | ||||||||||||||||||||||||||||||||
/// might spill, `sort_spill_reservation_bytes` will be | ||||||||||||||||||||||||||||||||
|
@@ -278,15 +273,23 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) | ||||||||||||||||||||||||||||||||
.register(&runtime.memory_pool); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let spill_manager = SpillManager::new( | ||||||||||||||||||||||||||||||||
Arc::clone(&runtime), | ||||||||||||||||||||||||||||||||
metrics.spill_metrics.clone(), | ||||||||||||||||||||||||||||||||
Arc::clone(&schema), | ||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||||
schema, | ||||||||||||||||||||||||||||||||
in_mem_batches: vec![], | ||||||||||||||||||||||||||||||||
in_mem_batches_sorted: false, | ||||||||||||||||||||||||||||||||
spills: vec![], | ||||||||||||||||||||||||||||||||
in_progress_spill_file: None, | ||||||||||||||||||||||||||||||||
finished_spill_files: vec![], | ||||||||||||||||||||||||||||||||
expr: expr.into(), | ||||||||||||||||||||||||||||||||
metrics, | ||||||||||||||||||||||||||||||||
fetch, | ||||||||||||||||||||||||||||||||
reservation, | ||||||||||||||||||||||||||||||||
spill_manager, | ||||||||||||||||||||||||||||||||
merge_reservation, | ||||||||||||||||||||||||||||||||
runtime, | ||||||||||||||||||||||||||||||||
batch_size, | ||||||||||||||||||||||||||||||||
|
@@ -320,7 +323,7 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
fn spilled_before(&self) -> bool { | ||||||||||||||||||||||||||||||||
!self.spills.is_empty() | ||||||||||||||||||||||||||||||||
!self.finished_spill_files.is_empty() | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Returns the final sorted output of all batches inserted via | ||||||||||||||||||||||||||||||||
|
@@ -348,11 +351,11 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
self.sort_or_spill_in_mem_batches(true).await?; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
for spill in self.spills.drain(..) { | ||||||||||||||||||||||||||||||||
for spill in self.finished_spill_files.drain(..) { | ||||||||||||||||||||||||||||||||
if !spill.path().exists() { | ||||||||||||||||||||||||||||||||
return internal_err!("Spill file {:?} does not exist", spill.path()); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?; | ||||||||||||||||||||||||||||||||
let stream = self.spill_manager.read_spill_as_stream(spill)?; | ||||||||||||||||||||||||||||||||
streams.push(stream); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
|
@@ -379,46 +382,69 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// How many bytes have been spilled to disk? | ||||||||||||||||||||||||||||||||
fn spilled_bytes(&self) -> usize { | ||||||||||||||||||||||||||||||||
self.metrics.spilled_bytes.value() | ||||||||||||||||||||||||||||||||
self.metrics.spill_metrics.spilled_bytes.value() | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// How many rows have been spilled to disk? | ||||||||||||||||||||||||||||||||
fn spilled_rows(&self) -> usize { | ||||||||||||||||||||||||||||||||
self.metrics.spilled_rows.value() | ||||||||||||||||||||||||||||||||
self.metrics.spill_metrics.spilled_rows.value() | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// How many spill files have been created? | ||||||||||||||||||||||||||||||||
fn spill_count(&self) -> usize { | ||||||||||||||||||||||||||||||||
self.metrics.spill_count.value() | ||||||||||||||||||||||||||||||||
self.metrics.spill_metrics.spill_file_count.value() | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Writes any `in_memory_batches` to a spill file and clears | ||||||||||||||||||||||||||||||||
/// the batches. The contents of the spill file are sorted. | ||||||||||||||||||||||||||||||||
/// When calling, all `in_mem_batches` must be sorted (*), and then all of them will | ||||||||||||||||||||||||||||||||
/// be appended to the in-progress spill file. | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If they must all be sorted, then maybe you can put an assert/check that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in bf4ab62 |
||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||
/// Returns the amount of memory freed. | ||||||||||||||||||||||||||||||||
async fn spill(&mut self) -> Result<usize> { | ||||||||||||||||||||||||||||||||
/// (*) 'Sorted' here means globally sorted for all buffered batches when the | ||||||||||||||||||||||||||||||||
/// memory limit is reached, instead of partially sorted within the batch. | ||||||||||||||||||||||||||||||||
async fn spill_append(&mut self) -> Result<()> { | ||||||||||||||||||||||||||||||||
assert!(self.in_mem_batches_sorted); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// we could always get a chance to free some memory as long as we are holding some | ||||||||||||||||||||||||||||||||
if self.in_mem_batches.is_empty() { | ||||||||||||||||||||||||||||||||
return Ok(0); | ||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// Lazily initialize the in-progress spill file | ||||||||||||||||||||||||||||||||
if self.in_progress_spill_file.is_none() { | ||||||||||||||||||||||||||||||||
self.in_progress_spill_file = | ||||||||||||||||||||||||||||||||
Some(self.spill_manager.create_in_progress_file("Sorting")?); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
self.organize_stringview_arrays()?; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; | ||||||||||||||||||||||||||||||||
let batches = std::mem::take(&mut self.in_mem_batches); | ||||||||||||||||||||||||||||||||
let (spilled_rows, spilled_bytes) = spill_record_batches( | ||||||||||||||||||||||||||||||||
&batches, | ||||||||||||||||||||||||||||||||
spill_file.path().into(), | ||||||||||||||||||||||||||||||||
Arc::clone(&self.schema), | ||||||||||||||||||||||||||||||||
)?; | ||||||||||||||||||||||||||||||||
let used = self.reservation.free(); | ||||||||||||||||||||||||||||||||
self.metrics.spill_count.add(1); | ||||||||||||||||||||||||||||||||
self.metrics.spilled_bytes.add(spilled_bytes); | ||||||||||||||||||||||||||||||||
self.metrics.spilled_rows.add(spilled_rows); | ||||||||||||||||||||||||||||||||
self.spills.push(spill_file); | ||||||||||||||||||||||||||||||||
Ok(used) | ||||||||||||||||||||||||||||||||
self.reservation.free(); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| { | ||||||||||||||||||||||||||||||||
internal_datafusion_err!("In-progress spill file should be initialized") | ||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
for batch in batches { | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this logic -- i thought that each individual Thus if we write write them back to back to the same spill file, the spill file itself won't be sorted Like if the two in memory batches are
I think this code would produce a single spill file like
Which is not sorted 🤔 On the other hand all the tests are passing so maybe I misunderstand what this is doing (or we have a testing gap) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, they are globally sorted. In different stages, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks -- maybe for this PR we could just add some comments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #15372 |
||||||||||||||||||||||||||||||||
in_progress_file.append_batch(&batch)?; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Finishes the in-progress spill file and moves it to the finished spill files. | ||||||||||||||||||||||||||||||||
async fn spill_finish(&mut self) -> Result<()> { | ||||||||||||||||||||||||||||||||
let mut in_progress_file = | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am finding the various states of the ExternalSorter hard to track (specifically what are the valid combinations of I wonder if we could move to some sort of state enum that would make this easier to understand Like struct SortState
AllInMemory {...}
InProgressSpill { ... }
AllOnDisk {...}
...
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #15372 |
||||||||||||||||||||||||||||||||
self.in_progress_spill_file.take().ok_or_else(|| { | ||||||||||||||||||||||||||||||||
internal_datafusion_err!("Should be called after `spill_append`") | ||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||
let spill_file = in_progress_file.finish()?; | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if let Some(spill_file) = spill_file { | ||||||||||||||||||||||||||||||||
self.finished_spill_files.push(spill_file); | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each | ||||||||||||||||||||||||||||||||
|
@@ -515,6 +541,7 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty. | ||||||||||||||||||||||||||||||||
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly | ||||||||||||||||||||||||||||||||
// write sorted batches to disk when the memory is insufficient. | ||||||||||||||||||||||||||||||||
let mut spilled = false; | ||||||||||||||||||||||||||||||||
while let Some(batch) = sorted_stream.next().await { | ||||||||||||||||||||||||||||||||
let batch = batch?; | ||||||||||||||||||||||||||||||||
let sorted_size = get_reserved_byte_for_record_batch(&batch); | ||||||||||||||||||||||||||||||||
|
@@ -523,7 +550,8 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
// already in memory, so it's okay to combine it with previously | ||||||||||||||||||||||||||||||||
// sorted batches, and spill together. | ||||||||||||||||||||||||||||||||
self.in_mem_batches.push(batch); | ||||||||||||||||||||||||||||||||
self.spill().await?; // reservation is freed in spill() | ||||||||||||||||||||||||||||||||
self.spill_append().await?; // reservation is freed in spill() | ||||||||||||||||||||||||||||||||
spilled = true; | ||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||
self.in_mem_batches.push(batch); | ||||||||||||||||||||||||||||||||
self.in_mem_batches_sorted = true; | ||||||||||||||||||||||||||||||||
|
@@ -540,7 +568,12 @@ impl ExternalSorter { | |||||||||||||||||||||||||||||||
if (self.reservation.size() > before / 2) || force_spill { | ||||||||||||||||||||||||||||||||
// We have not freed more than 50% of the memory, so we have to spill to | ||||||||||||||||||||||||||||||||
// free up more memory | ||||||||||||||||||||||||||||||||
self.spill().await?; | ||||||||||||||||||||||||||||||||
self.spill_append().await?; | ||||||||||||||||||||||||||||||||
spilled = true; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if spilled { | ||||||||||||||||||||||||||||||||
self.spill_finish().await?; | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// Reserve headroom for next sort/merge | ||||||||||||||||||||||||||||||||
|
@@ -1489,7 +1522,14 @@ mod tests { | |||||||||||||||||||||||||||||||
// batches. | ||||||||||||||||||||||||||||||||
// The number of spills is roughly calculated as: | ||||||||||||||||||||||||||||||||
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)` | ||||||||||||||||||||||||||||||||
assert!((12..=18).contains(&spill_count)); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
// If this assertion fail with large spill count, make sure the following | ||||||||||||||||||||||||||||||||
// case does not happen: | ||||||||||||||||||||||||||||||||
// During external sorting, one sorted run should be spilled to disk in a | ||||||||||||||||||||||||||||||||
// single file, due to memory limit we might need to append to the file | ||||||||||||||||||||||||||||||||
// multiple times to spill all the data. Make sure we're not writing each | ||||||||||||||||||||||||||||||||
// appending as a separate file. | ||||||||||||||||||||||||||||||||
assert!((4..=8).contains(&spill_count)); | ||||||||||||||||||||||||||||||||
assert!((15000..=20000).contains(&spilled_rows)); | ||||||||||||||||||||||||||||||||
assert!((900000..=1000000).contains(&spilled_bytes)); | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice