Skip to content

Commit c9442ce

Browse files
waitingkuoalamb
andauthored
support SET variable (#4069)
* support SET * remove useless comment * add test cases * Update datafusion/core/src/config.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/src/execution/context.rs Co-authored-by: Andrew Lamb <[email protected]> * fix test cases * fmt * Update datafusion/expr/src/logical_plan/plan.rs Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent 761e167 commit c9442ce

File tree

13 files changed

+483
-7
lines changed

13 files changed

+483
-7
lines changed

datafusion/core/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@ impl ConfigOptions {
360360
self.set(key, ScalarValue::UInt64(Some(value)))
361361
}
362362

363+
/// set a `String` configuration option
364+
pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
365+
self.set(key, ScalarValue::Utf8(Some(value.into())))
366+
}
367+
363368
/// get a configuration option
364369
pub fn get(&self, key: &str) -> Option<ScalarValue> {
365370
self.options.get(key).cloned()

datafusion/core/src/execution/context.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ use crate::error::{DataFusionError, Result};
6767
use crate::logical_expr::{
6868
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
6969
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
70-
TableSource, TableType, UNNAMED_TABLE,
70+
SetVariable, TableSource, TableType, UNNAMED_TABLE,
7171
};
7272
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
7373
use datafusion_sql::{ResolvedTableReference, TableReference};
@@ -341,6 +341,60 @@ impl SessionContext {
341341
))),
342342
}
343343
}
344+
345+
LogicalPlan::SetVariable(SetVariable {
346+
variable, value, ..
347+
}) => {
348+
let config_options = &self.state.write().config.config_options;
349+
350+
let old_value =
351+
config_options.read().get(&variable).ok_or_else(|| {
352+
DataFusionError::Execution(format!(
353+
"Can not SET variable: Unknown Variable {}",
354+
variable
355+
))
356+
})?;
357+
358+
match old_value {
359+
ScalarValue::Boolean(_) => {
360+
let new_value = value.parse::<bool>().map_err(|_| {
361+
DataFusionError::Execution(format!(
362+
"Failed to parse {} as bool",
363+
value,
364+
))
365+
})?;
366+
config_options.write().set_bool(&variable, new_value);
367+
}
368+
369+
ScalarValue::UInt64(_) => {
370+
let new_value = value.parse::<u64>().map_err(|_| {
371+
DataFusionError::Execution(format!(
372+
"Failed to parse {} as u64",
373+
value,
374+
))
375+
})?;
376+
config_options.write().set_u64(&variable, new_value);
377+
}
378+
379+
ScalarValue::Utf8(_) => {
380+
let new_value = value.parse::<String>().map_err(|_| {
381+
DataFusionError::Execution(format!(
382+
"Failed to parse {} as String",
383+
value,
384+
))
385+
})?;
386+
config_options.write().set_string(&variable, new_value);
387+
}
388+
389+
_ => {
390+
return Err(DataFusionError::Execution(
391+
"Unsupported Scalar Value Type".to_string(),
392+
))
393+
}
394+
}
395+
self.return_empty_dataframe()
396+
}
397+
344398
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
345399
schema_name,
346400
if_not_exists,

datafusion/core/src/physical_plan/planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,11 @@ impl DefaultPhysicalPlanner {
10971097
"Unsupported logical plan: CreateView".to_string(),
10981098
))
10991099
}
1100+
LogicalPlan::SetVariable(_) => {
1101+
Err(DataFusionError::Internal(
1102+
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
1103+
))
1104+
}
11001105
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
11011106
"Unsupported logical plan: Explain must be root of the plan".to_string(),
11021107
)),

datafusion/core/tests/sql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub mod idenfifers;
109109
pub mod information_schema;
110110
pub mod parquet_schema;
111111
pub mod partitioned_csv;
112+
pub mod set_variable;
112113
pub mod subqueries;
113114
#[cfg(feature = "unicode_expressions")]
114115
pub mod unicode;
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use super::*;
19+
20+
#[tokio::test]
21+
async fn set_variable_to_value() {
22+
let ctx =
23+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
24+
25+
ctx.sql("SET datafusion.execution.batch_size to 1")
26+
.await
27+
.unwrap();
28+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
29+
.await
30+
.unwrap();
31+
let expected = vec![
32+
"+---------------------------------+---------+",
33+
"| name | setting |",
34+
"+---------------------------------+---------+",
35+
"| datafusion.execution.batch_size | 1 |",
36+
"+---------------------------------+---------+",
37+
];
38+
assert_batches_sorted_eq!(expected, &result);
39+
}
40+
41+
#[tokio::test]
42+
async fn set_variable_to_value_with_equal_sign() {
43+
let ctx =
44+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
45+
46+
ctx.sql("SET datafusion.execution.batch_size = 1")
47+
.await
48+
.unwrap();
49+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
50+
.await
51+
.unwrap();
52+
let expected = vec![
53+
"+---------------------------------+---------+",
54+
"| name | setting |",
55+
"+---------------------------------+---------+",
56+
"| datafusion.execution.batch_size | 1 |",
57+
"+---------------------------------+---------+",
58+
];
59+
assert_batches_sorted_eq!(expected, &result);
60+
}
61+
62+
#[tokio::test]
63+
async fn set_variable_to_value_with_single_quoted_string() {
64+
let ctx =
65+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
66+
67+
ctx.sql("SET datafusion.execution.batch_size to '1'")
68+
.await
69+
.unwrap();
70+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
71+
.await
72+
.unwrap();
73+
let expected = vec![
74+
"+---------------------------------+---------+",
75+
"| name | setting |",
76+
"+---------------------------------+---------+",
77+
"| datafusion.execution.batch_size | 1 |",
78+
"+---------------------------------+---------+",
79+
];
80+
assert_batches_sorted_eq!(expected, &result);
81+
}
82+
83+
#[tokio::test]
84+
async fn set_variable_to_value_case_insensitive() {
85+
let ctx =
86+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
87+
88+
ctx.sql("SET datafusion.EXECUTION.batch_size to '1'")
89+
.await
90+
.unwrap();
91+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
92+
.await
93+
.unwrap();
94+
let expected = vec![
95+
"+---------------------------------+---------+",
96+
"| name | setting |",
97+
"+---------------------------------+---------+",
98+
"| datafusion.execution.batch_size | 1 |",
99+
"+---------------------------------+---------+",
100+
];
101+
assert_batches_sorted_eq!(expected, &result);
102+
}
103+
104+
#[tokio::test]
105+
async fn set_variable_unknown_variable() {
106+
let ctx = SessionContext::new();
107+
108+
let err = plan_and_collect(&ctx, "SET aabbcc to '1'")
109+
.await
110+
.unwrap_err();
111+
assert_eq!(
112+
err.to_string(),
113+
"Execution error: Can not SET variable: Unknown Variable aabbcc"
114+
);
115+
}
116+
117+
#[tokio::test]
118+
async fn set_bool_variable() {
119+
let ctx =
120+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
121+
122+
ctx.sql("SET datafusion.execution.coalesce_batches to true")
123+
.await
124+
.unwrap();
125+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
126+
.await
127+
.unwrap();
128+
let expected = vec![
129+
"+---------------------------------------+---------+",
130+
"| name | setting |",
131+
"+---------------------------------------+---------+",
132+
"| datafusion.execution.coalesce_batches | true |",
133+
"+---------------------------------------+---------+",
134+
];
135+
assert_batches_eq!(expected, &result);
136+
137+
ctx.sql("SET datafusion.execution.coalesce_batches to 'false'")
138+
.await
139+
.unwrap();
140+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
141+
.await
142+
.unwrap();
143+
let expected = vec![
144+
"+---------------------------------------+---------+",
145+
"| name | setting |",
146+
"+---------------------------------------+---------+",
147+
"| datafusion.execution.coalesce_batches | false |",
148+
"+---------------------------------------+---------+",
149+
];
150+
assert_batches_eq!(expected, &result);
151+
}
152+
153+
#[tokio::test]
154+
async fn set_bool_variable_bad_value() {
155+
let ctx =
156+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
157+
158+
let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to 1")
159+
.await
160+
.unwrap_err();
161+
162+
assert_eq!(
163+
err.to_string(),
164+
"Execution error: Failed to parse 1 as bool"
165+
);
166+
167+
let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to abc")
168+
.await
169+
.unwrap_err();
170+
171+
assert_eq!(
172+
err.to_string(),
173+
"Execution error: Failed to parse abc as bool"
174+
);
175+
}
176+
177+
#[tokio::test]
178+
async fn set_u64_variable() {
179+
let ctx =
180+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
181+
182+
ctx.sql("SET datafusion.execution.batch_size to 0")
183+
.await
184+
.unwrap();
185+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
186+
.await
187+
.unwrap();
188+
let expected = vec![
189+
"+---------------------------------+---------+",
190+
"| name | setting |",
191+
"+---------------------------------+---------+",
192+
"| datafusion.execution.batch_size | 0 |",
193+
"+---------------------------------+---------+",
194+
];
195+
assert_batches_eq!(expected, &result);
196+
197+
ctx.sql("SET datafusion.execution.batch_size to '1'")
198+
.await
199+
.unwrap();
200+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
201+
.await
202+
.unwrap();
203+
let expected = vec![
204+
"+---------------------------------+---------+",
205+
"| name | setting |",
206+
"+---------------------------------+---------+",
207+
"| datafusion.execution.batch_size | 1 |",
208+
"+---------------------------------+---------+",
209+
];
210+
assert_batches_eq!(expected, &result);
211+
212+
ctx.sql("SET datafusion.execution.batch_size to +2")
213+
.await
214+
.unwrap();
215+
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
216+
.await
217+
.unwrap();
218+
let expected = vec![
219+
"+---------------------------------+---------+",
220+
"| name | setting |",
221+
"+---------------------------------+---------+",
222+
"| datafusion.execution.batch_size | 2 |",
223+
"+---------------------------------+---------+",
224+
];
225+
assert_batches_eq!(expected, &result);
226+
}
227+
228+
#[tokio::test]
229+
async fn set_u64_variable_bad_value() {
230+
let ctx =
231+
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
232+
233+
let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to -1")
234+
.await
235+
.unwrap_err();
236+
237+
assert_eq!(
238+
err.to_string(),
239+
"Execution error: Failed to parse -1 as u64"
240+
);
241+
242+
let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to abc")
243+
.await
244+
.unwrap_err();
245+
246+
assert_eq!(
247+
err.to_string(),
248+
"Execution error: Failed to parse abc as u64"
249+
);
250+
251+
let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to 0.1")
252+
.await
253+
.unwrap_err();
254+
255+
assert_eq!(
256+
err.to_string(),
257+
"Execution error: Failed to parse 0.1 as u64"
258+
);
259+
}
260+
261+
#[tokio::test]
262+
async fn set_time_zone() {
263+
// we don't support changing time zone for now until all time zone issues fixed and related function completed
264+
265+
let ctx = SessionContext::new();
266+
267+
// for full variable name
268+
let err = plan_and_collect(&ctx, "set datafusion.execution.time_zone = '8'")
269+
.await
270+
.unwrap_err();
271+
272+
assert_eq!(
273+
err.to_string(),
274+
"Error during planning: Changing Time Zone isn't supported yet"
275+
);
276+
277+
// for alias time zone
278+
let err = plan_and_collect(&ctx, "set time zone = '8'")
279+
.await
280+
.unwrap_err();
281+
282+
assert_eq!(
283+
err.to_string(),
284+
"Error during planning: Changing Time Zone isn't supported yet"
285+
);
286+
287+
// for alias timezone
288+
let err = plan_and_collect(&ctx, "set timezone = '8'")
289+
.await
290+
.unwrap_err();
291+
292+
assert_eq!(
293+
err.to_string(),
294+
"Error during planning: Changing Time Zone isn't supported yet"
295+
);
296+
}

0 commit comments

Comments
 (0)