Skip to content

Commit 8cc9aea

Browse files
committed
Fix double memory allocation caused by collecting the merged batches;
Fix batch memory consumption growth after sorting; Reserve memory more aggressively to compensate for memory needed for merging.
1 parent 28856e1 commit 8cc9aea

File tree

8 files changed

+304
-70
lines changed

8 files changed

+304
-70
lines changed

datafusion/core/tests/fuzz_cases/sort_fuzz.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
5454
#[tokio::test]
5555
#[cfg_attr(tarpaulin, ignore)]
5656
async fn test_sort_100k_mem() {
57-
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
57+
for (batch_size, should_spill) in
58+
[(5, false), (10000, false), (20000, true), (1000000, true)]
59+
{
5860
SortTest::new()
5961
.with_int32_batches(batch_size)
6062
.with_pool_size(100 * KB)

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async fn oom_sort() {
6969
.with_expected_errors(vec![
7070
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
7171
])
72-
.with_memory_limit(200_000)
72+
.with_memory_limit(400_000)
7373
.run()
7474
.await
7575
}
@@ -271,7 +271,8 @@ async fn sort_spill_reservation() {
271271

272272
// Merge operation needs extra memory to do row conversion, so make the
273273
// memory limit larger.
274-
let mem_limit = partition_size * 2;
274+
let mem_limit =
275+
((partition_size * 2 + 1024) as f64 / MEMORY_FRACTION).ceil() as usize;
275276
let test = TestCase::new()
276277
// This query uses a different order than the input table to
277278
// force a sort. It also needs to have multiple columns to
@@ -308,7 +309,8 @@ async fn sort_spill_reservation() {
308309

309310
test.clone()
310311
.with_expected_errors(vec![
311-
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
312+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
313+
"bytes for ExternalSorterMerge",
312314
])
313315
.with_config(config)
314316
.run()

datafusion/physical-plan/src/sorts/cursor.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,22 @@ pub struct ArrayValues<T: CursorValues> {
291291
// Otherwise, the first null index
292292
null_threshold: usize,
293293
options: SortOptions,
294+
295+
/// Tracks the memory used by the values array,
296+
/// freed on drop.
297+
_reservation: MemoryReservation,
294298
}
295299

296300
impl<T: CursorValues> ArrayValues<T> {
297301
/// Create a new [`ArrayValues`] from the provided `values` sorted according
298302
/// to `options`.
299303
///
300304
/// Panics if the array is empty
301-
pub fn new<A: CursorArray<Values = T>>(options: SortOptions, array: &A) -> Self {
305+
pub fn new<A: CursorArray<Values = T>>(
306+
options: SortOptions,
307+
array: &A,
308+
reservation: MemoryReservation,
309+
) -> Self {
302310
assert!(array.len() > 0, "Empty array passed to FieldCursor");
303311
let null_threshold = match options.nulls_first {
304312
true => array.null_count(),
@@ -309,6 +317,7 @@ impl<T: CursorValues> ArrayValues<T> {
309317
values: array.values(),
310318
null_threshold,
311319
options,
320+
_reservation: reservation,
312321
}
313322
}
314323

@@ -360,6 +369,12 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
360369

361370
#[cfg(test)]
362371
mod tests {
372+
use std::sync::Arc;
373+
374+
use datafusion_execution::memory_pool::{
375+
GreedyMemoryPool, MemoryConsumer, MemoryPool,
376+
};
377+
363378
use super::*;
364379

365380
fn new_primitive(
@@ -372,10 +387,15 @@ mod tests {
372387
false => values.len() - null_count,
373388
};
374389

390+
let memory_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(10000));
391+
let consumer = MemoryConsumer::new("test");
392+
let reservation = consumer.register(&memory_pool);
393+
375394
let values = ArrayValues {
376395
values: PrimitiveValues(values),
377396
null_threshold,
378397
options,
398+
_reservation: reservation,
379399
};
380400

381401
Cursor::new(values)

0 commit comments

Comments
 (0)