diff --git a/Cargo.toml b/Cargo.toml index 124747999041..479d2cadc65b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ members = [ "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-expr-common", - "datafusion/physical-expr-functions-aggregate", "datafusion/physical-optimizer", "datafusion/physical-plan", "datafusion/proto", @@ -106,7 +105,6 @@ datafusion-functions-window = { path = "datafusion/functions-window", version = datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false } datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false } -datafusion-physical-expr-functions-aggregate = { path = "datafusion/physical-expr-functions-aggregate", version = "41.0.0" } datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" } datafusion-proto = { path = "datafusion/proto", version = "41.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 50333d17ca8d..1e89bb3af87e 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "adler32" version = "1.2.0" @@ -167,9 +173,9 @@ checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" @@ -430,7 +436,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -765,7 +771,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] @@ -815,9 +821,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.3" +version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210" +checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" dependencies = [ "arrayref", "arrayvec", @@ -999,7 +1005,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1155,7 +1161,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1206,7 +1212,6 @@ dependencies = [ "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", - "datafusion-physical-expr-functions-aggregate", "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", @@ -1501,20 +1506,6 @@ dependencies = [ "rand", ] -[[package]] -name = "datafusion-physical-expr-functions-aggregate" -version = "41.0.0" -dependencies = [ - "ahash", - "arrow", - "datafusion-common", - "datafusion-expr", - "datafusion-expr-common", - "datafusion-functions-aggregate-common", - "datafusion-physical-expr-common", - "rand", -] - [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" @@ -1546,7 +1537,6 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-common", - "datafusion-physical-expr-functions-aggregate", "futures", "half", "hashbrown", @@ -1743,12 +1733,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.31" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" +checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1831,7 +1821,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1924,9 +1914,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ "atomic-waker", "bytes", @@ -2111,7 +2101,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -2148,7 +2138,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2353,9 +2343,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.156" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libflate" @@ -2489,6 +2479,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "1.0.2" @@ -2829,7 +2828,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3028,9 +3027,9 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ "getrandom", "libredox", @@ -3074,15 +3073,15 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" -version = "0.12.5" +version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", "futures-core", "futures-util", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -3098,7 +3097,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", "rustls-pki-types", "serde", @@ -3114,7 +3113,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "winreg", + "windows-registry", ] [[package]] @@ -3253,9 +3252,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.3", @@ -3421,7 +3420,7 @@ checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3563,7 +3562,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3609,7 +3608,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3622,7 +3621,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3644,9 +3643,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.74" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -3658,6 +3657,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "tempfile" @@ -3704,7 +3706,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3774,9 +3776,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", @@ -3798,7 +3800,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3895,7 +3897,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3940,7 +3942,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4095,7 +4097,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-shared", ] @@ -4129,7 +4131,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4213,6 +4215,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -4361,16 +4393,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "winreg" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "xmlparser" version = "0.13.6" @@ -4404,7 +4426,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index adbba3eb31d6..de228e058096 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -110,7 +110,6 @@ datafusion-functions-window = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } -datafusion-physical-expr-functions-aggregate = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-sql = { workspace = true } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 735a381586ad..67f3cb01c0a4 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -490,7 +490,6 @@ //! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule //! [`Schema`]: arrow::datatypes::Schema //! [`PhysicalExpr`]: physical_plan::PhysicalExpr -//! [`AggregateExpr`]: physical_plan::AggregateExpr //! [`RecordBatch`]: arrow::record_batch::RecordBatch //! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader //! [`Array`]: arrow::array::Array @@ -556,11 +555,6 @@ pub mod physical_expr_common { pub use datafusion_physical_expr_common::*; } -/// re-export of [`datafusion_physical_expr_functions_aggregate`] crate -pub mod physical_expr_functions_aggregate { - pub use datafusion_physical_expr_functions_aggregate::*; -} - /// re-export of [`datafusion_physical_expr`] crate pub mod physical_expr { pub use datafusion_physical_expr::*; diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 8cbb187f7bd2..1a12fc7de888 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -26,7 +26,8 @@ use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::{physical_exprs_equal, AggregateExpr, PhysicalExpr}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; +use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; /// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs @@ -122,7 +123,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { type GroupExprsRef<'a> = ( &'a PhysicalGroupBy, - &'a [Arc], + &'a [Arc], &'a [Option>], ); @@ -171,8 +172,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected macro_rules! assert_optimized { @@ -224,7 +225,7 @@ mod tests { fn partial_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -244,7 +245,7 @@ mod tests { fn final_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -272,7 +273,7 @@ mod tests { expr: Arc, name: &str, schema: &Schema, - ) -> Arc { + ) -> Arc { AggregateExprBuilder::new(count_udaf(), vec![expr]) .schema(Arc::new(schema.clone())) .alias(name) diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index f8edf73e3d2a..a2726d62e9f6 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -23,8 +23,9 @@ use std::sync::Arc; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_datafusion_err, Result}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalSortRequirement, + reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; @@ -117,7 +118,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// /// # Parameters /// -/// * `aggr_exprs` - A vector of `Arc` representing the +/// * `aggr_exprs` - A vector of `Arc` representing the /// aggregate expressions to be optimized. /// * `prefix_requirement` - An array slice representing the ordering /// requirements preceding the aggregate expressions. @@ -130,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// successfully. Any errors occurring during the conversion process are /// passed through. fn try_convert_aggregate_if_better( - aggr_exprs: Vec>, + aggr_exprs: Vec>, prefix_requirement: &[PhysicalSortRequirement], eq_properties: &EquivalenceProperties, -) -> Result>> { +) -> Result>> { aggr_exprs .into_iter() .map(|aggr_expr| { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8d6c5089fa34..9501d3c6bbbb 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -58,8 +58,8 @@ use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - displayable, windows, AggregateExpr, ExecutionPlan, ExecutionPlanProperties, - InputOrderMode, Partitioning, PhysicalExpr, WindowExpr, + displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, + Partitioning, PhysicalExpr, WindowExpr, }; use arrow::compute::SortOptions; @@ -81,9 +81,9 @@ use datafusion_expr::{ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_sql::utils::window_expr_common_partition_keys; @@ -719,7 +719,7 @@ impl DefaultPhysicalPlanner { // optimization purposes. For example, a FIRST_VALUE may turn // into a LAST_VALUE with the reverse ordering requirement. // To reflect such changes to subsequent stages, use the updated - // `AggregateExpr`/`PhysicalSortExpr` objects. + // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); let next_partition_mode = if can_repartition { @@ -1541,7 +1541,7 @@ pub fn create_window_expr( } type AggregateExprWithOptionalArgs = ( - Arc, + Arc, // The filter clause, if any Option>, // Ordering requirements, if any diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index ca8376fdec0a..faa9378535fd 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -48,13 +48,11 @@ use datafusion_common::TableReference; use datafusion_expr::utils::COUNT_STAR_EXPANSION; use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::{ - expressions, AggregateExpr, EquivalenceProperties, PhysicalExpr, -}; +use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr}; use async_trait::async_trait; use datafusion_catalog::Session; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use futures::Stream; use tempfile::TempDir; // backwards compatibility @@ -429,7 +427,7 @@ impl TestAggregate { } /// Return appropriate expr depending if COUNT is for col or table (*) - pub fn count_expr(&self, schema: &Schema) -> Arc { + pub fn count_expr(&self, schema: &Schema) -> Arc { AggregateExprBuilder::new(count_udaf(), vec![self.column()]) .schema(Arc::new(schema.clone())) .alias(self.column_name()) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 138e5bda7f39..62e9be63983c 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -25,7 +25,7 @@ use arrow::util::pretty::pretty_format_batches; use arrow_array::types::Int64Type; use datafusion::common::Result; use datafusion::datasource::MemTable; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index 698d1350cb61..c9cbaa8396fc 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -15,172 +15,5 @@ // specific language governing permissions and limitations // under the License. -//! [`AggregateExpr`] which defines the interface all aggregate expressions -//! (built-in and custom) need to satisfy. - -use crate::order::AggregateOrderSensitivity; -use arrow::datatypes::{DataType, Field}; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; -use datafusion_expr_common::accumulator::Accumulator; -use datafusion_expr_common::groups_accumulator::GroupsAccumulator; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use std::fmt::Debug; -use std::{any::Any, sync::Arc}; - pub mod count_distinct; pub mod groups_accumulator; - -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -/// -/// Any implementation of this trait also needs to implement the -/// `PartialEq` to allows comparing equality between the -/// trait objects. -pub trait AggregateExpr: Send + Sync + Debug + PartialEq { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Order by requirements for the aggregate function - /// By default it is `None` (there is no requirement) - /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - /// Indicates whether aggregator can produce the correct result with any - /// arbitrary input ordering. By default, we assume that aggregate expressions - /// are order insensitive. - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - AggregateOrderSensitivity::Insensitive - } - - /// Sets the indicator whether ordering requirements of the aggregator is - /// satisfied by its input. If this is not the case, aggregators with order - /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce - /// the correct result with possibly more work internally. - /// - /// # Returns - /// - /// Returns `Ok(Some(updated_expr))` if the process completes successfully. - /// If the expression can benefit from existing input ordering, but does - /// not implement the method, returns an error. Order insensitive and hard - /// requirement aggregators return `Ok(None)`. - fn with_beneficial_ordering( - self: Arc, - _requirement_satisfied: bool, - ) -> Result>> { - if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { - return exec_err!( - "Should implement with satisfied for aggregator :{:?}", - self.name() - ); - } - Ok(None) - } - - /// Human readable name such as `"MIN(c2)"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } - - /// If the aggregate expression has a specialized - /// [`GroupsAccumulator`] implementation. If this returns true, - /// `[Self::create_groups_accumulator`] will be called. - fn groups_accumulator_supported(&self) -> bool { - false - } - - /// Return a specialized [`GroupsAccumulator`] that manages state - /// for all groups. - /// - /// For maximum performance, a [`GroupsAccumulator`] should be - /// implemented in addition to [`Accumulator`]. - fn create_groups_accumulator(&self) -> Result> { - not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") - } - - /// Construct an expression that calculates the aggregate in reverse. - /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). - /// For aggregates that do not support calculation in reverse, - /// returns None (which is the default value). - fn reverse_expr(&self) -> Option> { - None - } - - /// Creates accumulator implementation that supports retract - fn create_sliding_accumulator(&self) -> Result> { - not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") - } - - /// Returns all expressions used in the [`AggregateExpr`]. - /// These expressions are (1)function arguments, (2) order by expressions. - fn all_expressions(&self) -> AggregatePhysicalExpressions { - let args = self.expressions(); - let order_bys = self.order_bys().unwrap_or(&[]); - let order_by_exprs = order_bys - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - AggregatePhysicalExpressions { - args, - order_by_exprs, - } - } - - /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent - /// with the return value of the [`AggregateExpr::all_expressions`] method. - /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. - fn with_new_expressions( - &self, - _args: Vec>, - _order_by_exprs: Vec>, - ) -> Option> { - None - } - - /// If this function is max, return (output_field, true) - /// if the function is min, return (output_field, false) - /// otherwise return None (the default) - /// - /// output_field is the name of the column produced by this aggregate - /// - /// Note: this is used to use special aggregate implementations in certain conditions - fn get_minmax_desc(&self) -> Option<(Field, bool)> { - None - } - - /// Returns default value of the function given the input is Null - /// Most of the aggregate function return Null if input is Null, - /// while `count` returns 0 if input is Null - fn default_value(&self, data_type: &DataType) -> Result; -} - -/// Stores the physical expressions used inside the `AggregateExpr`. -pub struct AggregatePhysicalExpressions { - /// Aggregate function arguments - pub args: Vec>, - /// Order by expressions - pub order_by_exprs: Vec>, -} diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index 7b8ce0397af8..4fba772d8ddc 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use arrow::array::{ArrayRef, AsArray}; use arrow::datatypes::ArrowNativeType; @@ -32,25 +32,6 @@ use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use crate::aggregate::AggregateExpr; - -/// Downcast a `Box` or `Arc` -/// and return the inner trait object as [`Any`] so -/// that it can be downcast to a specific implementation. -/// -/// This method is used when implementing the `PartialEq` -/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality -/// between the trait objects. -pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { - if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else { - any - } -} - /// Convert scalar values from an accumulator into arrays. pub fn get_accum_scalar_values_as_arrays( accum: &mut dyn Accumulator, diff --git a/datafusion/physical-expr-functions-aggregate/Cargo.toml b/datafusion/physical-expr-functions-aggregate/Cargo.toml deleted file mode 100644 index 6eed89614c53..000000000000 --- a/datafusion/physical-expr-functions-aggregate/Cargo.toml +++ /dev/null @@ -1,48 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-physical-expr-functions-aggregate" -description = "Logical plan and expression representation for DataFusion query engine" -keywords = ["datafusion", "logical", "plan", "expressions"] -readme = "README.md" -version = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -rust-version = { workspace = true } - -[lints] -workspace = true - -[lib] -name = "datafusion_physical_expr_functions_aggregate" -path = "src/lib.rs" - -[features] - -[dependencies] -ahash = { workspace = true } -arrow = { workspace = true } -datafusion-common = { workspace = true } -datafusion-expr = { workspace = true } -datafusion-expr-common = { workspace = true } -datafusion-functions-aggregate-common = { workspace = true } -datafusion-physical-expr-common = { workspace = true } -rand = { workspace = true } diff --git a/datafusion/physical-expr-functions-aggregate/src/lib.rs b/datafusion/physical-expr-functions-aggregate/src/lib.rs deleted file mode 100644 index 2ff7ff5777ec..000000000000 --- a/datafusion/physical-expr-functions-aggregate/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Technically, all aggregate functions that depend on `expr` crate should be included here. - -pub mod aggregate; diff --git a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs similarity index 69% rename from datafusion/physical-expr-functions-aggregate/src/aggregate.rs rename to datafusion/physical-expr/src/aggregate.rs index fd986e00a7ef..5c1216f2a386 100644 --- a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -15,29 +15,46 @@ // specific language governing permissions and limitations // under the License. +pub(crate) mod groups_accumulator { + #[allow(unused_imports)] + pub(crate) mod accumulate { + pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; + } + pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{ + accumulate::NullState, GroupsAccumulatorAdapter, + }; +} +pub(crate) mod stats { + pub use datafusion_functions_aggregate_common::stats::StatsType; +} +pub mod utils { + pub use datafusion_functions_aggregate_common::utils::{ + adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options, + ordering_fields, DecimalAverager, Hashable, + }; +} + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr::AggregateUDF; use datafusion_expr::ReversedUDAF; use datafusion_expr_common::accumulator::Accumulator; -use datafusion_expr_common::groups_accumulator::GroupsAccumulator; use datafusion_expr_common::type_coercion::aggregates::check_arg_count; use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs; -use datafusion_functions_aggregate_common::aggregate::AggregateExpr; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; -use datafusion_functions_aggregate_common::utils::{self, down_cast_any_ref}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr_common::utils::reverse_order_bys; +use datafusion_expr_common::groups_accumulator::GroupsAccumulator; use std::fmt::Debug; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; -/// Builder for physical [`AggregateExpr`] +/// Builder for physical [`AggregateFunctionExpr`] /// -/// `AggregateExpr` contains the information necessary to call +/// `AggregateFunctionExpr` contains the information necessary to call /// an aggregate expression. #[derive(Debug, Clone)] pub struct AggregateExprBuilder { @@ -71,7 +88,7 @@ impl AggregateExprBuilder { } } - pub fn build(self) -> Result> { + pub fn build(self) -> Result> { let Self { fun, args, @@ -204,6 +221,17 @@ impl AggregateFunctionExpr { &self.fun } + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + pub fn expressions(&self) -> Vec> { + self.args.clone() + } + + /// Human readable name such as `"MIN(c2)"`. + pub fn name(&self) -> &str { + &self.name + } + /// Return if the aggregation is distinct pub fn is_distinct(&self) -> bool { self.is_distinct @@ -219,34 +247,13 @@ impl AggregateFunctionExpr { self.is_reversed } + /// Return if the aggregation is nullable pub fn is_nullable(&self) -> bool { self.is_nullable } -} -impl AggregateExpr for AggregateFunctionExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn expressions(&self) -> Vec> { - self.args.clone() - } - - fn state_fields(&self) -> Result> { - let args = StateFieldsArgs { - name: &self.name, - input_types: &self.input_types, - return_type: &self.data_type, - ordering_fields: &self.ordering_fields, - is_distinct: self.is_distinct, - }; - - self.fun.state_fields(args) - } - - fn field(&self) -> Result { + /// the field of the final result of this aggregation. + pub fn field(&self) -> Result { Ok(Field::new( &self.name, self.data_type.clone(), @@ -254,7 +261,10 @@ impl AggregateExpr for AggregateFunctionExpr { )) } - fn create_accumulator(&self) -> Result> { + /// the accumulator used to accumulate values from the expressions. + /// the accumulator expects the same number of arguments as `expressions` and must + /// return states with the same description as `state_fields` + pub fn create_accumulator(&self) -> Result> { let acc_args = AccumulatorArgs { return_type: &self.data_type, schema: &self.schema, @@ -269,7 +279,83 @@ impl AggregateExpr for AggregateFunctionExpr { self.fun.accumulator(acc_args) } - fn create_sliding_accumulator(&self) -> Result> { + /// the field of the final result of this aggregation. + pub fn state_fields(&self) -> Result> { + let args = StateFieldsArgs { + name: &self.name, + input_types: &self.input_types, + return_type: &self.data_type, + ordering_fields: &self.ordering_fields, + is_distinct: self.is_distinct, + }; + + self.fun.state_fields(args) + } + + /// Order by requirements for the aggregate function + /// By default it is `None` (there is no requirement) + /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this + pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + if self.ordering_req.is_empty() { + return None; + } + + if !self.order_sensitivity().is_insensitive() { + return Some(&self.ordering_req); + } + + None + } + + /// Indicates whether aggregator can produce the correct result with any + /// arbitrary input ordering. By default, we assume that aggregate expressions + /// are order insensitive. + pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { + if !self.ordering_req.is_empty() { + // If there is requirement, use the sensitivity of the implementation + self.fun.order_sensitivity() + } else { + // If no requirement, aggregator is order insensitive + AggregateOrderSensitivity::Insensitive + } + } + + /// Sets the indicator whether ordering requirements of the aggregator is + /// satisfied by its input. If this is not the case, aggregators with order + /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce + /// the correct result with possibly more work internally. + /// + /// # Returns + /// + /// Returns `Ok(Some(updated_expr))` if the process completes successfully. + /// If the expression can benefit from existing input ordering, but does + /// not implement the method, returns an error. Order insensitive and hard + /// requirement aggregators return `Ok(None)`. + pub fn with_beneficial_ordering( + self: Arc, + beneficial_ordering: bool, + ) -> Result>> { + let Some(updated_fn) = self + .fun + .clone() + .with_beneficial_ordering(beneficial_ordering)? + else { + return Ok(None); + }; + + AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) + .order_by(self.ordering_req.to_vec()) + .schema(Arc::new(self.schema.clone())) + .alias(self.name().to_string()) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(self.is_reversed) + .build() + .map(Some) + } + + /// Creates accumulator implementation that supports retract + pub fn create_sliding_accumulator(&self) -> Result> { let args = AccumulatorArgs { return_type: &self.data_type, schema: &self.schema, @@ -335,11 +421,10 @@ impl AggregateExpr for AggregateFunctionExpr { Ok(accumulator) } - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { + /// If the aggregate expression has a specialized + /// [`GroupsAccumulator`] implementation. If this returns true, + /// `[Self::create_groups_accumulator`] will be called. + pub fn groups_accumulator_supported(&self) -> bool { let args = AccumulatorArgs { return_type: &self.data_type, schema: &self.schema, @@ -353,7 +438,12 @@ impl AggregateExpr for AggregateFunctionExpr { self.fun.groups_accumulator_supported(args) } - fn create_groups_accumulator(&self) -> Result> { + /// Return a specialized [`GroupsAccumulator`] that manages state + /// for all groups. + /// + /// For maximum performance, a [`GroupsAccumulator`] should be + /// implemented in addition to [`Accumulator`]. + pub fn create_groups_accumulator(&self) -> Result> { let args = AccumulatorArgs { return_type: &self.data_type, schema: &self.schema, @@ -367,52 +457,11 @@ impl AggregateExpr for AggregateFunctionExpr { self.fun.create_groups_accumulator(args) } - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - if self.ordering_req.is_empty() { - return None; - } - - if !self.order_sensitivity().is_insensitive() { - return Some(&self.ordering_req); - } - - None - } - - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - if !self.ordering_req.is_empty() { - // If there is requirement, use the sensitivity of the implementation - self.fun.order_sensitivity() - } else { - // If no requirement, aggregator is order insensitive - AggregateOrderSensitivity::Insensitive - } - } - - fn with_beneficial_ordering( - self: Arc, - beneficial_ordering: bool, - ) -> Result>> { - let Some(updated_fn) = self - .fun - .clone() - .with_beneficial_ordering(beneficial_ordering)? - else { - return Ok(None); - }; - - AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) - .order_by(self.ordering_req.to_vec()) - .schema(Arc::new(self.schema.clone())) - .alias(self.name().to_string()) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(self.is_reversed) - .build() - .map(Some) - } - - fn reverse_expr(&self) -> Option> { + /// Construct an expression that calculates the aggregate in reverse. + /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). + /// For aggregates that do not support calculation in reverse, + /// returns None (which is the default value). + pub fn reverse_expr(&self) -> Option> { match self.fun.reverse_udf() { ReversedUDAF::NotSupported => None, ReversedUDAF::Identical => Some(Arc::new(self.clone())), @@ -440,33 +489,72 @@ impl AggregateExpr for AggregateFunctionExpr { } } - fn get_minmax_desc(&self) -> Option<(Field, bool)> { + /// Returns all expressions used in the [`AggregateFunctionExpr`]. + /// These expressions are (1)function arguments, (2) order by expressions. + pub fn all_expressions(&self) -> AggregatePhysicalExpressions { + let args = self.expressions(); + let order_bys = self.order_bys().unwrap_or(&[]); + let order_by_exprs = order_bys + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + AggregatePhysicalExpressions { + args, + order_by_exprs, + } + } + + /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + pub fn with_new_expressions( + &self, + _args: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } + + /// If this function is max, return (output_field, true) + /// if the function is min, return (output_field, false) + /// otherwise return None (the default) + /// + /// output_field is the name of the column produced by this aggregate + /// + /// Note: this is used to use special aggregate implementations in certain conditions + pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { self.fun .is_descending() .and_then(|flag| self.field().ok().map(|f| (f, flag))) } - fn default_value(&self, data_type: &DataType) -> Result { + /// Returns default value of the function given the input is Null + /// Most of the aggregate function return Null if input is Null, + /// while `count` returns 0 if input is Null + pub fn default_value(&self, data_type: &DataType) -> Result { self.fun.default_value(data_type) } } -impl PartialEq for AggregateFunctionExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.fun == x.fun - && self.args.len() == x.args.len() - && self - .args - .iter() - .zip(x.args.iter()) - .all(|(this_arg, other_arg)| this_arg.eq(other_arg)) - }) - .unwrap_or(false) +/// Stores the physical expressions used inside the `AggregateExpr`. +pub struct AggregatePhysicalExpressions { + /// Aggregate function arguments + pub args: Vec>, + /// Order by expressions + pub order_by_exprs: Vec>, +} + +impl PartialEq for AggregateFunctionExpr { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.data_type == other.data_type + && self.fun == other.fun + && self.args.len() == other.args.len() + && self + .args + .iter() + .zip(other.args.iter()) + .all(|(this_arg, other_arg)| this_arg.eq(other_arg)) } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index c4255172d680..7db7188b85d3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -19,27 +19,7 @@ #![deny(clippy::clone_on_ref_ptr)] // Backward compatibility -pub mod aggregate { - pub(crate) mod groups_accumulator { - #[allow(unused_imports)] - pub(crate) mod accumulate { - pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; - } - pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{ - accumulate::NullState, GroupsAccumulatorAdapter, - }; - } - pub(crate) mod stats { - pub use datafusion_functions_aggregate_common::stats::StatsType; - } - pub mod utils { - pub use datafusion_functions_aggregate_common::utils::{ - adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, - get_sort_options, ordering_fields, DecimalAverager, Hashable, - }; - } - pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr; -} +pub mod aggregate; pub mod analysis; pub mod binary_map { pub use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType}; @@ -67,9 +47,6 @@ pub mod execution_props { pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; -pub use datafusion_functions_aggregate_common::aggregate::{ - AggregateExpr, AggregatePhysicalExpressions, -}; pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties}; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 52015f425217..5439e140502a 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -29,20 +29,19 @@ use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, WindowFrame}; +use crate::aggregate::AggregateFunctionExpr; use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr, }; -use crate::{ - expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, -}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// A window expr that takes the form of an aggregate function. /// /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct PlainAggregateWindowExpr { - aggregate: Arc, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -51,7 +50,7 @@ pub struct PlainAggregateWindowExpr { impl PlainAggregateWindowExpr { /// Create a new aggregate window function expression pub fn new( - aggregate: Arc, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -65,7 +64,7 @@ impl PlainAggregateWindowExpr { } /// Get aggregate expr of AggregateWindowExpr - pub fn get_aggregate_expr(&self) -> &Arc { + pub fn get_aggregate_expr(&self) -> &Arc { &self.aggregate } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index afa799e86953..ac3a4f4c09ec 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -28,13 +28,12 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{Accumulator, WindowFrame}; +use crate::aggregate::AggregateFunctionExpr; use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; -use crate::{ - expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, -}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// A window expr that takes the form of an aggregate function that /// can be incrementally computed over sliding windows. @@ -42,7 +41,7 @@ use crate::{ /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct SlidingAggregateWindowExpr { - aggregate: Arc, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -51,7 +50,7 @@ pub struct SlidingAggregateWindowExpr { impl SlidingAggregateWindowExpr { /// Create a new (sliding) aggregate window function expression. pub fn new( - aggregate: Arc, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -64,8 +63,8 @@ impl SlidingAggregateWindowExpr { } } - /// Get the [AggregateExpr] of this object. - pub fn get_aggregate_expr(&self) -> &Arc { + /// Get the [AggregateFunctionExpr] of this object. + pub fn get_aggregate_expr(&self) -> &Arc { &self.aggregate } } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 66b250c5063b..2b8725b5bac7 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -23,7 +23,7 @@ use datafusion_common::scalar::ScalarValue; use datafusion_common::Result; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::projection::ProjectionExec; -use datafusion_physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics}; +use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics}; use crate::PhysicalOptimizerRule; use datafusion_common::stats::Precision; @@ -58,12 +58,12 @@ impl PhysicalOptimizerRule for AggregateStatistics { let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { if let Some((non_null_rows, name)) = - take_optimizable_column_and_table_count(&**expr, &stats) + take_optimizable_column_and_table_count(expr, &stats) { projections.push((expressions::lit(non_null_rows), name.to_owned())); - } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { + } else if let Some((min, name)) = take_optimizable_min(expr, &stats) { projections.push((expressions::lit(min), name.to_owned())); - } else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) { + } else if let Some((max, name)) = take_optimizable_max(expr, &stats) { projections.push((expressions::lit(max), name.to_owned())); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) @@ -137,7 +137,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> /// If this agg_expr is a count that can be exactly derived from the statistics, return it. fn take_optimizable_column_and_table_count( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { let col_stats = &stats.column_statistics; @@ -174,7 +174,7 @@ fn take_optimizable_column_and_table_count( /// If this agg_expr is a min that is exactly defined in the statistics, return it. fn take_optimizable_min( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { if let Precision::Exact(num_rows) = &stats.num_rows { @@ -220,7 +220,7 @@ fn take_optimizable_min( /// If this agg_expr is a max that is exactly defined in the statistics, return it. fn take_optimizable_max( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { if let Precision::Exact(num_rows) = &stats.num_rows { @@ -266,33 +266,27 @@ fn take_optimizable_max( // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { - return true; - } +fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { + return true; } false } // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_min(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name().to_lowercase() == "min" { - return true; - } +fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.fun().name().to_lowercase() == "min" { + return true; } false } // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_max(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name().to_lowercase() == "max" { - return true; - } +fn is_max(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.fun().name().to_lowercase() == "max" { + return true; } false } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 78da4dc9c53f..24387c5f15ee 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -55,7 +55,6 @@ datafusion-functions-aggregate = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } -datafusion-physical-expr-functions-aggregate = { workspace = true } futures = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5aa255e7c341..0aeaa15b5f1d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -42,10 +42,11 @@ use datafusion_expr::Accumulator; use datafusion_physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, UnKnownColumn}, - physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, - LexRequirement, PhysicalExpr, PhysicalSortRequirement, + physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, + PhysicalExpr, PhysicalSortRequirement, }; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use itertools::Itertools; pub mod group_values; @@ -253,7 +254,7 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Vec>, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause @@ -280,7 +281,10 @@ impl AggregateExec { /// Function used in `ConvertFirstLast` optimizer rule, /// where we need parts of the new value, others cloned from the old one /// Rewrites aggregate exec with new aggregate expressions. - pub fn with_new_aggr_exprs(&self, aggr_expr: Vec>) -> Self { + pub fn with_new_aggr_exprs( + &self, + aggr_expr: Vec>, + ) -> Self { Self { aggr_expr, // clone the rest of the fields @@ -306,7 +310,7 @@ impl AggregateExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -343,7 +347,7 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec>, + mut aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -451,7 +455,7 @@ impl AggregateExec { } /// Aggregate expressions - pub fn aggr_expr(&self) -> &[Arc] { + pub fn aggr_expr(&self) -> &[Arc] { &self.aggr_expr } @@ -788,7 +792,7 @@ impl ExecutionPlan for AggregateExec { fn create_schema( input_schema: &Schema, group_expr: &[(Arc, String)], - aggr_expr: &[Arc], + aggr_expr: &[Arc], contains_null_expr: bool, mode: AggregateMode, ) -> Result { @@ -834,7 +838,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// /// # Parameters /// -/// - `aggr_expr`: A reference to an `Arc` representing the +/// - `aggr_expr`: A reference to an `Arc` representing the /// aggregate expression. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -846,7 +850,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// A `LexOrdering` instance indicating the lexical ordering requirement for /// the aggregate expression. fn get_aggregate_expr_req( - aggr_expr: &Arc, + aggr_expr: &Arc, group_by: &PhysicalGroupBy, agg_mode: &AggregateMode, ) -> LexOrdering { @@ -894,7 +898,7 @@ fn get_aggregate_expr_req( /// the aggregator requirement is incompatible. fn finer_ordering( existing_req: &LexOrdering, - aggr_expr: &Arc, + aggr_expr: &Arc, group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -912,7 +916,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// # Parameters /// -/// - `aggr_exprs`: A slice of `Arc` containing all the +/// - `aggr_exprs`: A slice of `Arc` containing all the /// aggregate expressions. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -926,7 +930,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. pub fn get_finer_aggregate_exprs_requirement( - aggr_exprs: &mut [Arc], + aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -996,10 +1000,10 @@ pub fn get_finer_aggregate_exprs_requirement( /// returns physical expressions for arguments to evaluate against a batch /// The expressions are different depending on `mode`: -/// * Partial: AggregateExpr::expressions -/// * Final: columns of `AggregateExpr::state_fields()` +/// * Partial: AggregateFunctionExpr::expressions +/// * Final: columns of `AggregateFunctionExpr::state_fields()` pub fn aggregate_expressions( - aggr_expr: &[Arc], + aggr_expr: &[Arc], mode: &AggregateMode, col_idx_base: usize, ) -> Result>>> { @@ -1035,12 +1039,12 @@ pub fn aggregate_expressions( } /// uses `state_fields` to build a vec of physical column expressions required to merge the -/// AggregateExpr' accumulator's state. +/// AggregateFunctionExpr' accumulator's state. /// /// `index_base` is the starting physical column index for the next expanded state field. fn merge_expressions( index_base: usize, - expr: &Arc, + expr: &Arc, ) -> Result>> { expr.state_fields().map(|fields| { fields @@ -1054,7 +1058,7 @@ fn merge_expressions( pub type AccumulatorItem = Box; pub fn create_accumulators( - aggr_expr: &[Arc], + aggr_expr: &[Arc], ) -> Result> { aggr_expr .iter() @@ -1218,8 +1222,8 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use crate::common::collect; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::Literal; - use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use futures::{FutureExt, Stream}; // Generate a schema which consists of 5 columns (a, b, c, d, e) @@ -1496,13 +1500,12 @@ mod tests { groups: vec![vec![false]], }; - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; let task_ctx = if spill { // set to an appropriate value to trigger spill @@ -1793,7 +1796,7 @@ mod tests { } // Median(a) - fn test_median_agg_expr(schema: SchemaRef) -> Result> { + fn test_median_agg_expr(schema: SchemaRef) -> Result> { AggregateExprBuilder::new(median_udaf(), vec![col("a", &schema)?]) .schema(schema) .alias("MEDIAN(a)") @@ -1819,17 +1822,16 @@ mod tests { }; // something that allocates within the aggregator - let aggregates_v0: Vec> = + let aggregates_v0: Vec> = vec![test_median_agg_expr(Arc::clone(&input_schema))?]; // use fast-path in `row_hash.rs`. - let aggregates_v2: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates_v2: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), @@ -1883,13 +1885,12 @@ mod tests { let groups = PhysicalGroupBy::default(); - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(a)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(a)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1923,13 +1924,12 @@ mod tests { let groups = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(b)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1974,7 +1974,7 @@ mod tests { fn test_first_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -1992,7 +1992,7 @@ mod tests { fn test_last_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -2047,7 +2047,7 @@ mod tests { descending: false, nulls_first: false, }; - let aggregates: Vec> = if is_first_acc { + let aggregates: Vec> = if is_first_acc { vec![test_first_value_agg_expr(&schema, sort_options)?] } else { vec![test_last_value_agg_expr(&schema, sort_options)?] @@ -2212,7 +2212,7 @@ mod tests { }; let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]); - let aggregates: Vec> = vec![ + let aggregates: Vec> = vec![ test_first_value_agg_expr(&schema, option_desc)?, test_last_value_agg_expr(&schema, option_desc)?, ]; @@ -2270,7 +2270,7 @@ mod tests { ], ); - let aggregates: Vec> = + let aggregates: Vec> = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)]) .schema(Arc::clone(&schema)) .alias("1") diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 05f4ec621813..d022bb007d9b 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -47,10 +47,9 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{ - AggregateExpr, GroupsAccumulatorAdapter, PhysicalSortExpr, -}; +use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use futures::ready; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -396,7 +395,7 @@ pub(crate) struct GroupedHashAggregateStream { /// processed. Reused across batches here to avoid reallocations current_group_indices: Vec, - /// Accumulators, one for each `AggregateExpr` in the query + /// Accumulators, one for each `AggregateFunctionExpr` in the query /// /// For example, if the query has aggregates, `SUM(x)`, /// `COUNT(y)`, there will be two accumulators, each one @@ -579,7 +578,7 @@ impl GroupedHashAggregateStream { /// that is supported by the aggregate, or a /// [`GroupsAccumulatorAdapter`] if not. pub(crate) fn create_group_accumulator( - agg_expr: &Arc, + agg_expr: &Arc, ) -> Result> { if agg_expr.groups_accumulator_supported() { agg_expr.create_groups_accumulator() diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index e1182719293d..c1c66f6d3923 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -34,7 +34,7 @@ pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, + expressions, functions, udf, Distribution, Partitioning, PhysicalExpr, }; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexRequirement; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index fb86a008e2cd..026798c5798b 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -31,7 +31,7 @@ pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::PhysicalSortExpr; pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, + expressions, functions, udf, Distribution, Partitioning, PhysicalExpr, }; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; @@ -82,7 +82,7 @@ pub mod windows; pub mod work_table; pub mod udaf { - pub use datafusion_physical_expr_functions_aggregate::aggregate::AggregateFunctionExpr; + pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } pub mod coalesce; diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index d607bb79b44e..56823e6dec2d 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -37,14 +37,13 @@ use datafusion_expr::{ BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition, WindowUDF, }; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr}, - AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering, - PhysicalSortRequirement, + ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use itertools::Itertools; mod bounded_window_agg_exec; @@ -142,7 +141,7 @@ fn window_expr_from_aggregate_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - aggregate: Arc, + aggregate: Arc, ) -> Arc { // Is there a potentially unlimited sized window frame? let unbounded_window = window_frame.start_bound.is_unbounded(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 96fb45eafe62..78f370c714cc 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::sync::Arc; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use prost::bytes::BufMut; use prost::Message; @@ -34,6 +34,7 @@ use datafusion::datasource::physical_plan::ParquetExec; use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; +use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use datafusion::physical_plan::aggregates::AggregateMode; use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; @@ -59,7 +60,7 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ - AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, + ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, }; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF}; @@ -467,7 +468,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let physical_aggr_expr: Vec> = hash_agg + let physical_aggr_expr: Vec> = hash_agg .aggr_expr .iter() .zip(hash_agg.aggr_expr_name.iter()) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7949a457f40f..555ad22a9bc1 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -29,7 +29,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; -use datafusion::physical_plan::{AggregateExpr, Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; use datafusion::{ datasource::{ file_format::{csv::CsvSink, json::JsonSink}, @@ -49,58 +49,50 @@ use crate::protobuf::{ use super::PhysicalExtensionCodec; pub fn serialize_physical_aggr_expr( - aggr_expr: Arc, + aggr_expr: Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expressions = serialize_physical_exprs(aggr_expr.expressions(), codec)?; let ordering_req = aggr_expr.order_bys().unwrap_or(&[]).to_vec(); let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; - if let Some(a) = aggr_expr.as_any().downcast_ref::() { - let name = a.fun().name().to_string(); - let mut buf = Vec::new(); - codec.try_encode_udaf(a.fun(), &mut buf)?; - Ok(protobuf::PhysicalExprNode { - expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( - protobuf::PhysicalAggregateExprNode { - aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), - expr: expressions, - ordering_req, - distinct: a.is_distinct(), - ignore_nulls: a.ignore_nulls(), - fun_definition: (!buf.is_empty()).then_some(buf) - }, - )), - }) - } else { - unreachable!("No other types exists besides AggergationFunctionExpr"); - } + let name = aggr_expr.fun().name().to_string(); + let mut buf = Vec::new(); + codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( + protobuf::PhysicalAggregateExprNode { + aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), + expr: expressions, + ordering_req, + distinct: aggr_expr.is_distinct(), + ignore_nulls: aggr_expr.ignore_nulls(), + fun_definition: (!buf.is_empty()).then_some(buf) + }, + )), + }) } fn serialize_physical_window_aggr_expr( - aggr_expr: &dyn AggregateExpr, + aggr_expr: &AggregateFunctionExpr, _window_frame: &WindowFrame, codec: &dyn PhysicalExtensionCodec, ) -> Result<(physical_window_expr_node::WindowFunction, Option>)> { - if let Some(a) = aggr_expr.as_any().downcast_ref::() { - if a.is_distinct() || a.ignore_nulls() { - // TODO - return not_impl_err!( - "Distinct aggregate functions not supported in window expressions" - ); - } - - let mut buf = Vec::new(); - codec.try_encode_udaf(a.fun(), &mut buf)?; - Ok(( - physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( - a.fun().name().to_string(), - ), - (!buf.is_empty()).then_some(buf), - )) - } else { - unreachable!("No other types exists besides AggergationFunctionExpr"); + if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() { + // TODO + return not_impl_err!( + "Distinct aggregate functions not supported in window expressions" + ); } + + let mut buf = Vec::new(); + codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; + Ok(( + physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( + aggr_expr.fun().name().to_string(), + ), + (!buf.is_empty()).then_some(buf), + )) } pub fn serialize_physical_window_expr( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0ffc494321fb..60f5565bdeee 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -24,7 +24,7 @@ use std::vec; use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; @@ -47,7 +47,6 @@ use datafusion::datasource::physical_plan::{ use datafusion::execution::FunctionRegistry; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; -use datafusion::physical_expr::aggregate::utils::down_cast_any_ref; use datafusion::physical_expr::expressions::Literal; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr}; @@ -70,13 +69,12 @@ use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::windows::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec, }; -use datafusion::physical_plan::{ - AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics, -}; +use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr, Statistics}; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; use datafusion_common::config::TableParquetOptions; @@ -362,7 +360,7 @@ fn rountrip_aggregate() -> Result<()> { .alias("NTH_VALUE(b, 1)") .build()?; - let test_cases: Vec>> = vec![ + let test_cases: Vec>> = vec![ // AVG vec![avg_expr], // NTH_VALUE @@ -395,7 +393,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -424,7 +422,7 @@ fn rountrip_aggregate_with_approx_pencentile_cont() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = vec![AggregateExprBuilder::new( + let aggregates: Vec> = vec![AggregateExprBuilder::new( approx_percentile_cont_udaf(), vec![col("b", &schema)?, lit(0.5)], ) @@ -459,7 +457,7 @@ fn rountrip_aggregate_with_sort() -> Result<()> { }, }]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -526,7 +524,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -748,7 +746,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { } impl PartialEq for CustomPredicateExpr { fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) + other .downcast_ref::() .map(|x| self.inner.eq(&x.inner)) .unwrap_or(false)