Skip to content

docs: improve the documentation for Aggregate code #12617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use datafusion_physical_expr::binary_map::OutputType;

use hashbrown::raw::RawTable;

/// Compare GroupValue Rows column by column
/// A [`GroupValues`] that stores multiple columns of group values.
///
///
pub struct GroupValuesColumn {
/// The output schema
schema: SchemaRef,
Expand All @@ -55,8 +57,13 @@ pub struct GroupValuesColumn {
map_size: usize,

/// The actual group by values, stored column-wise. Compare from
/// the left to right, each column is stored as `ArrayRowEq`.
/// This is shown faster than the row format
/// the left to right, each column is stored as [`GroupColumn`].
///
/// Performance tests showed that this design is faster than using the
/// more general purpose [`GroupValuesRows`]. See the ticket for details:
/// <https://github.com/apache/datafusion/pull/12269>
///
/// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
group_values: Vec<Box<dyn GroupColumn>>,

/// reused buffer to store hashes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ use std::vec;

use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};

/// Trait for group values column-wise row comparison
/// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store a in-progress collection of group values
/// Implementations of this trait store an in-progress collection of group values
/// (similar to various builders in Arrow-rs) that allow for quick comparison to
/// incoming rows.
///
/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn
pub trait GroupColumn: Send + Sync {
/// Returns equal if the row stored in this builder at `lhs_row` is equal to
/// the row in `array` at `rhs_row`
Expand All @@ -60,11 +61,13 @@ pub trait GroupColumn: Send + Sync {
fn take_n(&mut self, n: usize) -> ArrayRef;
}

/// An implementation of [`GroupColumn`] for primitive types.
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
group_values: Vec<T::Native>,
nulls: Vec<bool>,
// whether the array contains at least one null, for fast non-null path
/// whether the array contains at least one null, for fast non-null path
has_null: bool,
/// Can the input array contain nulls?
nullable: bool,
}

Expand Down Expand Up @@ -154,13 +157,14 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
}
}

/// An implementation of [`GroupColumn`] for binary and utf8 types.
pub struct ByteGroupValueBuilder<O>
where
O: OffsetSizeTrait,
{
output_type: OutputType,
buffer: BufferBuilder<u8>,
/// Offsets into `buffer` for each distinct value. These offsets as used
/// Offsets into `buffer` for each distinct value. These offsets as used
/// directly to create the final `GenericBinaryArray`. The `i`th string is
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
/// are stored as a zero length string.
Expand Down
54 changes: 50 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`GroupValues`] trait for storing and interning group keys

use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
Expand All @@ -37,18 +39,61 @@ use datafusion_physical_expr::binary_map::OutputType;

mod group_column;

/// An interning store for group keys
/// Stores the group values during hash aggregation.
///
/// # Background
///
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
/// identify each group, and correspond to all the distinct values of `(a,b)`.
///
/// ```sql
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
/// create table t(a int, b varchar)
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
///
/// select a, b, count(*) from t group by a, b;
/// ----
/// 1 a 2
/// 2 b 1
/// 3 c 1
/// ```
///
/// # Design
///
/// Managing group values is a performance critical operation in hash
/// aggregation. The major operations are:
///
/// 1. Intern: Quickly finding existing and adding new group values
/// 2. Emit: Returning the group values as an array
///
/// There are multiple specialized implementations of this trait optimized for
/// different data types and number of columns, optimized for these operations.
/// See [`new_group_values`] for details.
///
/// # Group Ids
///
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// continuous without gaps, starting from 0.
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
/// Calculates the group id for each input row of `cols`, assigning new
/// group ids as necessary.
///
/// When the function returns, `groups` must contain the group id for each
/// row in `cols`.
///
/// If a row has the same value as a previous row, the same group id is
/// assigned. If a row has a new value, the next available group id is
/// assigned.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

/// Returns the number of bytes used by this [`GroupValues`]
/// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;

/// Returns true if this [`GroupValues`] is empty
fn is_empty(&self) -> bool;

/// The number of values stored in this [`GroupValues`]
/// The number of values (distinct group values) stored in this [`GroupValues`]
fn len(&self) -> usize;

/// Emits the group values
Expand All @@ -58,6 +103,7 @@ pub trait GroupValues: Send {
fn clear_shrink(&mut self, batch: &RecordBatch);
}

/// Return a specialized implementation of [`GroupValues`] for the given schema.
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();
Expand Down
10 changes: 9 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ use hashbrown::raw::RawTable;
use std::sync::Arc;

/// A [`GroupValues`] making use of [`Rows`]
///
/// This is a general implementation of [`GroupValues`] that works for any
/// combination of data types and number of columns, including nested types such as
/// structs and lists.
///
/// It uses the arrow-rs [`Rows`] to store the group values, which is a row-wise
/// representation.
pub struct GroupValuesRows {
/// The output schema
schema: SchemaRef,
Expand Down Expand Up @@ -220,7 +227,8 @@ impl GroupValues for GroupValuesRows {
}
};

// TODO: Materialize dictionaries in group keys (#7647)
// TODO: Materialize dictionaries in group keys
// https://github.com/apache/datafusion/issues/7647
for (field, array) in self.schema.fields.iter().zip(&mut output) {
let expected = field.data_type();
*array = dictionary_encode_if_necessary(
Expand Down