Skip to content

Commit 453a45a

Browse files
authored
fix: issue #8838 discard extra sort when sorted element is wrapped (#9127)
* fix: issue #8838 discard extra sort when sorted element is wrapped fix: issue #8838 discard extra sort when sorted element is wrapped fix: issue #8838 discard extra sort when sorted element is wrapped * fix bugs * fix bugs * fix bugs * fix:bugs * adding tests * adding cast UTF8 type and diable scalarfunction situation * fix typo
1 parent fc84a63 commit 453a45a

File tree

4 files changed

+209
-16
lines changed

4 files changed

+209
-16
lines changed

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::expressions::CastExpr;
19+
use arrow_schema::SchemaRef;
20+
use datafusion_common::{JoinSide, JoinType};
21+
use indexmap::IndexSet;
22+
use itertools::Itertools;
1823
use std::collections::{HashMap, HashSet};
1924
use std::hash::{Hash, Hasher};
2025
use std::sync::Arc;
@@ -31,12 +36,8 @@ use crate::{
3136
PhysicalSortRequirement,
3237
};
3338

34-
use arrow_schema::{SchemaRef, SortOptions};
39+
use arrow_schema::SortOptions;
3540
use datafusion_common::tree_node::{Transformed, TreeNode};
36-
use datafusion_common::{JoinSide, JoinType};
37-
38-
use indexmap::IndexSet;
39-
use itertools::Itertools;
4041

4142
/// A `EquivalenceProperties` object stores useful information related to a schema.
4243
/// Currently, it keeps track of:
@@ -426,6 +427,87 @@ impl EquivalenceProperties {
426427
(!meet.is_empty()).then_some(meet)
427428
}
428429

430+
/// we substitute the ordering according to input expression type, this is a simplified version
431+
/// In this case, we just substitute when the expression satisfy the following confition
432+
/// I. just have one column and is a CAST expression
433+
/// II. just have one parameter and is a ScalarFUnctionexpression and it is monotonic
434+
/// TODO: we could precompute all the senario that is computable, for example: atan(x + 1000) should also be substituted if
435+
/// x is DESC or ASC
436+
pub fn substitute_ordering_component(
437+
matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
438+
sort_expr: &[PhysicalSortExpr],
439+
schema: SchemaRef,
440+
) -> Vec<PhysicalSortExpr> {
441+
sort_expr
442+
.iter()
443+
.filter(|sort_expr| {
444+
matching_exprs.iter().any(|matched| !matched.eq(*sort_expr))
445+
})
446+
.map(|sort_expr| {
447+
let referring_exprs: Vec<_> = matching_exprs
448+
.iter()
449+
.filter(|matched| expr_refers(matched, &sort_expr.expr))
450+
.cloned()
451+
.collect();
452+
// does not referring to any matching component, we just skip it
453+
454+
if referring_exprs.len() == 1 {
455+
// we check whether this expression is substitutable or not
456+
let r_expr = referring_exprs[0].clone();
457+
if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
458+
// we need to know whether the Cast Expr matches or not
459+
let expr_type =
460+
sort_expr.expr.data_type(schema.as_ref()).unwrap();
461+
if cast_expr.expr.eq(&sort_expr.expr)
462+
&& cast_expr.is_bigger_cast(expr_type)
463+
{
464+
PhysicalSortExpr {
465+
expr: r_expr.clone(),
466+
options: sort_expr.options,
467+
}
468+
} else {
469+
sort_expr.clone()
470+
}
471+
} else {
472+
sort_expr.clone()
473+
}
474+
} else {
475+
sort_expr.clone()
476+
}
477+
})
478+
.collect()
479+
}
480+
/// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression
481+
/// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed,
482+
/// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering
483+
/// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
484+
/// dependency map, happen in issue 8838: <https://github.com/apache/arrow-datafusion/issues/8838>
485+
pub fn substitute_oeq_class(
486+
&mut self,
487+
exprs: &[(Arc<dyn PhysicalExpr>, String)],
488+
mapping: &ProjectionMapping,
489+
schema: SchemaRef,
490+
) {
491+
let matching_exprs: Arc<Vec<_>> = Arc::new(
492+
exprs
493+
.iter()
494+
.filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr)))
495+
.map(|(source, _)| source)
496+
.collect(),
497+
);
498+
let orderings = std::mem::take(&mut self.oeq_class.orderings);
499+
let new_order = orderings
500+
.into_iter()
501+
.map(move |order| {
502+
Self::substitute_ordering_component(
503+
matching_exprs.clone(),
504+
&order,
505+
schema.clone(),
506+
)
507+
})
508+
.collect();
509+
self.oeq_class = OrderingEquivalenceClass::new(new_order);
510+
}
429511
/// Projects argument `expr` according to `projection_mapping`, taking
430512
/// equivalences into account.
431513
///
@@ -564,7 +646,6 @@ impl EquivalenceProperties {
564646

565647
// Get dependency map for existing orderings:
566648
let dependency_map = self.construct_dependency_map(&mapping);
567-
568649
let orderings = mapping.iter().flat_map(|(source, target)| {
569650
referred_dependencies(&dependency_map, source)
570651
.into_iter()

datafusion/physical-expr/src/expressions/cast.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::physical_expr::down_cast_any_ref;
19+
use crate::sort_properties::SortProperties;
20+
use crate::PhysicalExpr;
1821
use std::any::Any;
1922
use std::fmt;
2023
use std::hash::{Hash, Hasher};
2124
use std::sync::Arc;
22-
23-
use crate::physical_expr::down_cast_any_ref;
24-
use crate::sort_properties::SortProperties;
25-
use crate::PhysicalExpr;
25+
use DataType::*;
2626

2727
use arrow::compute::{can_cast_types, kernels, CastOptions};
2828
use arrow::datatypes::{DataType, Schema};
@@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
4141
#[derive(Debug, Clone)]
4242
pub struct CastExpr {
4343
/// The expression to cast
44-
expr: Arc<dyn PhysicalExpr>,
44+
pub expr: Arc<dyn PhysicalExpr>,
4545
/// The data type to cast to
4646
cast_type: DataType,
4747
/// Cast options
@@ -76,6 +76,26 @@ impl CastExpr {
7676
pub fn cast_options(&self) -> &CastOptions<'static> {
7777
&self.cast_options
7878
}
79+
pub fn is_bigger_cast(&self, src: DataType) -> bool {
80+
if src == self.cast_type {
81+
return true;
82+
}
83+
matches!(
84+
(src, &self.cast_type),
85+
(Int8, Int16 | Int32 | Int64)
86+
| (Int16, Int32 | Int64)
87+
| (Int32, Int64)
88+
| (UInt8, UInt16 | UInt32 | UInt64)
89+
| (UInt16, UInt32 | UInt64)
90+
| (UInt32, UInt64)
91+
| (
92+
Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32,
93+
Float32 | Float64
94+
)
95+
| (Int64 | UInt64, Float64)
96+
| (Utf8, LargeUtf8)
97+
)
98+
}
7999
}
80100

81101
impl fmt::Display for CastExpr {

datafusion/physical-plan/src/projection.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ impl ProjectionExec {
7070
input: Arc<dyn ExecutionPlan>,
7171
) -> Result<Self> {
7272
let input_schema = input.schema();
73-
7473
let fields: Result<Vec<Field>> = expr
7574
.iter()
7675
.map(|(e, name)| {
@@ -95,7 +94,10 @@ impl ProjectionExec {
9594
// construct a map from the input expressions to the output expression of the Projection
9695
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;
9796

98-
let input_eqs = input.equivalence_properties();
97+
let mut input_eqs = input.equivalence_properties();
98+
99+
input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());
100+
99101
let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
100102
let output_ordering = project_eqs.oeq_class().output_ordering();
101103

@@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec {
201203
}
202204

203205
fn equivalence_properties(&self) -> EquivalenceProperties {
204-
self.input
205-
.equivalence_properties()
206-
.project(&self.projection_mapping, self.schema())
206+
let mut equi_properties = self.input.equivalence_properties();
207+
equi_properties.substitute_oeq_class(
208+
&self.expr,
209+
&self.projection_mapping,
210+
self.input.schema().clone(),
211+
);
212+
equi_properties.project(&self.projection_mapping, self.schema())
207213
}
208214

209215
fn with_new_children(
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# prepare the table
19+
statement ok
20+
CREATE EXTERNAL TABLE delta_encoding_required_column (
21+
c_customer_sk INT NOT NULL,
22+
c_current_cdemo_sk INT NOT NULL
23+
)
24+
STORED AS CSV
25+
WITH ORDER (
26+
c_customer_sk DESC,
27+
c_current_cdemo_sk DESC
28+
)
29+
LOCATION '../../testing/data/csv/aggregate_test_100.csv';
30+
31+
# test for substitute CAST senario
32+
query TT
33+
EXPLAIN
34+
SELECT
35+
CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
36+
c_current_cdemo_sk
37+
FROM delta_encoding_required_column
38+
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
39+
----
40+
logical_plan
41+
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
42+
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
43+
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
44+
physical_plan
45+
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
46+
--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
47+
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
48+
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false
49+
50+
# test for commom rename
51+
query TT
52+
EXPLAIN
53+
SELECT
54+
c_customer_sk AS c_customer_sk_big,
55+
c_current_cdemo_sk
56+
FROM delta_encoding_required_column
57+
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
58+
----
59+
logical_plan
60+
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
61+
--Projection: delta_encoding_required_column.c_customer_sk AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
62+
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
63+
physical_plan
64+
ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
65+
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false
66+
67+
68+
# test for cast Utf8
69+
query TT
70+
EXPLAIN
71+
SELECT
72+
CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
73+
c_current_cdemo_sk
74+
FROM delta_encoding_required_column
75+
ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
76+
----
77+
logical_plan
78+
Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
79+
--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
80+
----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk]
81+
physical_plan
82+
SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
83+
--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
84+
----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk]
85+
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
86+
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false

0 commit comments

Comments
 (0)