Skip to content

Commit 921c3b6

Browse files
wiedld and alamb authored
Add TrackedMemoryPool with better error messages on exhaustion (#11665)
* feat(11523): TrackConsumersPool impl which includes error messages with the top K consumers * test(11523): unit tests for TrackConsumersPool * test(11523): integration test for tracked consumers oom message * chore(11523): use nonzero usize * chore(11523): document what the memory insufficient_capacity_err is actually returning * chore(11523): improve test failure coverage for TrackConsumersPool * fix(11523): handle additive tracking of same hashed consumer, across different reservations * refactor(11523): update error message to delineate multiple consumers with the same name, but different hashes * test(11523): demonstrate the underlying pool behavior on deregister * chore: make explicit what the insufficient_capacity_err() logs * fix(11523): remove to_root() for the error, since the immediate inner child should be returning an OOM * chore(11523): add result to logging of failed CI tests * fix(11523): splice error message to get consumers prior to error message * Revert "fix(11523): splice error message to get consumers prior to error message" This reverts commit 09b20d2. * fix(11523): fix without splicing error messages, and instead handle the proper error bubbling (msg wrapping) * chore: update docs to explain purpose of TrackConsumersPool Co-authored-by: Andrew Lamb <[email protected]> * refactor(11523): enable TrackConsumersPool to be used in runtime metrics --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3fe1860 commit 921c3b6

File tree

3 files changed

+431
-3
lines changed

3 files changed

+431
-3
lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@ use datafusion::assert_batches_eq;
2626
use datafusion::physical_optimizer::PhysicalOptimizerRule;
2727
use datafusion::physical_plan::memory::MemoryExec;
2828
use datafusion::physical_plan::streaming::PartitionStream;
29+
use datafusion_execution::memory_pool::{
30+
GreedyMemoryPool, MemoryPool, TrackConsumersPool,
31+
};
2932
use datafusion_expr::{Expr, TableType};
3033
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
3134
use futures::StreamExt;
3235
use std::any::Any;
36+
use std::num::NonZeroUsize;
3337
use std::sync::{Arc, OnceLock};
3438
use tokio::fs::File;
3539

@@ -371,13 +375,47 @@ async fn oom_parquet_sink() {
371375
.await
372376
}
373377

378+
#[tokio::test]
379+
async fn oom_with_tracked_consumer_pool() {
380+
let dir = tempfile::tempdir().unwrap();
381+
let path = dir.into_path().join("test.parquet");
382+
let _ = File::create(path.clone()).await.unwrap();
383+
384+
TestCase::new()
385+
.with_config(
386+
SessionConfig::new()
387+
)
388+
.with_query(format!(
389+
"
390+
COPY (select * from t)
391+
TO '{}'
392+
STORED AS PARQUET OPTIONS (compression 'uncompressed');
393+
",
394+
path.to_string_lossy()
395+
))
396+
.with_expected_errors(vec![
397+
"Failed to allocate additional",
398+
"for ParquetSink(ArrowColumnWriter)",
399+
"Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)"
400+
])
401+
.with_memory_pool(Arc::new(
402+
TrackConsumersPool::new(
403+
GreedyMemoryPool::new(200_000),
404+
NonZeroUsize::new(1).unwrap()
405+
)
406+
))
407+
.run()
408+
.await
409+
}
410+
374411
/// Run the query with the specified memory limit,
375412
/// and verifies the expected errors are returned
376413
#[derive(Clone, Debug)]
377414
struct TestCase {
378415
query: Option<String>,
379416
expected_errors: Vec<String>,
380417
memory_limit: usize,
418+
memory_pool: Option<Arc<dyn MemoryPool>>,
381419
config: SessionConfig,
382420
scenario: Scenario,
383421
/// How should the disk manager (that allows spilling) be
@@ -396,6 +434,7 @@ impl TestCase {
396434
expected_errors: vec![],
397435
memory_limit: 0,
398436
config: SessionConfig::new(),
437+
memory_pool: None,
399438
scenario: Scenario::AccessLog,
400439
disk_manager_config: DiskManagerConfig::Disabled,
401440
expected_plan: vec![],
@@ -425,6 +464,15 @@ impl TestCase {
425464
self
426465
}
427466

467+
/// Set the memory pool to be used
468+
///
469+
/// This will override the memory_limit requested,
470+
/// as the memory pool includes the limit.
471+
fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
472+
self.memory_pool = Some(memory_pool);
473+
self
474+
}
475+
428476
/// Specify the configuration to use
429477
pub fn with_config(mut self, config: SessionConfig) -> Self {
430478
self.config = config;
@@ -465,6 +513,7 @@ impl TestCase {
465513
query,
466514
expected_errors,
467515
memory_limit,
516+
memory_pool,
468517
config,
469518
scenario,
470519
disk_manager_config,
@@ -474,11 +523,15 @@ impl TestCase {
474523

475524
let table = scenario.table();
476525

477-
let rt_config = RuntimeConfig::new()
526+
let mut rt_config = RuntimeConfig::new()
478527
// disk manager setting controls the spilling
479528
.with_disk_manager(disk_manager_config)
480529
.with_memory_limit(memory_limit, MEMORY_FRACTION);
481530

531+
if let Some(pool) = memory_pool {
532+
rt_config = rt_config.with_memory_pool(pool);
533+
};
534+
482535
let runtime = RuntimeEnv::new(rt_config).unwrap();
483536

484537
// Configure execution

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
117117
/// For help with allocation accounting, see the [proxy] module.
118118
///
119119
/// [proxy]: crate::memory_pool::proxy
120-
#[derive(Debug)]
120+
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
121121
pub struct MemoryConsumer {
122122
name: String,
123123
can_spill: bool,

0 commit comments

Comments
 (0)