Skip to content

Commit 2cba3ad

Browse files
kumarlokeshalamb
andauthored
Set DataFusion runtime configurations through SQL interface (#15594)
* Set DataFusion runtime configurations through SQL interface * fix clippy warnings * use spill count based tests for checking applied memory limit --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3e08664 commit 2cba3ad

File tree

10 files changed

+439
-6
lines changed

10 files changed

+439
-6
lines changed

.github/workflows/rust.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,11 @@ jobs:
693693
# If you encounter an error, run './dev/update_function_docs.sh' and commit
694694
./dev/update_function_docs.sh
695695
git diff --exit-code
696+
- name: Check if runtime_configs.md has been modified
697+
run: |
698+
# If you encounter an error, run './dev/update_runtime_config_docs.sh' and commit
699+
./dev/update_runtime_config_docs.sh
700+
git diff --exit-code
696701
697702
# Verify MSRV for the crates which are directly used by other projects:
698703
# - datafusion
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
19+
20+
fn main() {
21+
let docs = RuntimeEnvBuilder::generate_config_markdown();
22+
println!("{}", docs);
23+
}

datafusion/core/src/execution/context/mod.rs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ use crate::{
3535
},
3636
datasource::{provider_as_source, MemTable, ViewTable},
3737
error::{DataFusionError, Result},
38-
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
38+
execution::{
39+
options::ArrowReadOptions,
40+
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
41+
FunctionRegistry,
42+
},
3943
logical_expr::AggregateUDF,
4044
logical_expr::ScalarUDF,
4145
logical_expr::{
@@ -1036,13 +1040,73 @@ impl SessionContext {
10361040
variable, value, ..
10371041
} = stmt;
10381042

1039-
let mut state = self.state.write();
1040-
state.config_mut().options_mut().set(&variable, &value)?;
1041-
drop(state);
1043+
// Check if this is a runtime configuration
1044+
if variable.starts_with("datafusion.runtime.") {
1045+
self.set_runtime_variable(&variable, &value)?;
1046+
} else {
1047+
let mut state = self.state.write();
1048+
state.config_mut().options_mut().set(&variable, &value)?;
1049+
drop(state);
1050+
}
10421051

10431052
self.return_empty_dataframe()
10441053
}
10451054

1055+
fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
1056+
let key = variable.strip_prefix("datafusion.runtime.").unwrap();
1057+
1058+
match key {
1059+
"memory_limit" => {
1060+
let memory_limit = Self::parse_memory_limit(value)?;
1061+
1062+
let mut state = self.state.write();
1063+
let mut builder =
1064+
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
1065+
builder = builder.with_memory_limit(memory_limit, 1.0);
1066+
*state = SessionStateBuilder::from(state.clone())
1067+
.with_runtime_env(Arc::new(builder.build()?))
1068+
.build();
1069+
}
1070+
_ => {
1071+
return Err(DataFusionError::Plan(format!(
1072+
"Unknown runtime configuration: {}",
1073+
variable
1074+
)))
1075+
}
1076+
}
1077+
Ok(())
1078+
}
1079+
1080+
/// Parse memory limit from string to number of bytes
1081+
/// Supports formats like '1.5G', '100M', '512K'
1082+
///
1083+
/// # Examples
1084+
/// ```
1085+
/// use datafusion::execution::context::SessionContext;
1086+
///
1087+
/// assert_eq!(SessionContext::parse_memory_limit("1M").unwrap(), 1024 * 1024);
1088+
/// assert_eq!(SessionContext::parse_memory_limit("1.5G").unwrap(), (1.5 * 1024.0 * 1024.0 * 1024.0) as usize);
1089+
/// ```
1090+
pub fn parse_memory_limit(limit: &str) -> Result<usize> {
1091+
let (number, unit) = limit.split_at(limit.len() - 1);
1092+
let number: f64 = number.parse().map_err(|_| {
1093+
DataFusionError::Plan(format!(
1094+
"Failed to parse number from memory limit '{}'",
1095+
limit
1096+
))
1097+
})?;
1098+
1099+
match unit {
1100+
"K" => Ok((number * 1024.0) as usize),
1101+
"M" => Ok((number * 1024.0 * 1024.0) as usize),
1102+
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
1103+
_ => Err(DataFusionError::Plan(format!(
1104+
"Unsupported unit '{}' in memory limit '{}'",
1105+
unit, limit
1106+
))),
1107+
}
1108+
}
1109+
10461110
async fn create_custom_table(
10471111
&self,
10481112
cmd: &CreateExternalTable,
@@ -1833,7 +1897,6 @@ mod tests {
18331897
use crate::test;
18341898
use crate::test_util::{plan_and_collect, populate_csv_partitions};
18351899
use arrow::datatypes::{DataType, TimeUnit};
1836-
use std::env;
18371900
use std::error::Error;
18381901
use std::path::PathBuf;
18391902

datafusion/core/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,12 @@ doc_comment::doctest!(
872872
user_guide_configs
873873
);
874874

875+
#[cfg(doctest)]
876+
doc_comment::doctest!(
877+
"../../../docs/source/user-guide/runtime_configs.md",
878+
user_guide_runtime_configs
879+
);
880+
875881
#[cfg(doctest)]
876882
doc_comment::doctest!(
877883
"../../../docs/source/user-guide/crate-configuration.md",

datafusion/core/tests/sql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub mod create_drop;
6363
pub mod explain_analyze;
6464
pub mod joins;
6565
mod path_partition;
66+
mod runtime_config;
6667
pub mod select;
6768
mod sql_api;
6869

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Tests for runtime configuration SQL interface
19+
20+
use std::sync::Arc;
21+
22+
use datafusion::execution::context::SessionContext;
23+
use datafusion::execution::context::TaskContext;
24+
use datafusion_physical_plan::common::collect;
25+
26+
#[tokio::test]
27+
async fn test_memory_limit_with_spill() {
28+
let ctx = SessionContext::new();
29+
30+
ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
31+
.await
32+
.unwrap()
33+
.collect()
34+
.await
35+
.unwrap();
36+
37+
ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0")
38+
.await
39+
.unwrap()
40+
.collect()
41+
.await
42+
.unwrap();
43+
44+
let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
45+
let df = ctx.sql(query).await.unwrap();
46+
47+
let plan = df.create_physical_plan().await.unwrap();
48+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
49+
let stream = plan.execute(0, task_ctx).unwrap();
50+
51+
let _results = collect(stream).await;
52+
let metrics = plan.metrics().unwrap();
53+
let spill_count = metrics.spill_count().unwrap();
54+
assert!(spill_count > 0, "Expected spills but none occurred");
55+
}
56+
57+
#[tokio::test]
58+
async fn test_no_spill_with_adequate_memory() {
59+
let ctx = SessionContext::new();
60+
61+
ctx.sql("SET datafusion.runtime.memory_limit = '10M'")
62+
.await
63+
.unwrap()
64+
.collect()
65+
.await
66+
.unwrap();
67+
ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0")
68+
.await
69+
.unwrap()
70+
.collect()
71+
.await
72+
.unwrap();
73+
74+
let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
75+
let df = ctx.sql(query).await.unwrap();
76+
77+
let plan = df.create_physical_plan().await.unwrap();
78+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
79+
let stream = plan.execute(0, task_ctx).unwrap();
80+
81+
let _results = collect(stream).await;
82+
let metrics = plan.metrics().unwrap();
83+
let spill_count = metrics.spill_count().unwrap();
84+
assert_eq!(spill_count, 0, "Expected no spills but some occurred");
85+
}
86+
87+
#[tokio::test]
88+
async fn test_multiple_configs() {
89+
let ctx = SessionContext::new();
90+
91+
ctx.sql("SET datafusion.runtime.memory_limit = '100M'")
92+
.await
93+
.unwrap()
94+
.collect()
95+
.await
96+
.unwrap();
97+
ctx.sql("SET datafusion.execution.batch_size = '2048'")
98+
.await
99+
.unwrap()
100+
.collect()
101+
.await
102+
.unwrap();
103+
104+
let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
105+
let result = ctx.sql(query).await.unwrap().collect().await;
106+
107+
assert!(result.is_ok(), "Should not fail due to memory limit");
108+
109+
let state = ctx.state();
110+
let batch_size = state.config().options().execution.batch_size;
111+
assert_eq!(batch_size, 2048);
112+
}
113+
114+
#[tokio::test]
115+
async fn test_memory_limit_enforcement() {
116+
let ctx = SessionContext::new();
117+
118+
ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
119+
.await
120+
.unwrap()
121+
.collect()
122+
.await
123+
.unwrap();
124+
125+
let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
126+
let result = ctx.sql(query).await.unwrap().collect().await;
127+
128+
assert!(result.is_err(), "Should fail due to memory limit");
129+
130+
ctx.sql("SET datafusion.runtime.memory_limit = '100M'")
131+
.await
132+
.unwrap()
133+
.collect()
134+
.await
135+
.unwrap();
136+
137+
let result = ctx.sql(query).await.unwrap().collect().await;
138+
139+
assert!(result.is_ok(), "Should not fail due to memory limit");
140+
}
141+
142+
#[tokio::test]
143+
async fn test_invalid_memory_limit() {
144+
let ctx = SessionContext::new();
145+
146+
let result = ctx
147+
.sql("SET datafusion.runtime.memory_limit = '100X'")
148+
.await;
149+
150+
assert!(result.is_err());
151+
let error_message = result.unwrap_err().to_string();
152+
assert!(error_message.contains("Unsupported unit 'X'"));
153+
}
154+
155+
#[tokio::test]
156+
async fn test_unknown_runtime_config() {
157+
let ctx = SessionContext::new();
158+
159+
let result = ctx
160+
.sql("SET datafusion.runtime.unknown_config = 'value'")
161+
.await;
162+
163+
assert!(result.is_err());
164+
let error_message = result.unwrap_err().to_string();
165+
assert!(error_message.contains("Unknown runtime configuration"));
166+
}

datafusion/execution/src/runtime_env.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
};
2828

2929
use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
30-
use datafusion_common::Result;
30+
use datafusion_common::{config::ConfigEntry, Result};
3131
use object_store::ObjectStore;
3232
use std::path::PathBuf;
3333
use std::sync::Arc;
@@ -268,4 +268,56 @@ impl RuntimeEnvBuilder {
268268
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
269269
self.build().map(Arc::new)
270270
}
271+
272+
/// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
273+
pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
274+
let cache_config = CacheManagerConfig {
275+
table_files_statistics_cache: runtime_env
276+
.cache_manager
277+
.get_file_statistic_cache(),
278+
list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
279+
};
280+
281+
Self {
282+
disk_manager: DiskManagerConfig::Existing(Arc::clone(
283+
&runtime_env.disk_manager,
284+
)),
285+
memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
286+
cache_manager: cache_config,
287+
object_store_registry: Arc::clone(&runtime_env.object_store_registry),
288+
}
289+
}
290+
291+
/// Returns a list of all available runtime configurations with their current values and descriptions
292+
pub fn entries(&self) -> Vec<ConfigEntry> {
293+
// Memory pool configuration
294+
vec![ConfigEntry {
295+
key: "datafusion.runtime.memory_limit".to_string(),
296+
value: None, // Default is system-dependent
297+
description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
298+
}]
299+
}
300+
301+
/// Generate documentation that can be included in the user guide
302+
pub fn generate_config_markdown() -> String {
303+
use std::fmt::Write as _;
304+
305+
let s = Self::default();
306+
307+
let mut docs = "| key | default | description |\n".to_string();
308+
docs += "|-----|---------|-------------|\n";
309+
let mut entries = s.entries();
310+
entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
311+
312+
for entry in &entries {
313+
let _ = writeln!(
314+
&mut docs,
315+
"| {} | {} | {} |",
316+
entry.key,
317+
entry.value.as_deref().unwrap_or("NULL"),
318+
entry.description
319+
);
320+
}
321+
docs
322+
}
271323
}

0 commit comments

Comments
 (0)