Skip to content

Commit 0c4e4a1

Browse files
Minor Fix for Logical and Physical Expr Conversions (#11142)
* Minor * Update planner.rs
1 parent b468ba7 commit 0c4e4a1

File tree

7 files changed

+45
-69
lines changed

7 files changed

+45
-69
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -796,17 +796,15 @@ mod tests {
796796
ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
797797
StructArray,
798798
};
799-
800799
use arrow::datatypes::{Field, Schema, SchemaBuilder};
801800
use arrow::record_batch::RecordBatch;
802801
use arrow_schema::Fields;
803-
use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema};
804-
use datafusion_expr::execution_props::ExecutionProps;
802+
use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue};
805803
use datafusion_expr::{col, lit, when, Expr};
806-
use datafusion_physical_expr::create_physical_expr;
804+
use datafusion_physical_expr::planner::logical2physical;
805+
use datafusion_physical_plan::ExecutionPlanProperties;
807806

808807
use chrono::{TimeZone, Utc};
809-
use datafusion_physical_plan::ExecutionPlanProperties;
810808
use futures::StreamExt;
811809
use object_store::local::LocalFileSystem;
812810
use object_store::path::Path;
@@ -2061,12 +2059,6 @@ mod tests {
20612059
Ok(())
20622060
}
20632061

2064-
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
2065-
let df_schema = schema.clone().to_dfschema().unwrap();
2066-
let execution_props = ExecutionProps::new();
2067-
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
2068-
}
2069-
20702062
#[tokio::test]
20712063
async fn test_struct_filter_parquet() -> Result<()> {
20722064
let tmp_dir = TempDir::new()?;

datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -410,23 +410,20 @@ pub fn build_row_filter(
410410

411411
#[cfg(test)]
412412
mod test {
413-
use arrow::datatypes::Field;
414-
use arrow_schema::TimeUnit::Nanosecond;
415-
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
416-
use parquet::arrow::parquet_to_arrow_schema;
417-
use parquet::file::reader::{FileReader, SerializedFileReader};
418-
use rand::prelude::*;
419-
413+
use super::*;
420414
use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
421415
use crate::datasource::schema_adapter::SchemaAdapterFactory;
422416

423-
use datafusion_common::ToDFSchema;
424-
use datafusion_expr::execution_props::ExecutionProps;
417+
use arrow::datatypes::Field;
418+
use arrow_schema::TimeUnit::Nanosecond;
425419
use datafusion_expr::{cast, col, lit, Expr};
426-
use datafusion_physical_expr::create_physical_expr;
420+
use datafusion_physical_expr::planner::logical2physical;
427421
use datafusion_physical_plan::metrics::{Count, Time};
428422

429-
use super::*;
423+
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
424+
use parquet::arrow::parquet_to_arrow_schema;
425+
use parquet::file::reader::{FileReader, SerializedFileReader};
426+
use rand::prelude::*;
430427

431428
// We should ignore predicate that read non-primitive columns
432429
#[test]
@@ -590,10 +587,4 @@ mod test {
590587
assert_eq!(projection, remapped)
591588
}
592589
}
593-
594-
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
595-
let df_schema = schema.clone().to_dfschema().unwrap();
596-
let execution_props = ExecutionProps::new();
597-
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
598-
}
599590
}

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -404,15 +404,19 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
404404

405405
#[cfg(test)]
406406
mod tests {
407+
use std::ops::Rem;
408+
use std::sync::Arc;
409+
407410
use super::*;
408411
use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
409412
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
413+
410414
use arrow::datatypes::DataType::Decimal128;
411415
use arrow::datatypes::{DataType, Field};
412-
use datafusion_common::{Result, ToDFSchema};
413-
use datafusion_expr::execution_props::ExecutionProps;
416+
use datafusion_common::Result;
414417
use datafusion_expr::{cast, col, lit, Expr};
415-
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
418+
use datafusion_physical_expr::planner::logical2physical;
419+
416420
use parquet::arrow::arrow_to_parquet_schema;
417421
use parquet::arrow::async_reader::ParquetObjectReader;
418422
use parquet::basic::LogicalType;
@@ -422,8 +426,6 @@ mod tests {
422426
basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
423427
schema::types::SchemaDescPtr,
424428
};
425-
use std::ops::Rem;
426-
use std::sync::Arc;
427429

428430
struct PrimitiveTypeField {
429431
name: &'static str,
@@ -1111,12 +1113,6 @@ mod tests {
11111113
ParquetFileMetrics::new(0, "file.parquet", &metrics)
11121114
}
11131115

1114-
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
1115-
let df_schema = schema.clone().to_dfschema().unwrap();
1116-
let execution_props = ExecutionProps::new();
1117-
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
1118-
}
1119-
11201116
#[tokio::test]
11211117
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
11221118
BloomFilterTest::new_data_index_bloom_encoding_stats()

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,22 +1555,22 @@ pub(crate) enum StatisticsType {
15551555

15561556
#[cfg(test)]
15571557
mod tests {
1558+
use std::collections::HashMap;
1559+
use std::ops::{Not, Rem};
1560+
15581561
use super::*;
15591562
use crate::assert_batches_eq;
15601563
use crate::logical_expr::{col, lit};
1564+
15611565
use arrow::array::Decimal128Array;
15621566
use arrow::{
15631567
array::{BinaryArray, Int32Array, Int64Array, StringArray},
15641568
datatypes::TimeUnit,
15651569
};
15661570
use arrow_array::UInt64Array;
1567-
use datafusion_common::ToDFSchema;
1568-
use datafusion_expr::execution_props::ExecutionProps;
15691571
use datafusion_expr::expr::InList;
15701572
use datafusion_expr::{cast, is_null, try_cast, Expr};
1571-
use datafusion_physical_expr::create_physical_expr;
1572-
use std::collections::HashMap;
1573-
use std::ops::{Not, Rem};
1573+
use datafusion_physical_expr::planner::logical2physical;
15741574

15751575
#[derive(Debug, Default)]
15761576
/// Mock statistic provider for tests
@@ -3876,10 +3876,4 @@ mod tests {
38763876
let expr = logical2physical(expr, schema);
38773877
build_predicate_expression(&expr, schema, required_columns)
38783878
}
3879-
3880-
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
3881-
let df_schema = schema.clone().to_dfschema().unwrap();
3882-
let execution_props = ExecutionProps::new();
3883-
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
3884-
}
38853879
}

datafusion/physical-expr-common/src/aggregate/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
211211
/// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent
212212
/// with the return value of the [`AggregateExpr::all_expressions`] method.
213213
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
214+
/// TODO: This method only rewrites the [`PhysicalExpr`]s and does not handle [`Expr`]s.
215+
/// This can cause silent bugs and should be fixed in the future (possibly with physical-to-logical
216+
/// conversions).
214217
fn with_new_expressions(
215218
&self,
216219
_args: Vec<Arc<dyn PhysicalExpr>>,

datafusion/physical-expr/src/planner.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,22 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow::datatypes::Schema;
20+
use crate::scalar_function;
21+
use crate::{
22+
expressions::{self, binary, like, Column, Literal},
23+
PhysicalExpr,
24+
};
2125

26+
use arrow::datatypes::Schema;
2227
use datafusion_common::{
23-
exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue,
28+
exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
2429
};
2530
use datafusion_expr::execution_props::ExecutionProps;
2631
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
2732
use datafusion_expr::var_provider::is_system_variables;
2833
use datafusion_expr::var_provider::VarType;
2934
use datafusion_expr::{binary_expr, Between, BinaryExpr, Expr, Like, Operator, TryCast};
3035

31-
use crate::scalar_function;
32-
use crate::{
33-
expressions::{self, binary, like, Column, Literal},
34-
PhysicalExpr,
35-
};
36-
3736
/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
3837
/// AS int)`.
3938
///
@@ -358,6 +357,13 @@ where
358357
.collect::<Result<Vec<_>>>()
359358
}
360359

360+
/// Convert a logical expression to a physical expression (without any simplification, etc)
361+
pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
362+
let df_schema = schema.clone().to_dfschema().unwrap();
363+
let execution_props = ExecutionProps::new();
364+
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
365+
}
366+
361367
#[cfg(test)]
362368
mod tests {
363369
use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};

datafusion/physical-expr/src/utils/guarantee.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -419,15 +419,16 @@ impl<'a> ColOpLit<'a> {
419419

420420
#[cfg(test)]
421421
mod test {
422+
use std::sync::OnceLock;
423+
422424
use super::*;
423-
use crate::create_physical_expr;
425+
use crate::planner::logical2physical;
426+
424427
use arrow_schema::{DataType, Field, Schema, SchemaRef};
425-
use datafusion_common::ToDFSchema;
426-
use datafusion_expr::execution_props::ExecutionProps;
427428
use datafusion_expr::expr_fn::*;
428429
use datafusion_expr::{lit, Expr};
430+
429431
use itertools::Itertools;
430-
use std::sync::OnceLock;
431432

432433
#[test]
433434
fn test_literal() {
@@ -867,13 +868,6 @@ mod test {
867868
LiteralGuarantee::try_new(column, Guarantee::NotIn, literals.iter()).unwrap()
868869
}
869870

870-
/// Convert a logical expression to a physical expression (without any simplification, etc)
871-
fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
872-
let df_schema = schema.clone().to_dfschema().unwrap();
873-
let execution_props = ExecutionProps::new();
874-
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
875-
}
876-
877871
// Schema for testing
878872
fn schema() -> SchemaRef {
879873
SCHEMA

0 commit comments

Comments
 (0)