diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 1cde1c5050a8..967ccc0b0866 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -47,10 +47,6 @@ pub enum AggregateFunction { Correlation, /// Grouping Grouping, - /// Bool And - BoolAnd, - /// Bool Or - BoolOr, } impl AggregateFunction { @@ -64,8 +60,6 @@ impl AggregateFunction { NthValue => "NTH_VALUE", Correlation => "CORR", Grouping => "GROUPING", - BoolAnd => "BOOL_AND", - BoolOr => "BOOL_OR", } } } @@ -82,8 +76,6 @@ impl FromStr for AggregateFunction { Ok(match name { // general "avg" => AggregateFunction::Avg, - "bool_and" => AggregateFunction::BoolAnd, - "bool_or" => AggregateFunction::BoolOr, "max" => AggregateFunction::Max, "mean" => AggregateFunction::Avg, "min" => AggregateFunction::Min, @@ -128,9 +120,6 @@ impl AggregateFunction { // The coerced_data_types is same with input_types. Ok(coerced_data_types[0].clone()) } - AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { - Ok(DataType::Boolean) - } AggregateFunction::Correlation => { correlation_return_type(&coerced_data_types[0]) } @@ -179,10 +168,6 @@ impl AggregateFunction { .collect::>(); Signature::uniform(1, valid, Volatility::Immutable) } - AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { - Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable) - } - AggregateFunction::Avg => { Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) } diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index abe6d8b1823d..428fc99070d2 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -121,18 +121,6 @@ pub fn coerce_types( }; Ok(vec![v]) } - AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { - // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc - // smallint, 
int, bigint, real, double precision, decimal, or interval. - if !is_bool_and_or_support_arg_type(&input_types[0]) { - return plan_err!( - "The function {:?} does not support inputs of type {:?}.", - agg_fun, - input_types[0] - ); - } - Ok(input_types.to_vec()) - } AggregateFunction::Correlation => { if !is_correlation_support_arg_type(&input_types[0]) { return plan_err!( @@ -319,10 +307,6 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result { } } -pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool { - matches!(arg_type, DataType::Boolean) -} - pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool { match arg_type { DataType::Dictionary(_, dict_value_type) => { diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs new file mode 100644 index 000000000000..d0028672743e --- /dev/null +++ b/datafusion/functions-aggregate/src/bool_and_or.rs @@ -0,0 +1,343 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines physical expressions that can be evaluated at runtime during query execution + +use std::any::Any; + +use arrow::array::ArrayRef; +use arrow::array::BooleanArray; +use arrow::compute::bool_and as compute_bool_and; +use arrow::compute::bool_or as compute_bool_or; +use arrow::datatypes::DataType; +use arrow::datatypes::Field; + +use datafusion_common::internal_err; +use datafusion_common::{downcast_value, not_impl_err}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity}; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, +}; + +use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; + +// returns the new value after bool_and/bool_or with the new values, taking nullability into account +macro_rules! typed_bool_and_or_batch { + ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ + let array = downcast_value!($VALUES, $ARRAYTYPE); + let delta = $OP(array); + Ok(ScalarValue::$SCALAR(delta)) + }}; +} + +// bool_and/bool_or the array and returns a ScalarValue of its corresponding type. +macro_rules! 
bool_and_or_batch { + ($VALUES:expr, $OP:ident) => {{ + match $VALUES.data_type() { + DataType::Boolean => { + typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP) + } + e => { + return internal_err!( + "Bool and/Bool or is not expected to receive the type {e:?}" + ); + } + } + }}; +} + +/// dynamically-typed bool_and(array) -> ScalarValue +fn bool_and_batch(values: &ArrayRef) -> Result { + bool_and_or_batch!(values, compute_bool_and) +} + +/// dynamically-typed bool_or(array) -> ScalarValue +fn bool_or_batch(values: &ArrayRef) -> Result { + bool_and_or_batch!(values, compute_bool_or) +} + +make_udaf_expr_and_func!( + BoolAnd, + bool_and, + expression, + "The values to combine with `AND`", + bool_and_udaf +); + +make_udaf_expr_and_func!( + BoolOr, + bool_or, + expression, + "The values to combine with `OR`", + bool_or_udaf +); + +/// BOOL_AND aggregate expression +#[derive(Debug)] +pub struct BoolAnd { + signature: Signature, +} + +impl BoolAnd { + fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Boolean], + Volatility::Immutable, + ), + } + } +} + +impl Default for BoolAnd { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for BoolAnd { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "bool_and" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn accumulator(&self, _: AccumulatorArgs) -> Result> { + Ok(Box::::default()) + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + Ok(vec![Field::new( + format_state_name(args.name, self.name()), + DataType::Boolean, + true, + )]) + } + + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + true + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + match args.data_type { + DataType::Boolean => { + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y))) + } + 
_ => not_impl_err!( + "GroupsAccumulator not supported for {} with {}", + args.name, + args.data_type + ), + } + } + + fn aliases(&self) -> &[String] { + &[] + } + + fn create_sliding_accumulator( + &self, + _: AccumulatorArgs, + ) -> Result> { + Ok(Box::::default()) + } + + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + AggregateOrderSensitivity::Insensitive + } + + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Identical + } +} + +#[derive(Debug, Default)] +struct BoolAndAccumulator { + acc: Option, +} + +impl Accumulator for BoolAndAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &values[0]; + self.acc = match (self.acc, bool_and_batch(values)?) { + (None, ScalarValue::Boolean(v)) => v, + (Some(v), ScalarValue::Boolean(None)) => Some(v), + (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b), + _ => unreachable!(), + }; + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Boolean(self.acc)) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![ScalarValue::Boolean(self.acc)]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} + +/// BOOL_OR aggregate expression +#[derive(Debug, Clone)] +pub struct BoolOr { + signature: Signature, +} + +impl BoolOr { + fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Boolean], + Volatility::Immutable, + ), + } + } +} + +impl Default for BoolOr { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for BoolOr { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "bool_or" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn accumulator(&self, _: AccumulatorArgs) -> Result> { + Ok(Box::::default()) + } + + fn state_fields(&self, args: StateFieldsArgs) -> 
Result> { + Ok(vec![Field::new( + format_state_name(args.name, self.name()), + DataType::Boolean, + true, + )]) + } + + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + true + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + match args.data_type { + DataType::Boolean => { + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y))) + } + _ => not_impl_err!( + "GroupsAccumulator not supported for {} with {}", + args.name, + args.data_type + ), + } + } + + fn aliases(&self) -> &[String] { + &[] + } + + fn create_sliding_accumulator( + &self, + _: AccumulatorArgs, + ) -> Result> { + Ok(Box::::default()) + } + + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + AggregateOrderSensitivity::Insensitive + } + + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Identical + } +} + +#[derive(Debug, Default)] +struct BoolOrAccumulator { + acc: Option, +} + +impl Accumulator for BoolOrAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &values[0]; + self.acc = match (self.acc, bool_or_batch(values)?) 
{ + (None, ScalarValue::Boolean(v)) => v, + (Some(v), ScalarValue::Boolean(None)) => Some(v), + (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b), + _ => unreachable!(), + }; + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Boolean(self.acc)) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![ScalarValue::Boolean(self.acc)]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 20a8d2c15926..260d6dab31b9 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -70,8 +70,8 @@ pub mod approx_median; pub mod approx_percentile_cont; pub mod approx_percentile_cont_with_weight; pub mod bit_and_or_xor; +pub mod bool_and_or; pub mod string_agg; - use crate::approx_percentile_cont::approx_percentile_cont_udaf; use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf; use datafusion_common::Result; @@ -89,6 +89,8 @@ pub mod expr_fn { pub use super::bit_and_or_xor::bit_and; pub use super::bit_and_or_xor::bit_or; pub use super::bit_and_or_xor::bit_xor; + pub use super::bool_and_or::bool_and; + pub use super::bool_and_or::bool_or; pub use super::count::count; pub use super::count::count_distinct; pub use super::covariance::covar_pop; @@ -143,6 +145,8 @@ pub fn all_default_aggregate_functions() -> Vec> { bit_and_or_xor::bit_and_udaf(), bit_and_or_xor::bit_or_udaf(), bit_and_or_xor::bit_xor_udaf(), + bool_and_or::bool_and_udaf(), + bool_and_or::bool_or_udaf(), ] } diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs deleted file mode 100644 index 341932bd77a4..000000000000 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ /dev/null @@ -1,394 +0,0 @@ -// Licensed to the Apache 
Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines physical expressions that can evaluated at runtime during query execution - -use crate::{AggregateExpr, PhysicalExpr}; -use arrow::datatypes::DataType; -use arrow::{ - array::{ArrayRef, BooleanArray}, - datatypes::Field, -}; -use datafusion_common::{ - downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, -}; -use datafusion_expr::{Accumulator, GroupsAccumulator}; -use std::any::Any; -use std::sync::Arc; - -use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; -use crate::aggregate::utils::down_cast_any_ref; -use crate::expressions::format_state_name; -use arrow::array::Array; -use arrow::compute::{bool_and, bool_or}; - -// returns the new value after bool_and/bool_or with the new values, taking nullability into account -macro_rules! typed_bool_and_or_batch { - ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ - let array = downcast_value!($VALUES, $ARRAYTYPE); - let delta = $OP(array); - Ok(ScalarValue::$SCALAR(delta)) - }}; -} - -// bool_and/bool_or the array and returns a ScalarValue of its corresponding type. -macro_rules! 
bool_and_or_batch { - ($VALUES:expr, $OP:ident) => {{ - match $VALUES.data_type() { - DataType::Boolean => { - typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP) - } - e => { - return internal_err!( - "Bool and/Bool or is not expected to receive the type {e:?}" - ); - } - } - }}; -} - -/// dynamically-typed bool_and(array) -> ScalarValue -fn bool_and_batch(values: &ArrayRef) -> Result { - bool_and_or_batch!(values, bool_and) -} - -/// dynamically-typed bool_or(array) -> ScalarValue -fn bool_or_batch(values: &ArrayRef) -> Result { - bool_and_or_batch!(values, bool_or) -} - -/// BOOL_AND aggregate expression -#[derive(Debug, Clone)] -pub struct BoolAnd { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BoolAnd { - /// Create a new BOOL_AND aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BoolAnd { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - Ok(Box::::default()) - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bool_and"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - match self.data_type { - DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y))) - } - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - 
Some(Arc::new(self.clone())) - } - - fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::::default()) - } -} - -impl PartialEq for BoolAnd { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -#[derive(Debug, Default)] -struct BoolAndAccumulator { - acc: Option, -} - -impl Accumulator for BoolAndAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - self.acc = match (self.acc, bool_and_batch(values)?) { - (None, ScalarValue::Boolean(v)) => v, - (Some(v), ScalarValue::Boolean(None)) => Some(v), - (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b), - _ => unreachable!(), - }; - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn state(&mut self) -> Result> { - Ok(vec![ScalarValue::Boolean(self.acc)]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Boolean(self.acc)) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// BOOL_OR aggregate expression -#[derive(Debug, Clone)] -pub struct BoolOr { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BoolOr { - /// Create a new BOOL_OR aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BoolOr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - Ok(Box::::default()) - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - 
format_state_name(&self.name, "bool_or"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - match self.data_type { - DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y))) - } - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } - - fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::::default()) - } -} - -impl PartialEq for BoolOr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -#[derive(Debug, Default)] -struct BoolOrAccumulator { - acc: Option, -} - -impl Accumulator for BoolOrAccumulator { - fn state(&mut self) -> Result> { - Ok(vec![ScalarValue::Boolean(self.acc)]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - self.acc = match (self.acc, bool_or_batch(values)?) 
{ - (None, ScalarValue::Boolean(v)) => v, - (Some(v), ScalarValue::Boolean(None)) => Some(v), - (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b), - _ => unreachable!(), - }; - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Boolean(self.acc)) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::expressions::col; - use crate::expressions::tests::aggregate; - use crate::generic_test_op; - use arrow::datatypes::*; - use arrow::record_batch::RecordBatch; - - #[test] - fn test_bool_and() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false])); - generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(false)) - } - - #[test] - fn bool_and_with_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![ - Some(true), - None, - Some(true), - Some(true), - ])); - generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(true)) - } - - #[test] - fn bool_and_all_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None])); - generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::Boolean(None)) - } - - #[test] - fn test_bool_or() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false])); - generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(true)) - } - - #[test] - fn bool_or_with_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![ - Some(false), - None, - Some(false), - Some(false), - ])); - generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(false)) - } - - #[test] - fn bool_or_all_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None])); - generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::Boolean(None)) - } -} diff --git 
a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 1dfe9ffd6905..53cfcfb033a1 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -66,16 +66,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BoolOr, _) => Arc::new(expressions::BoolOr::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), (AggregateFunction::ArrayAgg, false) => { let expr = input_phy_exprs[0].clone(); let nullable = expr.nullable(input_schema)?; @@ -165,9 +155,7 @@ mod tests { use datafusion_common::plan_err; use datafusion_expr::{type_coercion, Signature}; - use crate::expressions::{ - try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min, - }; + use crate::expressions::{try_cast, ArrayAgg, Avg, DistinctArrayAgg, Max, Min}; use super::*; #[test] @@ -281,48 +269,6 @@ mod tests { Ok(()) } - #[test] - fn test_bool_and_or_expr() -> Result<()> { - let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr]; - let data_types = vec![DataType::Boolean]; - for fun in funcs { - for data_type in &data_types { - let input_schema = - Schema::new(vec![Field::new("c1", data_type.clone(), true)]); - let input_phy_exprs: Vec> = vec![Arc::new( - expressions::Column::new_with_schema("c1", &input_schema).unwrap(), - )]; - let result_agg_phy_exprs = create_physical_agg_expr_for_test( - &fun, - false, - &input_phy_exprs[0..1], - &input_schema, - "c1", - )?; - match fun { - AggregateFunction::BoolAnd => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - AggregateFunction::BoolOr => { - assert!(result_agg_phy_exprs.as_any().is::()); - 
assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - _ => {} - }; - } - } - Ok(()) - } - #[test] fn test_sum_avg_expr() -> Result<()> { let funcs = vec![AggregateFunction::Avg]; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index a6946e739c97..73d810ec056d 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -25,9 +25,6 @@ pub(crate) mod accumulate { pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; -pub(crate) mod bool_op { - pub use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; -} pub(crate) mod prim_op { pub use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 87c7deccc2cd..f64c5b1fb260 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -21,7 +21,6 @@ pub(crate) mod array_agg; pub(crate) mod array_agg_distinct; pub(crate) mod array_agg_ordered; pub(crate) mod average; -pub(crate) mod bool_and_or; pub(crate) mod correlation; pub(crate) mod covariance; pub(crate) mod grouping; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 322610404074..0020aa5f55b2 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -40,7 +40,6 @@ pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; pub use crate::aggregate::average::Avg; pub use 
crate::aggregate::average::AvgAccumulator; -pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr}; pub use crate::aggregate::build_in::create_aggregate_expr; pub use crate::aggregate::correlation::Correlation; pub use crate::aggregate::grouping::Grouping; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 6375df721ae6..50356d5b6052 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -494,8 +494,8 @@ enum AggregateFunction { // BIT_AND = 19; // BIT_OR = 20; // BIT_XOR = 21; - BOOL_AND = 22; - BOOL_OR = 23; +// BOOL_AND = 22; +// BOOL_OR = 23; // REGR_SLOPE = 26; // REGR_INTERCEPT = 27; // REGR_COUNT = 28; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 5c483f70d150..8cca0fe4a876 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -538,8 +538,6 @@ impl serde::Serialize for AggregateFunction { Self::ArrayAgg => "ARRAY_AGG", Self::Correlation => "CORRELATION", Self::Grouping => "GROUPING", - Self::BoolAnd => "BOOL_AND", - Self::BoolOr => "BOOL_OR", Self::NthValueAgg => "NTH_VALUE_AGG", }; serializer.serialize_str(variant) @@ -558,8 +556,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "ARRAY_AGG", "CORRELATION", "GROUPING", - "BOOL_AND", - "BOOL_OR", "NTH_VALUE_AGG", ]; @@ -607,8 +603,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg), "CORRELATION" => Ok(AggregateFunction::Correlation), "GROUPING" => Ok(AggregateFunction::Grouping), - "BOOL_AND" => Ok(AggregateFunction::BoolAnd), - "BOOL_OR" => Ok(AggregateFunction::BoolOr), "NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index bc5b6be2ad87..56f14982923d 100644 --- 
a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1948,8 +1948,8 @@ pub enum AggregateFunction { /// BIT_AND = 19; /// BIT_OR = 20; /// BIT_XOR = 21; - BoolAnd = 22, - BoolOr = 23, + /// BOOL_AND = 22; + /// BOOL_OR = 23; /// REGR_SLOPE = 26; /// REGR_INTERCEPT = 27; /// REGR_COUNT = 28; @@ -1975,8 +1975,6 @@ impl AggregateFunction { AggregateFunction::ArrayAgg => "ARRAY_AGG", AggregateFunction::Correlation => "CORRELATION", AggregateFunction::Grouping => "GROUPING", - AggregateFunction::BoolAnd => "BOOL_AND", - AggregateFunction::BoolOr => "BOOL_OR", AggregateFunction::NthValueAgg => "NTH_VALUE_AGG", } } @@ -1989,8 +1987,6 @@ impl AggregateFunction { "ARRAY_AGG" => Some(Self::ArrayAgg), "CORRELATION" => Some(Self::Correlation), "GROUPING" => Some(Self::Grouping), - "BOOL_AND" => Some(Self::BoolAnd), - "BOOL_OR" => Some(Self::BoolOr), "NTH_VALUE_AGG" => Some(Self::NthValueAgg), _ => None, } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 5bec655bb1ff..ba0e708218cf 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -140,8 +140,6 @@ impl From for AggregateFunction { protobuf::AggregateFunction::Min => Self::Min, protobuf::AggregateFunction::Max => Self::Max, protobuf::AggregateFunction::Avg => Self::Avg, - protobuf::AggregateFunction::BoolAnd => Self::BoolAnd, - protobuf::AggregateFunction::BoolOr => Self::BoolOr, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, protobuf::AggregateFunction::Correlation => Self::Correlation, protobuf::AggregateFunction::Grouping => Self::Grouping, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 66b7c77799ea..08999effa4b1 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -111,8 +111,6 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction { AggregateFunction::Min => Self::Min, AggregateFunction::Max => Self::Max, AggregateFunction::Avg => Self::Avg, - AggregateFunction::BoolAnd => Self::BoolAnd, - AggregateFunction::BoolOr => Self::BoolOr, AggregateFunction::ArrayAgg => Self::ArrayAgg, AggregateFunction::Correlation => Self::Correlation, AggregateFunction::Grouping => Self::Grouping, @@ -376,8 +374,6 @@ pub fn serialize_expr( AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg, AggregateFunction::Min => protobuf::AggregateFunction::Min, AggregateFunction::Max => protobuf::AggregateFunction::Max, - AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd, - AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr, AggregateFunction::Avg => protobuf::AggregateFunction::Avg, AggregateFunction::Correlation => { protobuf::AggregateFunction::Correlation diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index ed966509b842..a9d3736dee08 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -23,10 +23,10 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, - CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, - Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, - OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr, WindowShift, + ArrayAgg, Avg, BinaryExpr, CaseExpr, CastExpr, Column, Correlation, CumeDist, + DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, + NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, + 
RankType, RowNumber, TryCastExpr, WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -240,10 +240,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { let inner = if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Grouping - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BoolAnd - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BoolOr } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::ArrayAgg } else if aggr_expr.downcast_ref::().is_some() { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 61764394ee74..b3966c3f0204 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -59,7 +59,9 @@ use datafusion_expr::{ TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; -use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor}; +use datafusion_functions_aggregate::expr_fn::{ + bit_and, bit_or, bit_xor, bool_and, bool_or, +}; use datafusion_functions_aggregate::string_agg::string_agg; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, @@ -671,6 +673,8 @@ async fn roundtrip_expr_api() -> Result<()> { bit_or(lit(2)), bit_xor(lit(2)), string_agg(col("a").cast_to(&DataType::Utf8, &schema)?, lit("|")), + bool_and(lit(true)), + bool_or(lit(true)), ]; // ensure expressions created with the expr api can be round tripped