diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index d5367ad34163..5bdbb3d8609c 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -48,6 +48,14 @@ make_udaf_function!( first_value_udaf ); +make_udaf_function!( + LastValue, + last_value, + value, + "Returns the last value in a group of values.", + last_value_udaf +); + pub struct FirstValue { signature: Signature, aliases: Vec, @@ -514,6 +522,116 @@ impl PartialEq for FirstValuePhysicalExpr { } } +pub struct LastValue { + signature: Signature, + aliases: Vec, +} + +impl Debug for LastValue { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("LastValue") + .field("name", &self.name()) + .field("signature", &self.signature) + .field("accumulator", &"") + .finish() + } +} + +impl Default for LastValue { + fn default() -> Self { + Self::new() + } +} + +impl LastValue { + pub fn new() -> Self { + Self { + aliases: vec![String::from("LAST_VALUE")], + signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable), + } + } +} + +impl AggregateUDFImpl for LastValue { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "LAST_VALUE" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].clone()) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + let mut all_sort_orders = vec![]; + + // Construct PhysicalSortExpr objects from Expr objects: + let mut sort_exprs = vec![]; + for expr in acc_args.sort_exprs { + if let Expr::Sort(sort) = expr { + if let Expr::Column(col) = sort.expr.as_ref() { + let name = &col.name; + let e = expressions::column::col(name, acc_args.schema)?; + sort_exprs.push(PhysicalSortExpr { + expr: e, + options: SortOptions { + descending: !sort.asc, + nulls_first: sort.nulls_first, + }, + }); + } + } + } + if !sort_exprs.is_empty() { + all_sort_orders.extend(sort_exprs); + } + + let ordering_req = all_sort_orders; + + let ordering_dtypes = ordering_req + .iter() + .map(|e| e.expr.data_type(acc_args.schema)) + .collect::>>()?; + + let requirement_satisfied = ordering_req.is_empty(); + + LastValueAccumulator::try_new( + acc_args.data_type, + &ordering_dtypes, + ordering_req, + acc_args.ignore_nulls, + ) + .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) + } + + fn state_fields( + &self, + name: &str, + value_type: DataType, + ordering_fields: Vec, + ) -> Result> { + let mut fields = vec![Field::new( + format_state_name(name, "last_value"), + value_type, + true, + )]; + fields.extend(ordering_fields); + fields.push(Field::new("is_set", DataType::Boolean, true)); + Ok(fields) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + /// TO BE DEPRECATED: Builtin LAST_VALUE physical aggregate expression will be replaced by udf in the future #[derive(Debug, Clone)] pub struct LastValuePhysicalExpr { diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 8016b76889f7..b4dae934f3dc 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -70,7 +70,10 @@ pub mod expr_fn { /// Registers all enabled packages with a [`FunctionRegistry`] pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { - let functions: Vec> = vec![first_last::first_value_udaf()]; + let functions: Vec> = vec![ + first_last::first_value_udaf(), + first_last::last_value_udaf(), + ]; functions.into_iter().try_for_each(|udf| { let existing_udaf = registry.register_udaf(udf)?;