Skip to content

Commit b243011

Browse files
authored
refactor: SpillManager into a separate file (#15407)
* refactor: SpillManager into a separate file * chore
1 parent 14beb79 commit b243011

File tree

4 files changed

+235
-169
lines changed

4 files changed

+235
-169
lines changed

datafusion/physical-plan/src/sorts/sort.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use crate::metrics::{
3333
};
3434
use crate::projection::{make_with_child, update_expr, ProjectionExec};
3535
use crate::sorts::streaming_merge::StreamingMergeBuilder;
36-
use crate::spill::{get_record_batch_memory_size, InProgressSpillFile, SpillManager};
36+
use crate::spill::get_record_batch_memory_size;
37+
use crate::spill::in_progress_spill_file::InProgressSpillFile;
38+
use crate::spill::spill_manager::SpillManager;
3739
use crate::stream::RecordBatchStreamAdapter;
3840
use crate::topk::TopK;
3941
use crate::{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Define the `InProgressSpillFile` struct, which represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
19+
20+
use datafusion_common::Result;
21+
use std::sync::Arc;
22+
23+
use arrow::array::RecordBatch;
24+
use datafusion_common::exec_datafusion_err;
25+
use datafusion_execution::disk_manager::RefCountedTempFile;
26+
27+
use super::{spill_manager::SpillManager, IPCStreamWriter};
28+
29+
/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
30+
/// Caller is able to use this struct to incrementally append in-memory batches to
31+
/// the file, and then finalize the file by calling the `finish` method.
32+
pub struct InProgressSpillFile {
33+
pub(crate) spill_writer: Arc<SpillManager>,
34+
/// Lazily initialized writer
35+
writer: Option<IPCStreamWriter>,
36+
/// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
37+
in_progress_file: Option<RefCountedTempFile>,
38+
}
39+
40+
impl InProgressSpillFile {
41+
pub fn new(
42+
spill_writer: Arc<SpillManager>,
43+
in_progress_file: RefCountedTempFile,
44+
) -> Self {
45+
Self {
46+
spill_writer,
47+
in_progress_file: Some(in_progress_file),
48+
writer: None,
49+
}
50+
}
51+
52+
/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
53+
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
54+
if self.in_progress_file.is_none() {
55+
return Err(exec_datafusion_err!(
56+
"Append operation failed: No active in-progress file. The file may have already been finalized."
57+
));
58+
}
59+
if self.writer.is_none() {
60+
let schema = batch.schema();
61+
if let Some(ref in_progress_file) = self.in_progress_file {
62+
self.writer = Some(IPCStreamWriter::new(
63+
in_progress_file.path(),
64+
schema.as_ref(),
65+
)?);
66+
67+
// Update metrics
68+
self.spill_writer.metrics.spill_file_count.add(1);
69+
}
70+
}
71+
if let Some(writer) = &mut self.writer {
72+
let (spilled_rows, spilled_bytes) = writer.write(batch)?;
73+
74+
// Update metrics
75+
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
76+
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
77+
}
78+
Ok(())
79+
}
80+
81+
/// Finalizes the file, returning the completed file reference.
82+
/// If there are no batches spilled before, it returns `None`.
83+
pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
84+
if let Some(writer) = &mut self.writer {
85+
writer.finish()?;
86+
} else {
87+
return Ok(None);
88+
}
89+
90+
Ok(self.in_progress_file.take())
91+
}
92+
}

datafusion/physical-plan/src/spill.rs renamed to datafusion/physical-plan/src/spill/mod.rs

+7-168
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717

1818
//! Defines the spilling functions
1919
20+
pub(crate) mod in_progress_spill_file;
21+
pub(crate) mod spill_manager;
22+
2023
use std::fs::File;
2124
use std::io::BufReader;
2225
use std::path::{Path, PathBuf};
2326
use std::ptr::NonNull;
24-
use std::sync::Arc;
2527

2628
use arrow::array::ArrayData;
2729
use arrow::datatypes::{Schema, SchemaRef};
2830
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
2931
use arrow::record_batch::RecordBatch;
30-
use datafusion_execution::runtime_env::RuntimeEnv;
3132
use log::debug;
3233
use tokio::sync::mpsc::Sender;
3334

@@ -36,7 +37,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
3637
use datafusion_execution::memory_pool::human_readable_size;
3738
use datafusion_execution::SendableRecordBatchStream;
3839

39-
use crate::metrics::SpillMetrics;
4040
use crate::stream::RecordBatchReceiverStream;
4141

4242
/// Read spilled batches from the disk
@@ -229,182 +229,21 @@ impl IPCStreamWriter {
229229
}
230230
}
231231

232-
/// The `SpillManager` is responsible for the following tasks:
233-
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
234-
/// - Updating the associated metrics.
235-
///
236-
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
237-
/// For example, all records within the same spill file are ordered according to a specific order.
238-
#[derive(Debug, Clone)]
239-
pub(crate) struct SpillManager {
240-
env: Arc<RuntimeEnv>,
241-
metrics: SpillMetrics,
242-
schema: SchemaRef,
243-
/// Number of batches to buffer in memory during disk reads
244-
batch_read_buffer_capacity: usize,
245-
// TODO: Add general-purpose compression options
246-
}
247-
248-
impl SpillManager {
249-
pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
250-
Self {
251-
env,
252-
metrics,
253-
schema,
254-
batch_read_buffer_capacity: 2,
255-
}
256-
}
257-
258-
/// Creates a temporary file for in-progress operations, returning an error
259-
/// message if file creation fails. The file can be used to append batches
260-
/// incrementally and then finish the file when done.
261-
pub fn create_in_progress_file(
262-
&self,
263-
request_msg: &str,
264-
) -> Result<InProgressSpillFile> {
265-
let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
266-
Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
267-
}
268-
269-
/// Spill input `batches` into a single file in a atomic operation. If it is
270-
/// intended to incrementally write in-memory batches into the same spill file,
271-
/// use [`Self::create_in_progress_file`] instead.
272-
/// None is returned if no batches are spilled.
273-
#[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager
274-
pub fn spill_record_batch_and_finish(
275-
&self,
276-
batches: &[RecordBatch],
277-
request_msg: &str,
278-
) -> Result<Option<RefCountedTempFile>> {
279-
let mut in_progress_file = self.create_in_progress_file(request_msg)?;
280-
281-
for batch in batches {
282-
in_progress_file.append_batch(batch)?;
283-
}
284-
285-
in_progress_file.finish()
286-
}
287-
288-
/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
289-
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
290-
#[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager
291-
pub fn spill_record_batch_by_size(
292-
&self,
293-
batch: &RecordBatch,
294-
request_description: &str,
295-
row_limit: usize,
296-
) -> Result<Option<RefCountedTempFile>> {
297-
let total_rows = batch.num_rows();
298-
let mut batches = Vec::new();
299-
let mut offset = 0;
300-
301-
// It's ok to calculate all slices first, because slicing is zero-copy.
302-
while offset < total_rows {
303-
let length = std::cmp::min(total_rows - offset, row_limit);
304-
let sliced_batch = batch.slice(offset, length);
305-
batches.push(sliced_batch);
306-
offset += length;
307-
}
308-
309-
// Spill the sliced batches to disk
310-
self.spill_record_batch_and_finish(&batches, request_description)
311-
}
312-
313-
/// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
314-
/// This method will generate output in FIFO order: the batch appended first
315-
/// will be read first.
316-
pub fn read_spill_as_stream(
317-
&self,
318-
spill_file_path: RefCountedTempFile,
319-
) -> Result<SendableRecordBatchStream> {
320-
let mut builder = RecordBatchReceiverStream::builder(
321-
Arc::clone(&self.schema),
322-
self.batch_read_buffer_capacity,
323-
);
324-
let sender = builder.tx();
325-
326-
builder.spawn_blocking(move || read_spill(sender, spill_file_path.path()));
327-
328-
Ok(builder.build())
329-
}
330-
}
331-
332-
/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
333-
/// Caller is able to use this struct to incrementally append in-memory batches to
334-
/// the file, and then finalize the file by calling the `finish` method.
335-
pub(crate) struct InProgressSpillFile {
336-
spill_writer: Arc<SpillManager>,
337-
/// Lazily initialized writer
338-
writer: Option<IPCStreamWriter>,
339-
/// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
340-
in_progress_file: Option<RefCountedTempFile>,
341-
}
342-
343-
impl InProgressSpillFile {
344-
pub fn new(
345-
spill_writer: Arc<SpillManager>,
346-
in_progress_file: RefCountedTempFile,
347-
) -> Self {
348-
Self {
349-
spill_writer,
350-
in_progress_file: Some(in_progress_file),
351-
writer: None,
352-
}
353-
}
354-
355-
/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
356-
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
357-
if self.in_progress_file.is_none() {
358-
return Err(exec_datafusion_err!(
359-
"Append operation failed: No active in-progress file. The file may have already been finalized."
360-
));
361-
}
362-
if self.writer.is_none() {
363-
let schema = batch.schema();
364-
if let Some(ref in_progress_file) = self.in_progress_file {
365-
self.writer = Some(IPCStreamWriter::new(
366-
in_progress_file.path(),
367-
schema.as_ref(),
368-
)?);
369-
370-
// Update metrics
371-
self.spill_writer.metrics.spill_file_count.add(1);
372-
}
373-
}
374-
if let Some(writer) = &mut self.writer {
375-
let (spilled_rows, spilled_bytes) = writer.write(batch)?;
376-
377-
// Update metrics
378-
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
379-
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
380-
}
381-
Ok(())
382-
}
383-
384-
/// Finalizes the file, returning the completed file reference.
385-
/// If there are no batches spilled before, it returns `None`.
386-
pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
387-
if let Some(writer) = &mut self.writer {
388-
writer.finish()?;
389-
} else {
390-
return Ok(None);
391-
}
392-
393-
Ok(self.in_progress_file.take())
394-
}
395-
}
396-
397232
#[cfg(test)]
398233
mod tests {
234+
use super::in_progress_spill_file::InProgressSpillFile;
399235
use super::*;
400236
use crate::common::collect;
401237
use crate::metrics::ExecutionPlanMetricsSet;
238+
use crate::metrics::SpillMetrics;
239+
use crate::spill::spill_manager::SpillManager;
402240
use crate::test::build_table_i32;
403241
use arrow::array::{Float64Array, Int32Array, ListArray, StringArray};
404242
use arrow::compute::cast;
405243
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
406244
use arrow::record_batch::RecordBatch;
407245
use datafusion_common::Result;
246+
use datafusion_execution::runtime_env::RuntimeEnv;
408247

409248
use std::sync::Arc;
410249

0 commit comments

Comments
 (0)