Skip to content

Commit

Permalink
Improve documentation and add Display impl to EquivalenceProperties
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 23, 2024
1 parent 21ec332 commit 825e4fc
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 14 deletions.
19 changes: 19 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,22 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}

/// Renders a slice of [`PhysicalExpr`]s into a `std::fmt::Formatter` as a
/// bracketed, comma-separated list.
///
/// An empty slice is rendered as `[]`.
///
/// Example output: `[a + 1, b]`
pub fn format_physical_expr_list(
    f: &mut std::fmt::Formatter<'_>,
    ordering: &[Arc<dyn PhysicalExpr>],
) -> std::fmt::Result {
    write!(f, "[")?;
    for (idx, expr) in ordering.iter().enumerate() {
        // Every element except the first is preceded by a separator.
        if idx > 0 {
            write!(f, ", ")?;
        }
        write!(f, "{}", expr)?;
    }
    write!(f, "]")
}
37 changes: 37 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::sync::Arc;

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
Expand All @@ -27,6 +28,7 @@ use crate::{

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

#[derive(Debug, Clone)]
/// A structure representing an expression known to be constant in a physical execution plan.
Expand Down Expand Up @@ -101,6 +103,19 @@ impl ConstExpr {
}
}

/// Formats a `ConstExpr` as its inner expression, followed by the
/// `(across_partitions)` marker when the `across_partitions` flag is set.
///
/// Example `c` or `c(across_partitions)`
impl Display for ConstExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.expr)?;
        // Nothing more to print unless the flag is set.
        if !self.across_partitions {
            return Ok(());
        }
        write!(f, "(across_partitions)")
    }
}

impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr)
Expand Down Expand Up @@ -224,6 +239,14 @@ impl EquivalenceClass {
}
}

/// Displays an `EquivalenceClass` as a bracketed, comma-separated list of its
/// member expressions, e.g. `[a, b]`.
impl Display for EquivalenceClass {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // `format_physical_expr_list` already writes the surrounding `[` and
        // `]`; wrapping it in another pair would render the class as
        // `[[a, b]]`.
        format_physical_expr_list(f, &self.exprs)
    }
}

/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
/// class represents a distinct equivalence class in a relation.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -575,6 +598,20 @@ impl EquivalenceGroup {
}
}

/// Displays an `EquivalenceGroup` as a bracketed, comma-separated list of its
/// equivalence classes, each rendered through its own `Display` impl.
impl Display for EquivalenceGroup {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "[")?;
        // Empty before the first class, `", "` before every later one.
        let mut separator = "";
        for class in self.iter() {
            write!(f, "{}{}", separator, class)?;
            separator = ", ";
        }
        write!(f, "]")
    }
}

#[cfg(test)]
mod tests {

Expand Down
19 changes: 17 additions & 2 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;

use arrow_schema::SortOptions;

use crate::equivalence::add_offset_to_expr;
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow_schema::SortOptions;

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings that can describe a schema. For example, consider the following table:
Expand Down Expand Up @@ -219,6 +219,21 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) ->
false
}

/// Displays an `OrderingEquivalenceClass` as a bracketed, comma-separated
/// list of its orderings, each rendered by `PhysicalSortExpr::format_list`.
impl Display for OrderingEquivalenceClass {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[")?;
        for (idx, ordering) in self.orderings.iter().enumerate() {
            // Separate consecutive orderings; without this, adjacent
            // orderings are written back to back and cannot be told apart.
            if idx > 0 {
                write!(f, ", ")?;
            }
            // NOTE(review): whether each ordering also needs its own `[...]`
            // wrapper depends on what `format_list` emits - confirm.
            write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
        }
        write!(f, "]")
    }
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
103 changes: 91 additions & 12 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

Expand All @@ -41,11 +42,16 @@ use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;

/// A `EquivalenceProperties` object stores useful information related to a schema.
/// An `EquivalenceProperties` object stores information known about the output
/// of a plan node, which can be used to optimize the plan.
///
/// Currently, it keeps track of:
/// - Equivalent expressions, e.g expressions that have same value.
/// - Valid sort expressions (orderings) for the schema.
/// - Constants expressions (e.g expressions that are known to have constant values).
/// - Sort expressions (orderings)
/// - Equivalent expressions: expressions that are known to have the same value.
/// - Constant expressions: expressions that are known to contain a single
/// constant value.
///
/// # Example equivalent sort expressions
///
/// Consider table below:
///
Expand All @@ -60,9 +66,13 @@ use itertools::Itertools;
/// └---┴---┘
/// ```
///
/// where both `a ASC` and `b DESC` can describe the table ordering. With
/// `EquivalenceProperties`, we can keep track of these different valid sort
/// expressions and treat `a ASC` and `b DESC` on an equal footing.
/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
/// avoided.
///
/// # Example equivalent expressions
///
/// Similarly, consider the table below:
///
Expand All @@ -77,11 +87,38 @@ use itertools::Itertools;
/// └---┴---┘
/// ```
///
/// where columns `a` and `b` always have the same value. We keep track of such
/// equivalences inside this object. With this information, we can optimize
/// things like partitioning. For example, if the partition requirement is
/// `Hash(a)` and output partitioning is `Hash(b)`, then we can deduce that
/// the existing partitioning satisfies the requirement.
/// In this case, columns `a` and `b` always have the same value.
/// `EquivalenceProperties` keeps track of such equivalences. With this
/// information, DataFusion can optimize operations such as partitioning. For
/// example, if the partition requirement is `Hash(a)` and output partitioning
/// is `Hash(b)`, then DataFusion avoids repartitioning the data as the
/// existing partitioning satisfies the requirement.
///
/// # Code Example
/// ```
/// use std::sync::Arc;
/// use arrow_schema::{Schema, Field, DataType, SchemaRef};
/// use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
/// use datafusion_physical_expr::expressions::col;
///
/// let schema: SchemaRef = Arc::new(Schema::new(vec![
/// Field::new("a", DataType::Int32, false),
/// Field::new("b", DataType::Int32, false),
/// Field::new("c", DataType::Int32, false),
/// ]));
///
/// let col_a = col("a", &schema).unwrap();
/// let col_b = col("b", &schema).unwrap();
/// let col_c = col("c", &schema).unwrap();
///
/// // This object represents data that is sorted by a ASC, c DESC
/// // with a single constant value of b
/// let mut eq_properties = EquivalenceProperties::new(schema);
/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
/// eq_properties.add_new_ordering(vec![col_a.asc(), col_c.desc()]);
///
/// // The rendered properties include the constant expressions section
/// assert!(eq_properties.to_string().contains("const: ["));
/// ```
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
/// Collection of equivalence classes that store expressions with the same
Expand Down Expand Up @@ -193,6 +230,11 @@ impl EquivalenceProperties {
self.oeq_class.add_new_orderings(orderings);
}

/// Adds a single ordering to the existing ordering equivalence class.
///
/// Convenience wrapper that forwards `ordering` to
/// [`Self::add_new_orderings`] as a one-element collection.
pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
self.add_new_orderings([ordering]);
}

/// Incorporates the given equivalence group into the existing
/// equivalence group within.
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
Expand Down Expand Up @@ -1049,6 +1091,37 @@ impl EquivalenceProperties {
}
}

/// More readable display version of the `EquivalenceProperties`.
///
/// Only the non-empty sections are written, in the order `order`, `eq`,
/// `const`, separated by `, `. When every section is empty the output is
/// `No properties`.
///
/// Format:
/// ```text
/// order: <orderings>, eq: <equivalence group>, const: [<constant>, <constant>]
/// ```
impl Display for EquivalenceProperties {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.eq_group.is_empty()
            && self.oeq_class.is_empty()
            && self.constants.is_empty()
        {
            return write!(f, "No properties");
        }

        // Written before each section. It stays empty until the first section
        // has been emitted, so the output never starts with a dangling ", ".
        let mut section_sep = "";
        if !self.oeq_class.is_empty() {
            write!(f, "order: {}", self.oeq_class)?;
            section_sep = ", ";
        }
        if !self.eq_group.is_empty() {
            write!(f, "{}eq: {}", section_sep, self.eq_group)?;
            section_sep = ", ";
        }
        if !self.constants.is_empty() {
            write!(f, "{}const: [", section_sep)?;
            // Same idea within the list: no separator before the first
            // constant (previously this produced `[, a, b]`).
            let mut item_sep = "";
            for c in &self.constants {
                write!(f, "{}{}", item_sep, c)?;
                item_sep = ", ";
            }
            write!(f, "]")?;
        }
        Ok(())
    }
}

/// Calculates the properties of a given [`ExprPropertiesNode`].
///
/// Order information can be retrieved as:
Expand Down Expand Up @@ -2600,6 +2673,12 @@ mod tests {
Ok(())
}

/// Return a new schema with the same types, but new field names
///
/// The new field names are the old field names with `text` appended.
///
/// For example, the schema "a", "b", "c" becomes "a1", "b1", "c1"
/// if `text` is "1".
fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef {
Arc::new(Schema::new(
schema
Expand Down

0 comments on commit 825e4fc

Please sign in to comment.