Skip to content

Commit

Permalink
Add Docs and Examples to PhysicalSortExpr, as well as asc and `desc…
Browse files Browse the repository at this point in the history
…` methods (#12589)
  • Loading branch information
alamb committed Sep 23, 2024
1 parent b00723a commit 04895c4
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
21 changes: 7 additions & 14 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,20 +1252,13 @@ mod tests {
col("int_col").sort(false, true),
]],
Ok(vec![vec![
PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: physical_col("int_col", &schema).unwrap(),
options: SortOptions {
descending: true,
nulls_first: true,
},
},
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),

PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
]])
),
];
Expand Down
76 changes: 75 additions & 1 deletion datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,51 @@ use datafusion_common::Result;
use datafusion_expr_common::columnar_value::ColumnarValue;

/// Represents Sort operation for a column in a RecordBatch
///
/// Example:
/// ```
/// # use std::any::Any;
/// # use std::fmt::Display;
/// # use std::hash::Hasher;
/// # use std::sync::Arc;
/// # use arrow::array::RecordBatch;
/// # use datafusion_common::Result;
/// # use arrow::compute::SortOptions;
/// # use arrow::datatypes::{DataType, Schema};
/// # use datafusion_expr_common::columnar_value::ColumnarValue;
/// # use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// # use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// # // this crate doesn't have a physical expression implementation
/// # // so make a really simple one
/// # #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// # struct MyPhysicalExpr;
/// # impl PhysicalExpr for MyPhysicalExpr {
/// # fn as_any(&self) -> &dyn Any {todo!() }
/// # fn data_type(&self, input_schema: &Schema) -> Result<DataType> {todo!()}
/// # fn nullable(&self, input_schema: &Schema) -> Result<bool> {todo!() }
/// # fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {todo!() }
/// # fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {todo!()}
/// # fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn PhysicalExpr>> {todo!()}
/// # fn dyn_hash(&self, _state: &mut dyn Hasher) {todo!()}
/// # }
/// # impl Display for MyPhysicalExpr {
/// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "a") }
/// # }
/// # impl PartialEq<dyn Any> for MyPhysicalExpr {
/// # fn eq(&self, _other: &dyn Any) -> bool { true }
/// # }
/// # fn col(name: &str) -> Arc<dyn PhysicalExpr> { Arc::new(MyPhysicalExpr) }
/// // Sort by a ASC
/// let options = SortOptions::default();
/// let sort_expr = PhysicalSortExpr::new(col("a"), options);
/// assert_eq!(sort_expr.to_string(), "a ASC");
///
/// // Sort by a DESC NULLS LAST
/// let sort_expr = PhysicalSortExpr::new_default(col("a"))
/// .desc()
/// .nulls_last();
/// assert_eq!(sort_expr.to_string(), "a DESC NULLS LAST");
/// ```
#[derive(Clone, Debug)]
pub struct PhysicalSortExpr {
/// Physical expression representing the column to sort
Expand All @@ -43,6 +88,35 @@ impl PhysicalSortExpr {
pub fn new(expr: Arc<dyn PhysicalExpr>, options: SortOptions) -> Self {
Self { expr, options }
}

/// Create a new PhysicalSortExpr with default [`SortOptions`]
pub fn new_default(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr, SortOptions::default())
}

/// Set the sort sort options to ASC
pub fn asc(mut self) -> Self {
self.options.descending = false;
self
}

/// Set the sort sort options to DESC
pub fn desc(mut self) -> Self {
self.options.descending = true;
self
}

/// Set the sort sort options to NULLS FIRST
pub fn nulls_first(mut self) -> Self {
self.options.nulls_first = true;
self
}

/// Set the sort sort options to NULLS LAST
pub fn nulls_last(mut self) -> Self {
self.options.nulls_first = false;
self
}
}

impl PartialEq for PhysicalSortExpr {
Expand All @@ -60,7 +134,7 @@ impl Hash for PhysicalSortExpr {
}
}

impl std::fmt::Display for PhysicalSortExpr {
impl Display for PhysicalSortExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{} {}", self.expr, to_str(&self.options))
}
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,9 +1022,8 @@ mod tests {
impl SortedUnboundedExec {
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let mut eq_properties = EquivalenceProperties::new(schema);
eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new(
eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new_default(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)]]);
let mode = ExecutionMode::Unbounded;
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
Expand Down Expand Up @@ -1560,10 +1559,9 @@ mod tests {
cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
};
let mut plan = SortExec::new(
vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)],
vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
"c1", 0,
)))],
Arc::new(source),
);
plan = plan.with_fetch(Some(9));
Expand Down
11 changes: 4 additions & 7 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,9 +1174,7 @@ mod tests {
let mut eq_properties = EquivalenceProperties::new(schema);
eq_properties.add_new_orderings(vec![columns
.iter()
.map(|expr| {
PhysicalSortExpr::new(Arc::clone(expr), SortOptions::default())
})
.map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr)))
.collect::<Vec<_>>()]);
let mode = ExecutionMode::Unbounded;
PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode)
Expand Down Expand Up @@ -1286,10 +1284,9 @@ mod tests {
congestion_cleared: Arc::new(Mutex::new(false)),
};
let spm = SortPreservingMergeExec::new(
vec![PhysicalSortExpr::new(
Arc::new(Column::new("c1", 0)),
SortOptions::default(),
)],
vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
"c1", 0,
)))],
Arc::new(source),
);
let spm_task = SpawnedTask::spawn(collect(Arc::new(spm), task_ctx));
Expand Down

0 comments on commit 04895c4

Please sign in to comment.