Skip to content

Commit 86ca11a

Browse files
committed
Add SessionConfig reference to ScalarFunctionArgs
1 parent 592fe6a commit 86ca11a

File tree

105 files changed

+1506
-391
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

105 files changed

+1506
-391
lines changed

datafusion-examples/examples/composed_extension_codec.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@ async fn main() {
7171

7272
// deserialize proto back to execution plan
7373
let runtime = ctx.runtime_env();
74+
let state = ctx.state();
75+
let config_options = state.config_options();
7476
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
75-
.try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
77+
.try_into_physical_plan(&ctx, config_options, runtime.deref(), &composed_codec)
7678
.expect("from proto");
7779

7880
// assert that the original and deserialized execution plans are equal

datafusion-examples/examples/expr_api.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::common::stats::Precision;
2626
use datafusion::common::tree_node::{Transformed, TreeNode};
2727
use datafusion::common::{ColumnStatistics, DFSchema};
2828
use datafusion::common::{ScalarValue, ToDFSchema};
29+
use datafusion::config::ConfigOptions;
2930
use datafusion::error::Result;
3031
use datafusion::functions_aggregate::first_last::first_value_udaf;
3132
use datafusion::logical_expr::execution_props::ExecutionProps;
@@ -35,7 +36,9 @@ use datafusion::logical_expr::simplify::SimplifyContext;
3536
use datafusion::logical_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
3637
use datafusion::optimizer::analyzer::type_coercion::TypeCoercionRewriter;
3738
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
38-
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
39+
use datafusion::physical_expr::{
40+
analyze, create_physical_expr, AnalysisContext, ExprBoundaries,
41+
};
3942
use datafusion::prelude::*;
4043

4144
/// This example demonstrates the DataFusion [`Expr`] API.
@@ -176,7 +179,8 @@ fn simplify_demo() -> Result<()> {
176179
// expressions, such as the current time (to evaluate `now()`
177180
// correctly)
178181
let props = ExecutionProps::new();
179-
let context = SimplifyContext::new(&props).with_schema(schema);
182+
let config_options = ConfigOptions::default_singleton_arc();
183+
let context = SimplifyContext::new(&props, config_options).with_schema(schema);
180184
let simplifier = ExprSimplifier::new(context);
181185

182186
// And then call the simplify_expr function:
@@ -191,7 +195,8 @@ fn simplify_demo() -> Result<()> {
191195

192196
// here are some other examples of what DataFusion is capable of
193197
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
194-
let context = SimplifyContext::new(&props).with_schema(schema.clone());
198+
let context =
199+
SimplifyContext::new(&props, config_options).with_schema(schema.clone());
195200
let simplifier = ExprSimplifier::new(context);
196201

197202
// basic arithmetic simplification
@@ -529,8 +534,8 @@ fn type_coercion_demo() -> Result<()> {
529534

530535
// Evaluation with an expression that has not been type coerced cannot succeed.
531536
let props = ExecutionProps::default();
532-
let physical_expr =
533-
datafusion::physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
537+
let config_options = ConfigOptions::default_singleton_arc();
538+
let physical_expr = create_physical_expr(&expr, &df_schema, &props, config_options)?;
534539
let e = physical_expr.evaluate(&batch).unwrap_err();
535540
assert!(e
536541
.find_root()
@@ -543,26 +548,21 @@ fn type_coercion_demo() -> Result<()> {
543548
assert!(physical_expr.evaluate(&batch).is_ok());
544549

545550
// 2. Type coercion with `ExprSimplifier::coerce`.
546-
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
551+
let context = SimplifyContext::new(&props, config_options)
552+
.with_schema(Arc::new(df_schema.clone()));
547553
let simplifier = ExprSimplifier::new(context);
548554
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
549-
let physical_expr = datafusion::physical_expr::create_physical_expr(
550-
&coerced_expr,
551-
&df_schema,
552-
&props,
553-
)?;
555+
let physical_expr =
556+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
554557
assert!(physical_expr.evaluate(&batch).is_ok());
555558

556559
// 3. Type coercion with `TypeCoercionRewriter`.
557560
let coerced_expr = expr
558561
.clone()
559562
.rewrite(&mut TypeCoercionRewriter::new(&df_schema))?
560563
.data;
561-
let physical_expr = datafusion::physical_expr::create_physical_expr(
562-
&coerced_expr,
563-
&df_schema,
564-
&props,
565-
)?;
564+
let physical_expr =
565+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
566566
assert!(physical_expr.evaluate(&batch).is_ok());
567567

568568
// 4. Apply explicit type coercion by manually rewriting the expression
@@ -586,11 +586,8 @@ fn type_coercion_demo() -> Result<()> {
586586
}
587587
})?
588588
.data;
589-
let physical_expr = datafusion::physical_expr::create_physical_expr(
590-
&coerced_expr,
591-
&df_schema,
592-
&props,
593-
)?;
589+
let physical_expr =
590+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
594591
assert!(physical_expr.evaluate(&batch).is_ok());
595592

596593
Ok(())

datafusion-examples/examples/planner_api.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::error::Result;
19+
use datafusion::execution::session_state::SessionStateOptimizerConfig;
1920
use datafusion::logical_expr::{LogicalPlan, PlanType};
2021
use datafusion::physical_plan::{displayable, DisplayFormatType};
2122
use datafusion::physical_planner::DefaultPhysicalPlanner;
@@ -97,17 +98,19 @@ async fn to_physical_plan_step_by_step_demo(
9798
ctx: &SessionContext,
9899
) -> Result<()> {
99100
// First analyze the logical plan
100-
let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
101+
let session_state = ctx.state();
102+
let analyzed_logical_plan = session_state.analyzer().execute_and_check(
101103
input,
102-
ctx.state().config_options(),
104+
session_state.config_options(),
103105
|_, _| (),
104106
)?;
105107
println!("Analyzed logical plan:\n\n{:?}\n\n", analyzed_logical_plan);
106108

107109
// Optimize the analyzed logical plan
108-
let optimized_logical_plan = ctx.state().optimizer().optimize(
110+
let session_optimizer_config = SessionStateOptimizerConfig::new(&session_state);
111+
let optimized_logical_plan = session_state.optimizer().optimize(
109112
analyzed_logical_plan,
110-
&ctx.state(),
113+
&session_optimizer_config,
111114
|_, _| (),
112115
)?;
113116
println!(
@@ -116,10 +119,9 @@ async fn to_physical_plan_step_by_step_demo(
116119
);
117120

118121
// Create the physical plan
119-
let physical_plan = ctx
120-
.state()
122+
let physical_plan = session_state
121123
.query_planner()
122-
.create_physical_plan(&optimized_logical_plan, &ctx.state())
124+
.create_physical_plan(&optimized_logical_plan, &session_state)
123125
.await?;
124126
println!(
125127
"Final physical plan:\n\n{}\n\n",
@@ -139,7 +141,7 @@ async fn to_physical_plan_step_by_step_demo(
139141
// on DefaultPhysicalPlanner. Not all planners will provide this feature.
140142
let planner = DefaultPhysicalPlanner::default();
141143
let physical_plan =
142-
planner.optimize_physical_plan(physical_plan, &ctx.state(), |_, _| {})?;
144+
planner.optimize_physical_plan(physical_plan, &session_state, |_, _| {})?;
143145
println!(
144146
"Optimized physical plan:\n\n{}\n\n",
145147
displayable(physical_plan.as_ref())

datafusion-examples/examples/pruning.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020

2121
use arrow::array::{ArrayRef, BooleanArray, Int32Array};
2222
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
23+
use datafusion::common::config::ConfigOptions;
2324
use datafusion::common::{DFSchema, ScalarValue};
2425
use datafusion::execution::context::ExecutionProps;
2526
use datafusion::physical_expr::create_physical_expr;
@@ -188,7 +189,9 @@ impl PruningStatistics for MyCatalog {
188189
fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
189190
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
190191
let props = ExecutionProps::new();
191-
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
192+
let config_options = ConfigOptions::default_singleton_arc();
193+
let physical_expr =
194+
create_physical_expr(&expr, &df_schema, &props, config_options).unwrap();
192195
PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
193196
}
194197

datafusion-examples/examples/simple_udtf.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use datafusion::arrow::record_batch::RecordBatch;
2323
use datafusion::catalog::Session;
2424
use datafusion::catalog::TableFunctionImpl;
2525
use datafusion::common::{plan_err, ScalarValue};
26+
use datafusion::config::ConfigOptions;
2627
use datafusion::datasource::memory::MemorySourceConfig;
2728
use datafusion::datasource::TableProvider;
2829
use datafusion::error::Result;
@@ -142,7 +143,8 @@ impl TableFunctionImpl for LocalCsvTableFunc {
142143
.map(|expr| {
143144
// try to simplify the expression, so 1+2 becomes 3, for example
144145
let execution_props = ExecutionProps::new();
145-
let info = SimplifyContext::new(&execution_props);
146+
let config_options = ConfigOptions::default_singleton_arc();
147+
let info = SimplifyContext::new(&execution_props, config_options);
146148
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;
147149

148150
if let Expr::Literal(ScalarValue::Int64(Some(limit))) = expr {

datafusion/catalog-listing/src/helpers.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ use arrow::{
3333
datatypes::{DataType, Field, Fields, Schema},
3434
record_batch::RecordBatch,
3535
};
36-
use datafusion_expr::execution_props::ExecutionProps;
36+
3737
use futures::stream::FuturesUnordered;
3838
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
3939
use log::{debug, trace};
4040

41+
use datafusion_common::config::ConfigOptions;
4142
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4243
use datafusion_common::{Column, DFSchema, DataFusionError};
44+
use datafusion_expr::execution_props::ExecutionProps;
4345
use datafusion_expr::{Expr, Volatility};
4446
use datafusion_physical_expr::create_physical_expr;
4547
use object_store::path::Path;
@@ -242,6 +244,7 @@ async fn prune_partitions(
242244
partitions: Vec<Partition>,
243245
filters: &[Expr],
244246
partition_cols: &[(String, DataType)],
247+
config_options: &Arc<ConfigOptions>,
245248
) -> Result<Vec<Partition>> {
246249
if filters.is_empty() {
247250
return Ok(partitions);
@@ -293,7 +296,7 @@ async fn prune_partitions(
293296

294297
// Applies `filter` to `batch`, returning an `Err` if the physical expression cannot be created or evaluated
295298
let do_filter = |filter| -> Result<ArrayRef> {
296-
let expr = create_physical_expr(filter, &df_schema, &props)?;
299+
let expr = create_physical_expr(filter, &df_schema, &props, config_options)?;
297300
expr.evaluate(&batch)?.into_array(partitions.len())
298301
};
299302

@@ -412,6 +415,7 @@ pub async fn pruned_partition_list<'a>(
412415
filters: &'a [Expr],
413416
file_extension: &'a str,
414417
partition_cols: &'a [(String, DataType)],
418+
config_options: &Arc<ConfigOptions>,
415419
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
416420
// if no partition col => simply list all the files
417421
if partition_cols.is_empty() {
@@ -436,8 +440,14 @@ pub async fn pruned_partition_list<'a>(
436440
.await?;
437441
debug!("Listed {} partitions", partitions.len());
438442

439-
let pruned =
440-
prune_partitions(table_path, partitions, filters, partition_cols).await?;
443+
let pruned = prune_partitions(
444+
table_path,
445+
partitions,
446+
filters,
447+
partition_cols,
448+
config_options,
449+
)
450+
.await?;
441451

442452
debug!("Pruning yielded {} partitions", pruned.len());
443453

@@ -605,6 +615,7 @@ mod tests {
605615
&[filter],
606616
".parquet",
607617
&[(String::from("mypartition"), DataType::Utf8)],
618+
&Arc::clone(ConfigOptions::default_singleton_arc()),
608619
)
609620
.await
610621
.expect("partition pruning failed")
@@ -630,6 +641,7 @@ mod tests {
630641
&[filter],
631642
".parquet",
632643
&[(String::from("mypartition"), DataType::Utf8)],
644+
&Arc::clone(ConfigOptions::default_singleton_arc()),
633645
)
634646
.await
635647
.expect("partition pruning failed")
@@ -673,6 +685,7 @@ mod tests {
673685
(String::from("part1"), DataType::Utf8),
674686
(String::from("part2"), DataType::Utf8),
675687
],
688+
&Arc::clone(ConfigOptions::default_singleton_arc()),
676689
)
677690
.await
678691
.expect("partition pruning failed")
@@ -1016,10 +1029,17 @@ mod tests {
10161029
.unwrap();
10171030
}
10181031

1019-
(Arc::new(memory), Arc::new(MockSession {}))
1032+
(
1033+
Arc::new(memory),
1034+
Arc::new(MockSession {
1035+
config: SessionConfig::new(),
1036+
}),
1037+
)
10201038
}
10211039

1022-
struct MockSession {}
1040+
struct MockSession {
1041+
config: SessionConfig,
1042+
}
10231043

10241044
#[async_trait]
10251045
impl Session for MockSession {
@@ -1028,7 +1048,7 @@ mod tests {
10281048
}
10291049

10301050
fn config(&self) -> &SessionConfig {
1031-
unimplemented!()
1051+
&self.config
10321052
}
10331053

10341054
async fn create_physical_plan(

datafusion/common/src/config.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::collections::{BTreeMap, HashMap};
2222
use std::error::Error;
2323
use std::fmt::{self, Display};
2424
use std::str::FromStr;
25+
use std::sync::{Arc, LazyLock};
2526

2627
use crate::error::_config_err;
2728
use crate::parsers::CompressionTypeVariant;
@@ -724,7 +725,7 @@ config_namespace! {
724725
}
725726

726727
/// A key value pair, with a corresponding description
727-
#[derive(Debug)]
728+
#[derive(Debug, Hash, PartialEq, Eq)]
728729
pub struct ConfigEntry {
729730
/// A unique string to identify this config value
730731
pub key: String,
@@ -777,7 +778,20 @@ impl ConfigField for ConfigOptions {
777778
}
778779
}
779780

781+
static CONFIG_OPTIONS_SINGLETON: LazyLock<Arc<ConfigOptions>> =
782+
LazyLock::new(|| Arc::new(ConfigOptions::default()));
783+
780784
impl ConfigOptions {
785+
/// Returns a reference to a static, lazily initialized [`ConfigOptions`] holding the default values. Intended for tests only, where the defaults are sufficient.
786+
pub fn default_singleton() -> &'static ConfigOptions {
787+
CONFIG_OPTIONS_SINGLETON.as_ref()
788+
}
789+
790+
/// Returns a reference to the static `Arc` wrapping a lazily initialized, default-valued [`ConfigOptions`]. Intended for tests only, where the defaults are sufficient.
791+
pub fn default_singleton_arc() -> &'static Arc<ConfigOptions> {
792+
&CONFIG_OPTIONS_SINGLETON
793+
}
794+
781795
/// Creates a new [`ConfigOptions`] with default values
782796
pub fn new() -> Self {
783797
Self::default()

datafusion/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ zstd = { version = "0.13", optional = true, default-features = false }
144144

145145
[dev-dependencies]
146146
async-trait = { workspace = true }
147-
criterion = { workspace = true, features = ["async_tokio"] }
147+
criterion = { workspace = true, features = ["async_tokio", "async_futures"] }
148148
ctor = { workspace = true }
149149
dashmap = "6.1.0"
150150
datafusion-doc = { workspace = true }

0 commit comments

Comments
 (0)