diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 977b40922f7c..907e453cf606 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -36,7 +36,9 @@ use datafusion_physical_expr::binary_map::OutputType; use hashbrown::raw::RawTable; -/// Compare GroupValue Rows column by column +/// A [`GroupValues`] that stores multiple columns of group values. +/// +/// pub struct GroupValuesColumn { /// The output schema schema: SchemaRef, @@ -55,8 +57,13 @@ pub struct GroupValuesColumn { map_size: usize, /// The actual group by values, stored column-wise. Compare from - /// the left to right, each column is stored as `ArrayRowEq`. - /// This is shown faster than the row format + /// the left to right, each column is stored as [`GroupColumn`]. + /// + /// Performance tests showed that this design is faster than using the + /// more general purpose [`GroupValuesRows`]. See the ticket for details: + /// + /// + /// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows group_values: Vec>, /// reused buffer to store hashes diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index a82e6d856c70..53b13243b755 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -37,12 +37,13 @@ use std::vec; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; -/// Trait for group values column-wise row comparison +/// Trait for storing a single column of group values in [`GroupValuesColumn`] /// -/// Implementations of this trait store a in-progress collection of group values +/// Implementations of this trait store an in-progress collection of group values /// (similar to various builders in Arrow-rs) that allow for quick comparison to /// incoming rows. /// +/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn pub trait GroupColumn: Send + Sync { /// Returns equal if the row stored in this builder at `lhs_row` is equal to /// the row in `array` at `rhs_row` @@ -60,11 +61,13 @@ pub trait GroupColumn: Send + Sync { fn take_n(&mut self, n: usize) -> ArrayRef; } +/// An implementation of [`GroupColumn`] for primitive types. pub struct PrimitiveGroupValueBuilder { group_values: Vec, nulls: Vec, - // whether the array contains at least one null, for fast non-null path + /// whether the array contains at least one null, for fast non-null path has_null: bool, + /// Can the input array contain nulls? nullable: bool, } @@ -154,13 +157,14 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } } +/// An implementation of [`GroupColumn`] for binary and utf8 types. pub struct ByteGroupValueBuilder where O: OffsetSizeTrait, { output_type: OutputType, buffer: BufferBuilder, - /// Offsets into `buffer` for each distinct value. These offsets as used + /// Offsets into `buffer` for each distinct value. These offsets as used /// directly to create the final `GenericBinaryArray`. The `i`th string is /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values /// are stored as a zero length string. diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 9256631fa578..5f13f5ca3259 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`GroupValues`] trait for storing and interning group keys + use arrow::record_batch::RecordBatch; use arrow_array::{downcast_primitive, ArrayRef}; use arrow_schema::{DataType, SchemaRef}; @@ -37,18 +39,61 @@ use datafusion_physical_expr::binary_map::OutputType; mod group_column; -/// An interning store for group keys +/// Stores the group values during hash aggregation. +/// +/// # Background +/// +/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values +/// identify each group, and correspond to all the distinct values of `(a,b)`. +/// +/// ```sql +/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups") +/// create table t(a int, b varchar) +/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c'); +/// +/// select a, b, count(*) from t group by a, b; +/// ---- +/// 1 a 2 +/// 2 b 1 +/// 3 c 1 +/// ``` +/// +/// # Design +/// +/// Managing group values is a performance critical operation in hash +/// aggregation. The major operations are: +/// +/// 1. Intern: Quickly finding existing and adding new group values +/// 2. Emit: Returning the group values as an array +/// +/// There are multiple specialized implementations of this trait optimized for +/// different data types and number of columns, optimized for these operations. +/// See [`new_group_values`] for details. +/// +/// # Group Ids +/// +/// Each distinct group in a hash aggregation is identified by a unique group id +/// (usize) which is assigned by instances of this trait. Group ids are +/// continuous without gaps, starting from 0. pub trait GroupValues: Send { - /// Calculates the `groups` for each input row of `cols` + /// Calculates the group id for each input row of `cols`, assigning new + /// group ids as necessary. + /// + /// When the function returns, `groups` must contain the group id for each + /// row in `cols`. + /// + /// If a row has the same value as a previous row, the same group id is + /// assigned. If a row has a new value, the next available group id is + /// assigned. fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()>; - /// Returns the number of bytes used by this [`GroupValues`] + /// Returns the number of bytes of memory used by this [`GroupValues`] fn size(&self) -> usize; /// Returns true if this [`GroupValues`] is empty fn is_empty(&self) -> bool; - /// The number of values stored in this [`GroupValues`] + /// The number of values (distinct group values) stored in this [`GroupValues`] fn len(&self) -> usize; /// Emits the group values @@ -58,6 +103,7 @@ pub trait GroupValues: Send { fn clear_shrink(&mut self, batch: &RecordBatch); } +/// Return a specialized implementation of [`GroupValues`] for the given schema. pub fn new_group_values(schema: SchemaRef) -> Result> { if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index b252d0008784..8ca88257bf1a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -30,6 +30,13 @@ use hashbrown::raw::RawTable; use std::sync::Arc; /// A [`GroupValues`] making use of [`Rows`] +/// +/// This is a general implementation of [`GroupValues`] that works for any +/// combination of data types and number of columns, including nested types such as +/// structs and lists. +/// +/// It uses the arrow-rs [`Rows`] to store the group values, which is a row-wise +/// representation. pub struct GroupValuesRows { /// The output schema schema: SchemaRef, @@ -220,7 +227,8 @@ impl GroupValues for GroupValuesRows { } }; - // TODO: Materialize dictionaries in group keys (#7647) + // TODO: Materialize dictionaries in group keys + // https://github.com/apache/datafusion/issues/7647 for (field, array) in self.schema.fields.iter().zip(&mut output) { let expected = field.data_type(); *array = dictionary_encode_if_necessary(