Skip to content

Commit 387541c

Browse files
2010YOUY01 and Copilot authored
feat: Add config max_temp_directory_size to limit max disk usage for spilling queries (#15520)
* Add disk limit field inside disk manager * Implement disk usage tracking * Update datafusion/execution/src/disk_manager.rs Co-authored-by: Copilot <[email protected]> * Let unit-test use less memory * reduce UT's memory and disk usage to < 1MB * typo --------- Co-authored-by: Copilot <[email protected]>
1 parent 56af232 commit 387541c

File tree

4 files changed

+210
-7
lines changed

4 files changed

+210
-7
lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ use datafusion_common::{assert_contains, Result};
4444
use datafusion_execution::memory_pool::{
4545
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
4646
};
47-
use datafusion_execution::TaskContext;
47+
use datafusion_execution::runtime_env::RuntimeEnv;
48+
use datafusion_execution::{DiskManager, TaskContext};
4849
use datafusion_expr::{Expr, TableType};
4950
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
5051
use datafusion_physical_optimizer::join_selection::JoinSelection;
5152
use datafusion_physical_optimizer::PhysicalOptimizerRule;
53+
use datafusion_physical_plan::collect as collect_batches;
5254
use datafusion_physical_plan::common::collect;
5355
use datafusion_physical_plan::spill::get_record_batch_memory_size;
5456
use rand::Rng;
@@ -524,6 +526,95 @@ async fn test_external_sort_zero_merge_reservation() {
524526
assert!(spill_count > 0);
525527
}
526528

529+
// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
530+
// ------------------------------------------------------------------
531+
532+
// Create a new `SessionContext` with speicified disk limit and memory pool limit
533+
async fn setup_context(
534+
disk_limit: u64,
535+
memory_pool_limit: usize,
536+
) -> Result<SessionContext> {
537+
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
538+
539+
let disk_manager = Arc::try_unwrap(disk_manager)
540+
.expect("DiskManager should be a single instance")
541+
.with_max_temp_directory_size(disk_limit)?;
542+
543+
let runtime = RuntimeEnvBuilder::new()
544+
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
545+
.build_arc()
546+
.unwrap();
547+
548+
let runtime = Arc::new(RuntimeEnv {
549+
memory_pool: runtime.memory_pool.clone(),
550+
disk_manager: Arc::new(disk_manager),
551+
cache_manager: runtime.cache_manager.clone(),
552+
object_store_registry: runtime.object_store_registry.clone(),
553+
});
554+
555+
let config = SessionConfig::new()
556+
.with_sort_spill_reservation_bytes(64 * 1024) // 256KB
557+
.with_sort_in_place_threshold_bytes(0)
558+
.with_batch_size(64) // To reduce test memory usage
559+
.with_target_partitions(1);
560+
561+
Ok(SessionContext::new_with_config_rt(config, runtime))
562+
}
563+
564+
/// If the spilled bytes exceed the disk limit, the query should fail
565+
/// (specified by `max_temp_directory_size` in `DiskManager`)
566+
#[tokio::test]
567+
async fn test_disk_spill_limit_reached() -> Result<()> {
568+
let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit
569+
570+
let df = ctx
571+
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
572+
.await
573+
.unwrap();
574+
575+
let err = df.collect().await.unwrap_err();
576+
assert_contains!(
577+
err.to_string(),
578+
"The used disk space during the spilling process has exceeded the allowable limit"
579+
);
580+
581+
Ok(())
582+
}
583+
584+
/// External query should succeed, if the spilled bytes is less than the disk limit
585+
/// Also verify that after the query is finished, all the disk usage accounted by
586+
/// tempfiles are cleaned up.
587+
#[tokio::test]
588+
async fn test_disk_spill_limit_not_reached() -> Result<()> {
589+
let disk_spill_limit = 1024 * 1024; // 1MB
590+
let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit
591+
592+
let df = ctx
593+
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
594+
.await
595+
.unwrap();
596+
let plan = df.create_physical_plan().await.unwrap();
597+
598+
let task_ctx = ctx.task_ctx();
599+
let _ = collect_batches(Arc::clone(&plan), task_ctx)
600+
.await
601+
.expect("Query execution failed");
602+
603+
let spill_count = plan.metrics().unwrap().spill_count().unwrap();
604+
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
605+
606+
println!("spill count {}, spill bytes {}", spill_count, spilled_bytes);
607+
assert!(spill_count > 0);
608+
assert!((spilled_bytes as u64) < disk_spill_limit);
609+
610+
// Verify that all temporary files have been properly cleaned up by checking
611+
// that the total disk usage tracked by the disk manager is zero
612+
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
613+
assert_eq!(current_disk_usage, 0);
614+
615+
Ok(())
616+
}
617+
527618
/// Run the query with the specified memory limit,
528619
/// and verifies the expected errors are returned
529620
#[derive(Clone, Debug)]

datafusion/execution/src/disk_manager.rs

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717

1818
//! [`DiskManager`]: Manages files generated during query execution
1919
20-
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
20+
use datafusion_common::{
21+
config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22+
};
2123
use log::debug;
2224
use parking_lot::Mutex;
2325
use rand::{thread_rng, Rng};
2426
use std::path::{Path, PathBuf};
27+
use std::sync::atomic::{AtomicU64, Ordering};
2528
use std::sync::Arc;
2629
use tempfile::{Builder, NamedTempFile, TempDir};
2730

31+
use crate::memory_pool::human_readable_size;
32+
33+
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
34+
2835
/// Configuration for temporary disk access
2936
#[derive(Debug, Clone)]
3037
pub enum DiskManagerConfig {
@@ -75,6 +82,12 @@ pub struct DiskManager {
7582
/// If `Some(vec![])` a new OS specified temporary directory will be created
7683
/// If `None` an error will be returned (configured not to spill)
7784
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
85+
/// The maximum amount of data (in bytes) stored inside the temporary directories.
86+
/// Defaults to 100GB
87+
max_temp_directory_size: u64,
88+
/// Used disk space in the temporary directories. Currently only spilled data for
89+
/// external executors is counted.
90+
used_disk_space: Arc<AtomicU64>,
7891
}
7992

8093
impl DiskManager {
@@ -84,6 +97,8 @@ impl DiskManager {
8497
DiskManagerConfig::Existing(manager) => Ok(manager),
8598
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
8699
local_dirs: Mutex::new(Some(vec![])),
100+
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
101+
used_disk_space: Arc::new(AtomicU64::new(0)),
87102
})),
88103
DiskManagerConfig::NewSpecified(conf_dirs) => {
89104
let local_dirs = create_local_dirs(conf_dirs)?;
@@ -93,14 +108,38 @@ impl DiskManager {
93108
);
94109
Ok(Arc::new(Self {
95110
local_dirs: Mutex::new(Some(local_dirs)),
111+
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
112+
used_disk_space: Arc::new(AtomicU64::new(0)),
96113
}))
97114
}
98115
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
99116
local_dirs: Mutex::new(None),
117+
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
118+
used_disk_space: Arc::new(AtomicU64::new(0)),
100119
})),
101120
}
102121
}
103122

123+
pub fn with_max_temp_directory_size(
124+
mut self,
125+
max_temp_directory_size: u64,
126+
) -> Result<Self> {
127+
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
128+
// this operation is not meaningful, fail early.
129+
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
130+
return config_err!(
131+
"Cannot set max temp directory size for a disk manager that spilling is disabled"
132+
);
133+
}
134+
135+
self.max_temp_directory_size = max_temp_directory_size;
136+
Ok(self)
137+
}
138+
139+
pub fn used_disk_space(&self) -> u64 {
140+
self.used_disk_space.load(Ordering::Relaxed)
141+
}
142+
104143
/// Return true if this disk manager supports creating temporary
105144
/// files. If this returns false, any call to `create_tmp_file`
106145
/// will error.
@@ -113,7 +152,7 @@ impl DiskManager {
113152
/// If the file can not be created for some reason, returns an
114153
/// error message referencing the request description
115154
pub fn create_tmp_file(
116-
&self,
155+
self: &Arc<Self>,
117156
request_description: &str,
118157
) -> Result<RefCountedTempFile> {
119158
let mut guard = self.local_dirs.lock();
@@ -142,18 +181,31 @@ impl DiskManager {
142181
tempfile: Builder::new()
143182
.tempfile_in(local_dirs[dir_index].as_ref())
144183
.map_err(DataFusionError::IoError)?,
184+
current_file_disk_usage: 0,
185+
disk_manager: Arc::clone(self),
145186
})
146187
}
147188
}
148189

149190
/// A wrapper around a [`NamedTempFile`] that also contains
150-
/// a reference to its parent temporary directory
191+
/// a reference to its parent temporary directory.
192+
///
193+
/// # Note
194+
/// After any modification to the underlying file (e.g., writing data to it), the caller
195+
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
196+
/// This ensures the disk manager can properly enforce usage limits configured by
197+
/// [`DiskManager::with_max_temp_directory_size`].
151198
#[derive(Debug)]
152199
pub struct RefCountedTempFile {
153200
/// The reference to the directory in which temporary files are created to ensure
154201
/// it is not cleaned up prior to the NamedTempFile
155202
_parent_temp_dir: Arc<TempDir>,
156203
/// The underlying temporary file, exposed read-only through [`Self::inner`].
tempfile: NamedTempFile,
204+
/// Tracks the current disk usage of this temporary file. See
205+
/// [`Self::update_disk_usage`] for more details.
/// This amount is subtracted from the disk manager's counter when the file is dropped.
206+
current_file_disk_usage: u64,
207+
/// The disk manager that created and manages this temporary file
208+
disk_manager: Arc<DiskManager>,
157209
}
158210

159211
impl RefCountedTempFile {
@@ -164,6 +216,50 @@ impl RefCountedTempFile {
164216
pub fn inner(&self) -> &NamedTempFile {
165217
&self.tempfile
166218
}
219+
220+
/// Updates the global disk usage counter after modifications to the underlying file.
221+
///
222+
/// # Errors
223+
/// - Returns an error if the global disk usage exceeds the configured limit.
224+
pub fn update_disk_usage(&mut self) -> Result<()> {
225+
// Get new file size from OS
226+
let metadata = self.tempfile.as_file().metadata()?;
227+
let new_disk_usage = metadata.len();
228+
229+
// Update the global disk usage by:
230+
// 1. Subtracting the old file size from the global counter
231+
self.disk_manager
232+
.used_disk_space
233+
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
234+
// 2. Adding the new file size to the global counter
235+
self.disk_manager
236+
.used_disk_space
237+
.fetch_add(new_disk_usage, Ordering::Relaxed);
238+
239+
// 3. Check if the updated global disk usage exceeds the configured limit
240+
let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
241+
if global_disk_usage > self.disk_manager.max_temp_directory_size {
242+
return resources_err!(
243+
"The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
244+
human_readable_size(self.disk_manager.max_temp_directory_size as usize)
245+
);
246+
}
247+
248+
// 4. Update the local file size tracking
249+
self.current_file_disk_usage = new_disk_usage;
250+
251+
Ok(())
252+
}
253+
}
254+
255+
/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
256+
impl Drop for RefCountedTempFile {
257+
fn drop(&mut self) {
258+
// Subtract the current file's disk usage from the global counter
259+
self.disk_manager
260+
.used_disk_space
261+
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
262+
}
167263
}
168264

169265
/// Setup local dirs by creating one new dir in each of the given dirs

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ impl InProgressSpillFile {
4949
}
5050
}
5151

52-
/// Appends a `RecordBatch` to the file, initializing the writer if necessary.
52+
/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
53+
///
54+
/// # Errors
55+
/// - Returns an error if the file is not active (has been finalized)
56+
/// - Returns an error if appending would exceed the disk usage limit configured
57+
/// by `max_temp_directory_size` in `DiskManager`
5358
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
5459
if self.in_progress_file.is_none() {
5560
return Err(exec_datafusion_err!(
@@ -70,6 +75,11 @@ impl InProgressSpillFile {
7075
}
7176
if let Some(writer) = &mut self.writer {
7277
let (spilled_rows, spilled_bytes) = writer.write(batch)?;
78+
if let Some(in_progress_file) = &mut self.in_progress_file {
79+
in_progress_file.update_disk_usage()?;
80+
} else {
81+
unreachable!() // Already checked inside current function
82+
}
7383

7484
// Update metrics
7585
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ impl SpillManager {
7373
/// intended to incrementally write in-memory batches into the same spill file,
7474
/// use [`Self::create_in_progress_file`] instead.
7575
/// None is returned if no batches are spilled.
76-
#[allow(dead_code)] // TODO: remove after change SMJ to use SpillManager
76+
///
77+
/// # Errors
78+
/// - Returns an error if spilling would exceed the disk usage limit configured
79+
/// by `max_temp_directory_size` in `DiskManager`
7780
pub fn spill_record_batch_and_finish(
7881
&self,
7982
batches: &[RecordBatch],
@@ -90,7 +93,10 @@ impl SpillManager {
9093

9194
/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
9295
/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
93-
#[allow(dead_code)] // TODO: remove after change aggregate to use SpillManager
96+
///
97+
/// # Errors
98+
/// - Returns an error if spilling would exceed the disk usage limit configured
99+
/// by `max_temp_directory_size` in `DiskManager`
94100
pub fn spill_record_batch_by_size(
95101
&self,
96102
batch: &RecordBatch,

0 commit comments

Comments
 (0)