Skip to content

Commit b652cee

Browse files
committed
Don't reserve additional memory for merging after fetching the first mergd batch
1 parent 8cc9aea commit b652cee

File tree

2 files changed

+2
-16
lines changed
  • datafusion

2 files changed

+2
-16
lines changed

datafusion/core/tests/memory_limit/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async fn oom_sort() {
6969
.with_expected_errors(vec![
7070
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
7171
])
72-
.with_memory_limit(400_000)
72+
.with_memory_limit(500_000)
7373
.run()
7474
.await
7575
}

datafusion/physical-plan/src/sorts/sort.rs

+1-15
Original file line numberDiff line numberDiff line change
@@ -435,23 +435,12 @@ impl ExternalSorter {
435435
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
436436
// write sorted batches to disk when the memory is insufficient.
437437
let mut spill_writer: Option<IPCWriter> = None;
438-
// Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the
439-
// 1/3 is simply an arbitrary chosen number.
440-
let sort_merge_minimum_overhead = self.sort_spill_reservation_bytes / 3;
441438
while let Some(batch) = sorted_stream.next().await {
442439
let batch = batch?;
443440
match &mut spill_writer {
444441
None => {
445442
let sorted_size = get_reserved_byte_for_record_batch(&batch);
446-
447-
// We reserve more memory to ensure that we'll have enough memory for
448-
// `SortPreservingMergeStream` after consuming this batch, otherwise we'll
449-
// start spilling everything to disk.
450-
if self
451-
.reservation
452-
.try_grow(sorted_size + sort_merge_minimum_overhead)
453-
.is_err()
454-
{
443+
if self.reservation.try_grow(sorted_size).is_err() {
455444
// Directly write in_mem_batches as well as all the remaining batches in
456445
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
457446
// be handled by the `Some(writer)` matching arm.
@@ -469,9 +458,6 @@ impl ExternalSorter {
469458
self.spills.push(spill_file);
470459
} else {
471460
self.in_mem_batches.push(batch);
472-
473-
// Gives back memory for merging the next batch.
474-
self.reservation.shrink(sort_merge_minimum_overhead);
475461
}
476462
}
477463
Some(writer) => {

0 commit comments

Comments
 (0)