Skip to content

Commit ba4488f

Browse files
authored
docs: improve the documentation for Aggregate code (#12617)
* docs: improve the documentation for Aggregate code * Add new example, fix referneces
1 parent f1aa27f commit ba4488f

File tree

4 files changed

+77
-12
lines changed

4 files changed

+77
-12
lines changed

datafusion/physical-plan/src/aggregates/group_values/column.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ use datafusion_physical_expr::binary_map::OutputType;
3636

3737
use hashbrown::raw::RawTable;
3838

39-
/// Compare GroupValue Rows column by column
39+
/// A [`GroupValues`] that stores multiple columns of group values.
40+
///
41+
///
4042
pub struct GroupValuesColumn {
4143
/// The output schema
4244
schema: SchemaRef,
@@ -55,8 +57,13 @@ pub struct GroupValuesColumn {
5557
map_size: usize,
5658

5759
/// The actual group by values, stored column-wise. Compare from
58-
/// the left to right, each column is stored as `ArrayRowEq`.
59-
/// This is shown faster than the row format
60+
/// the left to right, each column is stored as [`GroupColumn`].
61+
///
62+
/// Performance tests showed that this design is faster than using the
63+
/// more general purpose [`GroupValuesRows`]. See the ticket for details:
64+
/// <https://github.com/apache/datafusion/pull/12269>
65+
///
66+
/// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
6067
group_values: Vec<Box<dyn GroupColumn>>,
6168

6269
/// reused buffer to store hashes

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ use std::vec;
3737

3838
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
3939

40-
/// Trait for group values column-wise row comparison
40+
/// Trait for storing a single column of group values in [`GroupValuesColumn`]
4141
///
42-
/// Implementations of this trait store a in-progress collection of group values
42+
/// Implementations of this trait store an in-progress collection of group values
4343
/// (similar to various builders in Arrow-rs) that allow for quick comparison to
4444
/// incoming rows.
4545
///
46+
/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn
4647
pub trait GroupColumn: Send + Sync {
4748
/// Returns equal if the row stored in this builder at `lhs_row` is equal to
4849
/// the row in `array` at `rhs_row`
@@ -60,11 +61,13 @@ pub trait GroupColumn: Send + Sync {
6061
fn take_n(&mut self, n: usize) -> ArrayRef;
6162
}
6263

64+
/// An implementation of [`GroupColumn`] for primitive types.
6365
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
6466
group_values: Vec<T::Native>,
6567
nulls: Vec<bool>,
66-
// whether the array contains at least one null, for fast non-null path
68+
/// whether the array contains at least one null, for fast non-null path
6769
has_null: bool,
70+
/// Can the input array contain nulls?
6871
nullable: bool,
6972
}
7073

@@ -154,13 +157,14 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
154157
}
155158
}
156159

160+
/// An implementation of [`GroupColumn`] for binary and utf8 types.
157161
pub struct ByteGroupValueBuilder<O>
158162
where
159163
O: OffsetSizeTrait,
160164
{
161165
output_type: OutputType,
162166
buffer: BufferBuilder<u8>,
163-
/// Offsets into `buffer` for each distinct value. These offsets as used
167+
/// Offsets into `buffer` for each distinct value. These offsets as used
164168
/// directly to create the final `GenericBinaryArray`. The `i`th string is
165169
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
166170
/// are stored as a zero length string.

datafusion/physical-plan/src/aggregates/group_values/mod.rs

+50-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! [`GroupValues`] trait for storing and interning group keys
19+
1820
use arrow::record_batch::RecordBatch;
1921
use arrow_array::{downcast_primitive, ArrayRef};
2022
use arrow_schema::{DataType, SchemaRef};
@@ -37,18 +39,61 @@ use datafusion_physical_expr::binary_map::OutputType;
3739

3840
mod group_column;
3941

40-
/// An interning store for group keys
42+
/// Stores the group values during hash aggregation.
43+
///
44+
/// # Background
45+
///
46+
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
47+
/// identify each group, and correspond to all the distinct values of `(a,b)`.
48+
///
49+
/// ```sql
50+
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
51+
/// create table t(a int, b varchar)
52+
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
53+
///
54+
/// select a, b, count(*) from t group by a, b;
55+
/// ----
56+
/// 1 a 2
57+
/// 2 b 1
58+
/// 3 c 1
59+
/// ```
60+
///
61+
/// # Design
62+
///
63+
/// Managing group values is a performance critical operation in hash
64+
/// aggregation. The major operations are:
65+
///
66+
/// 1. Intern: Quickly finding existing and adding new group values
67+
/// 2. Emit: Returning the group values as an array
68+
///
69+
/// There are multiple specialized implementations of this trait optimized for
70+
/// different data types and number of columns, optimized for these operations.
71+
/// See [`new_group_values`] for details.
72+
///
73+
/// # Group Ids
74+
///
75+
/// Each distinct group in a hash aggregation is identified by a unique group id
76+
/// (usize) which is assigned by instances of this trait. Group ids are
77+
/// continuous without gaps, starting from 0.
4178
pub trait GroupValues: Send {
42-
/// Calculates the `groups` for each input row of `cols`
79+
/// Calculates the group id for each input row of `cols`, assigning new
80+
/// group ids as necessary.
81+
///
82+
/// When the function returns, `groups` must contain the group id for each
83+
/// row in `cols`.
84+
///
85+
/// If a row has the same value as a previous row, the same group id is
86+
/// assigned. If a row has a new value, the next available group id is
87+
/// assigned.
4388
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
4489

45-
/// Returns the number of bytes used by this [`GroupValues`]
90+
/// Returns the number of bytes of memory used by this [`GroupValues`]
4691
fn size(&self) -> usize;
4792

4893
/// Returns true if this [`GroupValues`] is empty
4994
fn is_empty(&self) -> bool;
5095

51-
/// The number of values stored in this [`GroupValues`]
96+
/// The number of values (distinct group values) stored in this [`GroupValues`]
5297
fn len(&self) -> usize;
5398

5499
/// Emits the group values
@@ -58,6 +103,7 @@ pub trait GroupValues: Send {
58103
fn clear_shrink(&mut self, batch: &RecordBatch);
59104
}
60105

106+
/// Return a specialized implementation of [`GroupValues`] for the given schema.
61107
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
62108
if schema.fields.len() == 1 {
63109
let d = schema.fields[0].data_type();

datafusion/physical-plan/src/aggregates/group_values/row.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ use hashbrown::raw::RawTable;
3030
use std::sync::Arc;
3131

3232
/// A [`GroupValues`] making use of [`Rows`]
33+
///
34+
/// This is a general implementation of [`GroupValues`] that works for any
35+
/// combination of data types and number of columns, including nested types such as
36+
/// structs and lists.
37+
///
38+
/// It uses the arrow-rs [`Rows`] to store the group values, which is a row-wise
39+
/// representation.
3340
pub struct GroupValuesRows {
3441
/// The output schema
3542
schema: SchemaRef,
@@ -220,7 +227,8 @@ impl GroupValues for GroupValuesRows {
220227
}
221228
};
222229

223-
// TODO: Materialize dictionaries in group keys (#7647)
230+
// TODO: Materialize dictionaries in group keys
231+
// https://github.com/apache/datafusion/issues/7647
224232
for (field, array) in self.schema.fields.iter().zip(&mut output) {
225233
let expected = field.data_type();
226234
*array = dictionary_encode_if_necessary(

0 commit comments

Comments
 (0)