
Commit cc413d5

use arrow IPC Stream format for spill files
1 parent aadb0b6 commit cc413d5

File tree: 4 files changed, +93 −44 lines

datafusion/physical-plan/src/common.rs
datafusion/physical-plan/src/joins/sort_merge_join.rs
datafusion/physical-plan/src/sorts/sort.rs
datafusion/physical-plan/src/spill.rs
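For context on the switch: the Arrow IPC File format allows only a single dictionary per field across all batches in a file, while the Stream format lets a later batch replace an earlier dictionary — the property the new `IPCStreamWriter` doc comment below cites. A minimal sketch of the difference, assuming the `arrow` crate and its behavior of rejecting dictionary replacement in the File writer; the column name and values are illustrative:

use std::sync::Arc;

use arrow::array::DictionaryArray;
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::error::Result;
use arrow::ipc::writer::{FileWriter, StreamWriter};
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "tag",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    )]));

    // Two batches whose dictionaries differ, as can happen when batches are
    // built independently and then spilled one after another.
    let d1: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let d2: DictionaryArray<Int32Type> = vec!["x", "y"].into_iter().collect();
    let b1 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(d1)])?;
    let b2 = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(d2)])?;

    // Stream format: the second batch's dictionary replaces the first.
    let mut stream_writer = StreamWriter::try_new(Vec::new(), &schema)?;
    stream_writer.write(&b1)?;
    stream_writer.write(&b2)?;
    stream_writer.finish()?;

    // File format: the same second write is rejected as a dictionary replacement.
    let mut file_writer = FileWriter::try_new(Vec::new(), &schema)?;
    file_writer.write(&b1)?;
    assert!(file_writer.write(&b2).is_err());
    Ok(())
}

Spill files are written batch by batch as memory pressure dictates, so a format that tolerates per-batch dictionaries is the safer choice here.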

datafusion/physical-plan/src/common.rs (+1 −1)

@@ -180,7 +180,7 @@ pub fn compute_record_batch_statistics(
     }
 }
 
-/// Write in Arrow IPC format.
+/// Write in Arrow IPC File format.
 pub struct IPCWriter {
     /// Path
     pub path: PathBuf,

datafusion/physical-plan/src/joins/sort_merge_join.rs (+1 −1)

@@ -1394,7 +1394,7 @@ impl SortMergeJoinStream {
 
         if let Some(batch) = buffered_batch.batch {
             spill_record_batches(
-                vec![batch],
+                &[batch],
                 spill_file.path().into(),
                 Arc::clone(&self.buffered_schema),
             )?;
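This call-site change tracks the new `spill_record_batches` signature in spill.rs below, which borrows `&[RecordBatch]` instead of consuming a `Vec<RecordBatch>`. A small sketch of why the slice form is friendlier for callers; `rows_in` is a hypothetical stand-in for the spill function:

use arrow::record_batch::RecordBatch;

// Hypothetical stand-in for spill_record_batches: borrows its input.
fn rows_in(batches: &[RecordBatch]) -> usize {
    batches.iter().map(RecordBatch::num_rows).sum()
}

fn demo(single: RecordBatch, buffered: Vec<RecordBatch>) -> usize {
    // A Vec can be lent without being moved, as sort.rs does with `&batches`...
    let n = rows_in(&buffered);
    // ...and one owned batch becomes a one-element slice, as in the hunk above.
    n + rows_in(&[single])
}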

datafusion/physical-plan/src/sorts/sort.rs (+28 −28)

@@ -24,7 +24,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::limit::LimitStream;
@@ -35,6 +35,7 @@ use crate::projection::{make_with_child, update_expr, ProjectionExec};
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::spill::{
     get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+    IPCStreamWriter,
 };
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
@@ -402,7 +403,7 @@ impl ExternalSorter {
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
         let (spilled_rows, spilled_bytes) = spill_record_batches(
-            batches,
+            &batches,
             spill_file.path().into(),
             Arc::clone(&self.schema),
         )?;
@@ -439,36 +440,35 @@
         // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
         // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
         // write sorted batches to disk when the memory is insufficient.
-        let mut spill_writer: Option<IPCWriter> = None;
+        let mut spill_writer: Option<IPCStreamWriter> = None;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
-            match &mut spill_writer {
-                None => {
-                    let sorted_size = get_reserved_byte_for_record_batch(&batch);
-                    if self.reservation.try_grow(sorted_size).is_err() {
-                        // Directly write in_mem_batches as well as all the remaining batches in
-                        // sorted_stream to disk. Further batches fetched from `sorted_stream` will
-                        // be handled by the `Some(writer)` matching arm.
-                        let spill_file =
-                            self.runtime.disk_manager.create_tmp_file("Sorting")?;
-                        let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
-                        // Flush everything in memory to the spill file
-                        for batch in self.in_mem_batches.drain(..) {
-                            writer.write(&batch)?;
-                        }
-                        // as well as the newly sorted batch
-                        writer.write(&batch)?;
-                        spill_writer = Some(writer);
-                        self.reservation.free();
-                        self.spills.push(spill_file);
-                    } else {
-                        self.in_mem_batches.push(batch);
-                        self.in_mem_batches_sorted = true;
-                    }
-                }
-                Some(writer) => {
+
+            // If we've started spilling, just keep spilling
+            if let Some(spill_writer) = &mut spill_writer {
+                spill_writer.write(&batch)?;
+                continue;
+            }
+
+            let sorted_size = get_reserved_byte_for_record_batch(&batch);
+            if self.reservation.try_grow(sorted_size).is_err() {
+                // Directly write in_mem_batches as well as all the remaining batches in
+                // sorted_stream to disk. Further batches fetched from `sorted_stream` will
+                // be handled by the `Some(writer)` matching arm.
+                let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
+                let mut writer = IPCStreamWriter::new(spill_file.path(), &self.schema)?;
+                // Flush everything in memory to the spill file
+                for batch in self.in_mem_batches.drain(..) {
                     writer.write(&batch)?;
                 }
+                // as well as the newly sorted batch
+                writer.write(&batch)?;
+                spill_writer = Some(writer);
+                self.reservation.free();
+                self.spills.push(spill_file);
+            } else {
+                self.in_mem_batches.push(batch);
+                self.in_mem_batches_sorted = true;
             }
         }
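Beyond the writer swap, this hunk flattens the `match` on the `Option` writer into an early-`continue` guard: once the first batch fails to reserve memory, every subsequent batch takes the short path straight to disk. The shape of the loop, distilled with stand-in types (not the DataFusion API):

// Stand-ins: `Writer` for IPCStreamWriter, `try_grow` for the memory reservation.
struct Writer(Vec<String>);

fn drain_sorted(stream: Vec<String>, mut try_grow: impl FnMut(&str) -> bool) {
    let mut spill_writer: Option<Writer> = None;
    let mut in_mem: Vec<String> = Vec::new();

    for batch in stream {
        // Once spilling has started, keep spilling; no more memory checks.
        if let Some(writer) = &mut spill_writer {
            writer.0.push(batch);
            continue;
        }
        if !try_grow(&batch) {
            // First reservation failure: flush everything buffered so far,
            // plus the current batch, then remember the writer.
            let mut writer = Writer(in_mem.drain(..).collect());
            writer.0.push(batch);
            spill_writer = Some(writer);
        } else {
            in_mem.push(batch);
        }
    }
}

Aside from the writer type, the behavior is unchanged; the refactor only removes a level of match nesting.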

datafusion/physical-plan/src/spill.rs (+63 −14)

@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
 use std::ptr::NonNull;
 
 use arrow::array::ArrayData;
-use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
 use log::debug;
 use tokio::sync::mpsc::Sender;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 use datafusion_execution::SendableRecordBatchStream;
 
-use crate::common::IPCWriter;
 use crate::stream::RecordBatchReceiverStream;
 
 /// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
 ///
 /// Returns total number of the rows spilled to disk.
 pub(crate) fn spill_record_batches(
-    batches: Vec<RecordBatch>,
+    batches: &[RecordBatch],
     path: PathBuf,
     schema: SchemaRef,
 ) -> Result<(usize, usize)> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
     for batch in batches {
-        writer.write(&batch)?;
+        writer.write(batch)?;
     }
     writer.finish()?;
     debug!(
@@ -79,7 +78,7 @@
 
 fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
+    let reader = StreamReader::try_new(file, None)?;
     for batch in reader {
         sender
             .blocking_send(batch.map_err(Into::into))
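The read side is symmetric to the writer: `read_spill` above opens the file with `StreamReader` and forwards each decoded batch over a tokio channel. A simplified sketch of that forwarding loop, using a std channel in place of tokio's blocking sender:

use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::sync::mpsc::Sender;

use arrow::error::Result;
use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;

// Simplified analogue of read_spill: stream batches from disk to a consumer.
fn forward_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let file = BufReader::new(File::open(path)?);
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        // Each item is decoded lazily as the stream is consumed.
        if sender.send(batch).is_err() {
            break; // receiver hung up; stop reading
        }
    }
    Ok(())
}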
@@ -98,7 +97,7 @@ pub fn spill_record_batch_by_size(
 ) -> Result<()> {
     let mut offset = 0;
     let total_rows = batch.num_rows();
-    let mut writer = IPCWriter::new(&path, schema.as_ref())?;
+    let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;
 
     while offset < total_rows {
         let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -130,7 +129,7 @@
 /// {xxxxxxxxxxxxxxxxxxx} <--- buffer
 ///       ^    ^  ^    ^
 ///       |    |  |    |
-/// col1->{    }  |   |
+/// col1->{    }  |    |
 /// col2--------->{    }
 ///
 /// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,6 +178,51 @@ fn count_array_data_memory_size(
     }
 }
 
+/// Write in Arrow IPC Stream format to a file.
+///
+/// Stream format is used for spill because it supports dictionary replacement, and the random
+/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+pub(crate) struct IPCStreamWriter {
+    /// Inner writer
+    pub writer: StreamWriter<File>,
+    /// Batches written
+    pub num_batches: usize,
+    /// Rows written
+    pub num_rows: usize,
+    /// Bytes written
+    pub num_bytes: usize,
+}
+
+impl IPCStreamWriter {
+    /// Create new writer
+    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
+        let file = File::create(path).map_err(|e| {
+            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
+        })?;
+        Ok(Self {
+            num_batches: 0,
+            num_rows: 0,
+            num_bytes: 0,
+            writer: StreamWriter::try_new(file, schema)?,
+        })
+    }
+
+    /// Write one single batch
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch)?;
+        self.num_batches += 1;
+        self.num_rows += batch.num_rows();
+        let num_bytes: usize = batch.get_array_memory_size();
+        self.num_bytes += num_bytes;
+        Ok(())
+    }
+
+    /// Finish the writer
+    pub fn finish(&mut self) -> Result<()> {
+        self.writer.finish().map_err(Into::into)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -190,6 +234,7 @@ mod tests {
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
     use datafusion_execution::DiskManager;
+    use itertools::Itertools;
     use std::fs::File;
     use std::io::BufReader;
     use std::sync::Arc;
@@ -214,18 +259,20 @@
         let schema = batch1.schema();
         let num_rows = batch1.num_rows() + batch2.num_rows();
         let (spilled_rows, _) = spill_record_batches(
-            vec![batch1, batch2],
+            &[batch1, batch2],
             spill_file.path().into(),
             Arc::clone(&schema),
         )?;
         assert_eq!(spilled_rows, num_rows);
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = FileReader::try_new(file, None)?;
+        let reader = StreamReader::try_new(file, None)?;
 
-        assert_eq!(reader.num_batches(), 2);
         assert_eq!(reader.schema(), schema);
 
+        let batches = reader.collect_vec();
+        assert!(batches.len() == 2);
+
         Ok(())
     }

@@ -249,11 +296,13 @@
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = FileReader::try_new(file, None)?;
+        let reader = StreamReader::try_new(file, None)?;
 
-        assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
+        let batches = reader.collect_vec();
+        assert!(batches.len() == 4);
+
         Ok(())
     }
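The test updates follow from a real difference between the two readers: `FileReader` can report `num_batches()` up front because the File format ends with a footer, while the Stream format has no footer, so the batch count is only known after reading the stream to its end. A round-trip sketch of the new counting style, assuming the `arrow` crate (the tests use `itertools::collect_vec`; a plain `collect` works too):

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write two batches to an in-memory buffer in Stream format.
    let mut writer = StreamWriter::try_new(Vec::new(), schema.as_ref())?;
    writer.write(&batch)?;
    writer.write(&batch)?;
    writer.finish()?;
    let bytes = writer.into_inner()?;

    // No footer to consult: count the batches by reading them back.
    let reader = StreamReader::try_new(bytes.as_slice(), None)?;
    let batches = reader.collect::<Result<Vec<_>>>()?;
    assert_eq!(batches.len(), 2);
    Ok(())
}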
