Skip to content

Commit 4baa901

Browse files
authored
Use tracked-consumers memory pool as the default. (#11949)
* feat(11523): set the default memory pool to the tracked-consumer pool
* test(11523): update tests for the OOM message including the top consumers
* chore(11523): remove duplicate wording from OOM messages
1 parent cb3ec77 commit 4baa901

File tree

6 files changed

+38
-46
lines changed

6 files changed

+38
-46
lines changed

datafusion/core/tests/memory_limit/mod.rs

+12-23
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ async fn group_by_none() {
7676
TestCase::new()
7777
.with_query("select median(request_bytes) from t")
7878
.with_expected_errors(vec![
79-
"Resources exhausted: Failed to allocate additional",
80-
"AggregateStream",
79+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: AggregateStream"
8180
])
8281
.with_memory_limit(2_000)
8382
.run()
@@ -89,8 +88,7 @@ async fn group_by_row_hash() {
8988
TestCase::new()
9089
.with_query("select count(*) from t GROUP BY response_bytes")
9190
.with_expected_errors(vec![
92-
"Resources exhausted: Failed to allocate additional",
93-
"GroupedHashAggregateStream",
91+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
9492
])
9593
.with_memory_limit(2_000)
9694
.run()
@@ -103,8 +101,7 @@ async fn group_by_hash() {
103101
// group by dict column
104102
.with_query("select count(*) from t GROUP BY service, host, pod, container")
105103
.with_expected_errors(vec![
106-
"Resources exhausted: Failed to allocate additional",
107-
"GroupedHashAggregateStream",
104+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream"
108105
])
109106
.with_memory_limit(1_000)
110107
.run()
@@ -117,8 +114,7 @@ async fn join_by_key_multiple_partitions() {
117114
TestCase::new()
118115
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
119116
.with_expected_errors(vec![
120-
"Resources exhausted: Failed to allocate additional",
121-
"HashJoinInput[0]",
117+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[0]",
122118
])
123119
.with_memory_limit(1_000)
124120
.with_config(config)
@@ -132,8 +128,7 @@ async fn join_by_key_single_partition() {
132128
TestCase::new()
133129
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
134130
.with_expected_errors(vec![
135-
"Resources exhausted: Failed to allocate additional",
136-
"HashJoinInput",
131+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput",
137132
])
138133
.with_memory_limit(1_000)
139134
.with_config(config)
@@ -146,8 +141,7 @@ async fn join_by_expression() {
146141
TestCase::new()
147142
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
148143
.with_expected_errors(vec![
149-
"Resources exhausted: Failed to allocate additional",
150-
"NestedLoopJoinLoad[0]",
144+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]",
151145
])
152146
.with_memory_limit(1_000)
153147
.run()
@@ -159,8 +153,7 @@ async fn cross_join() {
159153
TestCase::new()
160154
.with_query("select t1.* from t t1 CROSS JOIN t t2")
161155
.with_expected_errors(vec![
162-
"Resources exhausted: Failed to allocate additional",
163-
"CrossJoinExec",
156+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec",
164157
])
165158
.with_memory_limit(1_000)
166159
.run()
@@ -216,8 +209,7 @@ async fn symmetric_hash_join() {
216209
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
217210
)
218211
.with_expected_errors(vec![
219-
"Resources exhausted: Failed to allocate additional",
220-
"SymmetricHashJoinStream",
212+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SymmetricHashJoinStream",
221213
])
222214
.with_memory_limit(1_000)
223215
.with_scenario(Scenario::AccessLogStreaming)
@@ -235,8 +227,7 @@ async fn sort_preserving_merge() {
235227
// so only a merge is needed
236228
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
237229
.with_expected_errors(vec![
238-
"Resources exhausted: Failed to allocate additional",
239-
"SortPreservingMergeExec",
230+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SortPreservingMergeExec",
240231
])
241232
// provide insufficient memory to merge
242233
.with_memory_limit(partition_size / 2)
@@ -313,8 +304,7 @@ async fn sort_spill_reservation() {
313304

314305
test.clone()
315306
.with_expected_errors(vec![
316-
"Resources exhausted: Failed to allocate additional",
317-
"ExternalSorterMerge", // merging in sort fails
307+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
318308
])
319309
.with_config(config)
320310
.run()
@@ -343,8 +333,7 @@ async fn oom_recursive_cte() {
343333
SELECT * FROM nodes;",
344334
)
345335
.with_expected_errors(vec![
346-
"Resources exhausted: Failed to allocate additional",
347-
"RecursiveQuery",
336+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: RecursiveQuery",
348337
])
349338
.with_memory_limit(2_000)
350339
.run()
@@ -396,7 +385,7 @@ async fn oom_with_tracked_consumer_pool() {
396385
.with_expected_errors(vec![
397386
"Failed to allocate additional",
398387
"for ParquetSink(ArrowColumnWriter)",
399-
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
388+
"Additional allocation failed with top memory consumers (across reservations) as: ParquetSink(ArrowColumnWriter)"
400389
])
401390
.with_memory_pool(Arc::new(
402391
TrackConsumersPool::new(

datafusion/execution/src/memory_pool/pool.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ fn provide_top_memory_consumers_to_error_msg(
392392
error_msg: String,
393393
top_consumers: String,
394394
) -> String {
395-
format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg)
395+
format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg)
396396
}
397397

398398
#[cfg(test)]
@@ -501,7 +501,7 @@ mod tests {
501501
// Test: reports if new reservation causes error
502502
// using the previously set sizes for other consumers
503503
let mut r5 = MemoryConsumer::new("r5").register(&pool);
504-
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
504+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
505505
let res = r5.try_grow(150);
506506
assert!(
507507
matches!(
@@ -524,7 +524,7 @@ mod tests {
524524

525525
// Test: see error message when no consumers recorded yet
526526
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
527-
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
527+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
528528
let res = r0.try_grow(150);
529529
assert!(
530530
matches!(
@@ -543,7 +543,7 @@ mod tests {
543543
let mut r1 = new_consumer_same_name.clone().register(&pool);
544544
// TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
545545
// a followup PR will clarify this message "0 bytes already allocated for this reservation"
546-
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
546+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
547547
let res = r1.try_grow(150);
548548
assert!(
549549
matches!(
@@ -555,7 +555,7 @@ mod tests {
555555

556556
// Test: will accumulate size changes per consumer, not per reservation
557557
r1.grow(20);
558-
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
558+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
559559
let res = r1.try_grow(150);
560560
assert!(
561561
matches!(
@@ -570,7 +570,7 @@ mod tests {
570570
let consumer_with_same_name_but_different_hash =
571571
MemoryConsumer::new(same_name).with_can_spill(true);
572572
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
573-
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
573+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
574574
let res = r2.try_grow(150);
575575
assert!(
576576
matches!(
@@ -590,7 +590,7 @@ mod tests {
590590
let r1_consumer = MemoryConsumer::new("r1");
591591
let mut r1 = r1_consumer.clone().register(&pool);
592592
r1.grow(20);
593-
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
593+
let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
594594
let res = r0.try_grow(150);
595595
assert!(
596596
matches!(
@@ -604,7 +604,7 @@ mod tests {
604604
// Test: unregister one
605605
// only the remaining one should be listed
606606
pool.unregister(&r1_consumer);
607-
let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes";
607+
let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes";
608608
let res = r0.try_grow(150);
609609
assert!(
610610
matches!(

datafusion/execution/src/runtime_env.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@
2020
2121
use crate::{
2222
disk_manager::{DiskManager, DiskManagerConfig},
23-
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
23+
memory_pool::{
24+
GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
25+
},
2426
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
2527
};
2628

2729
use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
2830
use datafusion_common::{DataFusionError, Result};
2931
use object_store::ObjectStore;
30-
use std::fmt::{Debug, Formatter};
3132
use std::path::PathBuf;
3233
use std::sync::Arc;
34+
use std::{
35+
fmt::{Debug, Formatter},
36+
num::NonZeroUsize,
37+
};
3338
use url::Url;
3439

3540
#[derive(Clone)]
@@ -213,7 +218,10 @@ impl RuntimeConfig {
213218
/// Note DataFusion does not yet respect this limit in all cases.
214219
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
215220
let pool_size = (max_memory as f64 * memory_fraction) as usize;
216-
self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
221+
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
222+
GreedyMemoryPool::new(pool_size),
223+
NonZeroUsize::new(5).unwrap(),
224+
)))
217225
}
218226

219227
/// Use the specified path to create any needed temporary files

datafusion/physical-plan/src/joins/cross_join.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -693,9 +693,8 @@ mod tests {
693693

694694
assert_contains!(
695695
err.to_string(),
696-
"External error: Resources exhausted: Failed to allocate additional"
696+
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
697697
);
698-
assert_contains!(err.to_string(), "CrossJoinExec");
699698

700699
Ok(())
701700
}

datafusion/physical-plan/src/joins/hash_join.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -3821,13 +3821,11 @@ mod tests {
38213821
let stream = join.execute(0, task_ctx)?;
38223822
let err = common::collect(stream).await.unwrap_err();
38233823

3824+
// Asserting that operator-level reservation attempting to overallocate
38243825
assert_contains!(
38253826
err.to_string(),
3826-
"External error: Resources exhausted: Failed to allocate additional"
3827+
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
38273828
);
3828-
3829-
// Asserting that operator-level reservation attempting to overallocate
3830-
assert_contains!(err.to_string(), "HashJoinInput");
38313829
}
38323830

38333831
Ok(())
@@ -3902,13 +3900,12 @@ mod tests {
39023900
let stream = join.execute(1, task_ctx)?;
39033901
let err = common::collect(stream).await.unwrap_err();
39043902

3903+
// Asserting that stream-level reservation attempting to overallocate
39053904
assert_contains!(
39063905
err.to_string(),
3907-
"External error: Resources exhausted: Failed to allocate additional"
3908-
);
3906+
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
39093907

3910-
// Asserting that stream-level reservation attempting to overallocate
3911-
assert_contains!(err.to_string(), "HashJoinInput[1]");
3908+
);
39123909
}
39133910

39143911
Ok(())

datafusion/physical-plan/src/joins/nested_loop_join.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1039,9 +1039,8 @@ mod tests {
10391039

10401040
assert_contains!(
10411041
err.to_string(),
1042-
"External error: Resources exhausted: Failed to allocate additional"
1042+
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
10431043
);
1044-
assert_contains!(err.to_string(), "NestedLoopJoinLoad[0]");
10451044
}
10461045

10471046
Ok(())

0 commit comments

Comments
 (0)