From 695d29c503c536f3035260e588d3d838803dd7af Mon Sep 17 00:00:00 2001
From: kamille
Date: Mon, 4 Nov 2024 23:10:11 +0800
Subject: [PATCH] unify `VectorizedGroupValuesColumn` and `GroupValuesColumn`.

---
 .../src/aggregates/group_values/column.rs | 678 ++++++++----------
 .../src/aggregates/group_values/mod.rs    |  10 +-
 2 files changed, 288 insertions(+), 400 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs
index fe37329ddf9a..0b3304648a26 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -88,7 +88,7 @@ impl GroupIndexView {
 /// A [`GroupValues`] that stores multiple columns of group values,
 /// and supports vectorized operators for them
 ///
-pub struct VectorizedGroupValuesColumn {
+pub struct VectorizedGroupValuesColumn<const STREAMING: bool> {
     /// The output schema
     schema: SchemaRef,
 
@@ -180,7 +180,11 @@ impl VectorizedOperationBuffers {
     }
 }
 
-impl VectorizedGroupValuesColumn {
+impl<const STREAMING: bool> VectorizedGroupValuesColumn<STREAMING> {
+    // ========================================================================
+    // Initialization functions
+    // ========================================================================
+
     /// Create a new instance of GroupValuesColumn if supported for the specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
         let map = RawTable::with_capacity(0);
@@ -197,6 +201,207 @@ impl VectorizedGroupValuesColumn {
         })
     }
 
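The `const STREAMING: bool` parameter introduced above is what lets a single struct replace the former two. For illustration only, a minimal self-contained sketch of the pattern with a hypothetical `Table` type (not the DataFusion implementation): each instantiation is monomorphized separately, so a branch on `STREAMING` inside a method is resolved at compile time rather than at runtime.

```rust
// Hypothetical `Table`, standing in for `VectorizedGroupValuesColumn`.
struct Table<const STREAMING: bool> {
    groups: Vec<usize>,
}

impl<const STREAMING: bool> Table<STREAMING> {
    fn intern(&mut self, values: &[usize]) {
        // Resolved at compile time: `Table<false>` keeps only the vectorized
        // path, `Table<true>` keeps only the scalarized one.
        if !STREAMING {
            self.vectorized_intern(values);
        } else {
            self.scalarized_intern(values);
        }
    }

    fn vectorized_intern(&mut self, values: &[usize]) {
        // Batch-oriented path; assigned group indices need not follow
        // the input row order.
        self.groups.extend_from_slice(values);
    }

    fn scalarized_intern(&mut self, values: &[usize]) {
        // Row-at-a-time path; group indices follow the input row order.
        for v in values {
            self.groups.push(*v);
        }
    }
}

fn main() {
    let mut vectorized = Table::<false> { groups: Vec::new() };
    let mut streaming = Table::<true> { groups: Vec::new() };
    vectorized.intern(&[1, 2, 3]);
    streaming.intern(&[1, 2, 3]);
    assert_eq!(vectorized.groups, streaming.groups);
}
```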
+    // ========================================================================
+    // Scalarized intern
+    // ========================================================================
+
+    /// Scalarized intern
+    ///
+    /// This is used only for `streaming aggregation`, because it depends on the
+    /// order between `input rows` and their corresponding `group indices`.
+    ///
+    /// For example, assume the input `cols` contain 4 new rows (none equal to the
+    /// existing rows in `group_values`, so new groups must be created for them):
+    ///
+    /// ```text
+    ///   row1 (hash collision with the exist rows)
+    ///   row2
+    ///   row3 (hash collision with the exist rows)
+    ///   row4
+    /// ```
+    ///
+    /// # In the scalarized (`STREAMING = true`) case, their `group indices` will be
+    ///
+    /// ```text
+    ///   row1 --> 0
+    ///   row2 --> 1
+    ///   row3 --> 2
+    ///   row4 --> 3
+    /// ```
+    ///
+    /// The `group indices` order agrees with the input order, and `streaming
+    /// aggregation` depends on this.
+    ///
+    /// # In the vectorized (`STREAMING = false`) case, however, they may be
+    ///
+    /// ```text
+    ///   row1 --> 2
+    ///   row2 --> 0
+    ///   row3 --> 3
+    ///   row4 --> 1
+    /// ```
+    ///
+    /// The `group indices` no longer follow the input order, which would lead to
+    /// errors in `streaming aggregation`.
+    ///
+    fn scalarized_intern(
+        &mut self,
+        cols: &[ArrayRef],
+        groups: &mut Vec<usize>,
+    ) -> Result<()> {
+        let n_rows = cols[0].len();
+
+        // tracks to which group each of the input rows belongs
+        groups.clear();
+
+        // 1.1 Calculate the group keys for the group values
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(cols, &self.random_state, batch_hashes)?;
+
+        for (row, &target_hash) in batch_hashes.iter().enumerate() {
+            let entry = self
+                .map
+                .get_mut(target_hash, |(exist_hash, group_idx_view)| {
+                    // It is ensured to be inlined in `scalarized_intern`
+                    debug_assert!(!group_idx_view.is_non_inlined());
+
+                    // Somewhat surprisingly, this closure can be called even if the
+                    // hash doesn't match, so check the hash first with an integer
+                    // comparison to avoid the more expensive comparison with the
+                    // group value. https://github.com/apache/datafusion/pull/11718
+                    if target_hash != *exist_hash {
+                        return false;
+                    }
+
+                    fn check_row_equal(
+                        array_row: &dyn GroupColumn,
+                        lhs_row: usize,
+                        array: &ArrayRef,
+                        rhs_row: usize,
+                    ) -> bool {
+                        array_row.equal_to(lhs_row, array, rhs_row)
+                    }
+
+                    for (i, group_val) in self.group_values.iter().enumerate() {
+                        if !check_row_equal(
+                            group_val.as_ref(),
+                            group_idx_view.value() as usize,
+                            &cols[i],
+                            row,
+                        ) {
+                            return false;
+                        }
+                    }
+
+                    true
+                });
+
+            let group_idx = match entry {
+                // Existing group_index for this group value
+                Some((_hash, group_idx_view)) => group_idx_view.value() as usize,
+                // 1.2 Need to create new entry for the group
+                None => {
+                    // Add new entry to aggr_state and save newly created index
+                    // let group_idx = group_values.num_rows();
+                    // group_values.push(group_rows.row(row));
+
+                    let mut checklen = 0;
+                    let group_idx = self.group_values[0].len();
+                    for (i, group_value) in self.group_values.iter_mut().enumerate() {
+                        group_value.append_val(&cols[i], row);
+                        let len = group_value.len();
+                        if i == 0 {
+                            checklen = len;
+                        } else {
+                            debug_assert_eq!(checklen, len);
+                        }
+                    }
+
+                    // for hasher function, use precomputed hash value
+                    self.map.insert_accounted(
+                        (target_hash, GroupIndexView::new_inlined(group_idx as u64)),
+                        |(hash, _group_index)| *hash,
+                        &mut self.map_size,
+                    );
+                    group_idx
+                }
+            };
+            groups.push(group_idx);
+        }
+
+        Ok(())
+    }
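A self-contained sketch of the probe-then-insert pattern `scalarized_intern` uses, reduced to a single `u64` column. This assumes `hashbrown` with its `raw` feature (which DataFusion relies on here); `intern_one` is a hypothetical helper, and the real code stores a `GroupIndexView` rather than a bare index and tracks allocation size through `insert_accounted`.

```rust
use hashbrown::raw::RawTable;

// The table stores (hash, group_index). The equality closure can be called
// for buckets whose hash differs, so it compares the cheap u64 hash before
// the (potentially expensive) group-value comparison.
fn intern_one(
    map: &mut RawTable<(u64, usize)>,
    group_values: &mut Vec<u64>,
    hash: u64,
    value: u64,
) -> usize {
    if let Some(&(_, group_idx)) =
        map.get(hash, |&(h, idx)| h == hash && group_values[idx] == value)
    {
        return group_idx;
    }
    // Not found: create the group, then insert with the precomputed hash.
    let group_idx = group_values.len();
    group_values.push(value);
    map.insert(hash, (hash, group_idx), |&(h, _)| h);
    group_idx
}

fn main() {
    let mut map = RawTable::new();
    let mut group_values = Vec::new();
    assert_eq!(intern_one(&mut map, &mut group_values, 42, 7), 0);
    assert_eq!(intern_one(&mut map, &mut group_values, 42, 7), 0); // existing group
    assert_eq!(intern_one(&mut map, &mut group_values, 43, 8), 1); // new group
}
```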
+
+    // ========================================================================
+    // Vectorized intern
+    // ========================================================================
+
+    /// Vectorized intern
+    ///
+    /// This is used in `non-streaming aggregation`, which does not require any
+    /// order between rows in `cols` and their corresponding groups in `group_values`.
+    ///
+    /// The vectorized approach can offer higher performance by avoiding row-by-row
+    /// downcasts of `cols` and by enabling further optimizations (like SIMD).
+    ///
+    fn vectorized_intern(
+        &mut self,
+        cols: &[ArrayRef],
+        groups: &mut Vec<usize>,
+    ) -> Result<()> {
+        let n_rows = cols[0].len();
+
+        // tracks to which group each of the input rows belongs
+        groups.clear();
+        groups.resize(n_rows, usize::MAX);
+
+        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+
+        // General steps for one round of `vectorized equal_to & append`:
+        //   1. Collect the vectorized context by checking hash values of `cols` in `map`,
+        //      mainly filling `vectorized_append_row_indices`, `vectorized_equal_to_row_indices`
+        //      and `vectorized_equal_to_group_indices`
+        //
+        //   2. Perform `vectorized_append` for `vectorized_append_row_indices`.
+        //      `vectorized_append` must be performed before `vectorized_equal_to`,
+        //      because some `group indices` in `vectorized_equal_to_group_indices`
+        //      may still point to rows not yet appended to `group_values`.
+        //
+        //   3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices`
+        //      and `vectorized_equal_to_group_indices`. If some rows in the input `cols`
+        //      are found not equal to the existing rows in `group_values`, place them in
+        //      `scalarized_indices` and process them afterwards with
+        //      `scalarized_intern_remaining`, similarly to `scalarized_intern`.
+        //
+        //   4. Perform `scalarized_intern_remaining` for the rows mentioned above;
+        //      for why this step is needed, see the comments of `scalarized_intern_remaining`.
+
+        // 1. Collect the vectorized context by checking hash values of `cols` in `map`
+        self.collect_vectorized_process_context(&batch_hashes, groups);
+
+        // 2. Perform `vectorized_append`
+        self.vectorized_append(cols);
+
+        // 3. Perform `vectorized_equal_to`
+        self.vectorized_equal_to(cols, groups);
+
+        // 4. Perform `scalarized_intern_remaining` for the remaining rows
+        //    (for what "remaining rows" means, see the comments of `scalarized_intern_remaining`)
+        self.scalarized_intern_remaining(cols, &batch_hashes, groups);
+
+        self.hashes_buffer = batch_hashes;
+
+        Ok(())
+    }
+
     /// Collect vectorized context by checking hash values of `cols` in `map`
     ///
     /// 1. If bucket not found
@@ -437,7 +642,7 @@ impl VectorizedGroupValuesColumn {
     /// In most situations, `scalarized_indices` will be found to be empty after
     /// performing `vectorized_equal_to`.
     ///
-    fn scalarized_intern(
+    fn scalarized_intern_remaining(
         &mut self,
         cols: &[ArrayRef],
         batch_hashes: &[u64],
@@ -471,7 +676,7 @@ impl VectorizedGroupValuesColumn {
         };
 
         // Perform scalarized equal to
-        if self.scalarized_equal_to(group_index_view, cols, row, groups) {
+        if self.scalarized_equal_to_remaining(group_index_view, cols, row, groups) {
             // Found the row actually exists in group values,
             // don't need to create new group for it.
             continue;
@@ -520,7 +725,7 @@ impl VectorizedGroupValuesColumn {
         self.map = map;
     }
 
-    fn scalarized_equal_to(
+    fn scalarized_equal_to_remaining(
         &self,
         group_index_view: &GroupIndexView,
         cols: &[ArrayRef],
@@ -613,10 +818,8 @@ macro_rules! instantiate_primitive {
     };
 }
 
-impl GroupValues for VectorizedGroupValuesColumn {
+impl<const STREAMING: bool> GroupValues for VectorizedGroupValuesColumn<STREAMING> {
     fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
-        let n_rows = cols[0].len();
-
         if self.group_values.is_empty() {
             let mut v = Vec::with_capacity(cols.len());
 
@@ -673,50 +876,11 @@ impl GroupValues for VectorizedGroupValuesColumn {
             self.group_values = v;
         }
 
-        // tracks to which group each of the input rows belongs
-        groups.clear();
-        groups.resize(n_rows, usize::MAX);
-
-        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
-        batch_hashes.clear();
-        batch_hashes.resize(n_rows, 0);
-        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
-
-        // General steps for one round `vectorized equal_to & append`:
-        //   1. Collect vectorized context by checking hash values of `cols` in `map`,
-        //      mainly fill `vectorized_append_row_indices`, `vectorized_equal_to_row_indices`
-        //      and `vectorized_equal_to_group_indices`
-        //
-        //   2. Perform `vectorized_append` for `vectorized_append_row_indices`.
-        //      `vectorized_append` must be performed before `vectorized_equal_to`,
-        //      because some `group indices` in `vectorized_equal_to_group_indices`
-        //      maybe still point to no actual values in `group_values` before performing append.
-        //
-        //   3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices`
-        //      and `vectorized_equal_to_group_indices`. If found some rows in input `cols`
-        //      not equal to `exist rows` in `group_values`, place them in `scalarized_indices`
-        //      and perform `scalarized_intern` for them similar as what in [`GroupValuesColumn`]
-        //      after.
-        //
-        //   4. Perform `scalarized_intern` for rows mentioned above, when we process like this
-        //      can see the comments of `scalarized_intern`.
-        //
-
-        // 1. Collect vectorized context by checking hash values of `cols` in `map`
-        self.collect_vectorized_process_context(&batch_hashes, groups);
-
-        // 2. Perform `vectorized_append`
-        self.vectorized_append(cols);
-
-        // 3. Perform `vectorized_equal_to`
-        self.vectorized_equal_to(cols, groups);
-
-        // 4. Perform `scalarized_intern`
-        self.scalarized_intern(cols, &batch_hashes, groups);
-
-        self.hashes_buffer = batch_hashes;
-
-        Ok(())
+        if !STREAMING {
+            self.vectorized_intern(cols, groups)
+        } else {
+            self.scalarized_intern(cols, groups)
+        }
     }
 
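The four steps listed in `vectorized_intern` above must run in that order; in particular the append (step 2) must precede the equality checks (step 3). A minimal, self-contained sketch of why, using a single `u64` column and a linear scan in place of the real hash table (hash collisions and the remaining-rows pass are omitted):

```rust
fn intern_batch(batch: &[u64], group_values: &mut Vec<u64>) -> Vec<usize> {
    let mut groups = vec![usize::MAX; batch.len()];
    let mut equal_to: Vec<(usize, usize)> = Vec::new(); // (row, candidate group index)

    // Steps 1 + 2: the first occurrence of a value appends its group
    // immediately; later occurrences only record a candidate group index.
    for (row, v) in batch.iter().enumerate() {
        match group_values.iter().position(|g| *g == *v) {
            Some(idx) => equal_to.push((row, idx)),
            None => {
                groups[row] = group_values.len();
                group_values.push(*v);
            }
        }
    }

    // Step 3: verify candidates. This is sound only because the appends above
    // already ran: a candidate index may point at a group created by an
    // earlier row of this same batch.
    for (row, idx) in equal_to {
        debug_assert_eq!(group_values[idx], batch[row]);
        groups[row] = idx;
    }
    groups
}

fn main() {
    let mut group_values = Vec::new();
    // The value 2 repeats within the batch: its second occurrence is checked
    // against a group that was appended by the first occurrence.
    let groups = intern_batch(&[1, 2, 2, 3], &mut group_values);
    assert_eq!(groups, vec![0, 1, 1, 2]);
}
```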
     fn size(&self) -> usize {
@@ -758,358 +922,72 @@ impl GroupValues for VectorizedGroupValuesColumn {
                 // SAFETY: self.map outlives iterator and is not modified concurrently
                 unsafe {
                     for bucket in self.map.iter() {
-                        // Check if it is `inlined` or `non-inlined`
-                        if bucket.as_ref().1.is_non_inlined() {
-                            // Non-inlined case
-                            // We take `group_index_list` from `old_group_index_lists`
-
-                            // list_offset is incrementally
-                            self.emit_group_index_list_buffer.clear();
-                            let list_offset = bucket.as_ref().1.value() as usize;
-                            for group_index in self.group_index_lists[list_offset].iter()
-                            {
-                                if let Some(remaining) = group_index.checked_sub(n) {
-                                    self.emit_group_index_list_buffer.push(remaining);
+                        // In the non-streaming case, we need to check if the `group index view`
+                        // is `inlined` or `non-inlined`
+                        if !STREAMING {
+                            if bucket.as_ref().1.is_non_inlined() {
+                                // Non-inlined case
+                                // We take `group_index_list` from `old_group_index_lists`
+
+                                // `list_offset` increases monotonically
+                                self.emit_group_index_list_buffer.clear();
+                                let list_offset = bucket.as_ref().1.value() as usize;
+                                for group_index in
+                                    self.group_index_lists[list_offset].iter()
+                                {
+                                    if let Some(remaining) = group_index.checked_sub(n) {
+                                        self.emit_group_index_list_buffer.push(remaining);
+                                    }
                                 }
-                            }
 
-                            // The possible results:
-                            //   - `new_group_index_list` is empty, we should erase this bucket
-                            //   - only one value in `new_group_index_list`, switch the `view` to `inlined`
-                            //   - still multiple values in `new_group_index_list`, build and set the new `unlined view`
-                            if self.emit_group_index_list_buffer.is_empty() {
-                                self.map.erase(bucket);
-                            } else if self.emit_group_index_list_buffer.len() == 1 {
-                                let group_index =
-                                    self.emit_group_index_list_buffer.first().unwrap();
-                                bucket.as_mut().1 =
-                                    GroupIndexView::new_inlined(*group_index as u64);
-                            } else {
-                                let group_index_list =
-                                    &mut self.group_index_lists[next_new_list_offset];
-                                group_index_list.clear();
-                                group_index_list
-                                    .extend(self.emit_group_index_list_buffer.iter());
-                                bucket.as_mut().1 = GroupIndexView::new_non_inlined(
-                                    next_new_list_offset as u64,
-                                );
-                                next_new_list_offset += 1;
-                            }
-                        } else {
-                            // Inlined case, we just decrement group index by n
-                            let group_index = bucket.as_ref().1.value() as usize;
-                            match group_index.checked_sub(n) {
-                                // Group index was >= n, shift value down
-                                Some(sub) => {
-                                    bucket.as_mut().1 =
-                                        GroupIndexView::new_inlined(sub as u64)
+                                // The possible results:
+                                //   - `new_group_index_list` is empty, we should erase this bucket
+                                //   - only one value in `new_group_index_list`, switch the `view` to `inlined`
+                                //   - still multiple values in `new_group_index_list`, build and set the new `non-inlined view`
+                                if self.emit_group_index_list_buffer.is_empty() {
+                                    self.map.erase(bucket);
+                                } else if self.emit_group_index_list_buffer.len() == 1 {
+                                    let group_index = self
+                                        .emit_group_index_list_buffer
+                                        .first()
+                                        .unwrap();
+                                    bucket.as_mut().1 =
+                                        GroupIndexView::new_inlined(*group_index as u64);
+                                } else {
+                                    let group_index_list =
+                                        &mut self.group_index_lists[next_new_list_offset];
+                                    group_index_list.clear();
+                                    group_index_list
+                                        .extend(self.emit_group_index_list_buffer.iter());
+                                    bucket.as_mut().1 = GroupIndexView::new_non_inlined(
+                                        next_new_list_offset as u64,
+                                    );
+                                    next_new_list_offset += 1;
+                                }
+
+                                continue;
+                            }
+                        }
-                                // Group index was < n, so remove from table
-                                None => self.map.erase(bucket),
-                            }
-                        }
-                    }
-                }
-                self.group_index_lists.truncate(next_new_list_offset);
-
-                output
-            }
-        };
-
-        // TODO: Materialize dictionaries in group keys (#7647)
-        for (field, array) in self.schema.fields.iter().zip(&mut output) {
-            let expected = field.data_type();
-            if let DataType::Dictionary(_, v) = expected {
-                let actual = array.data_type();
-                if v.as_ref() != actual {
-                    return Err(DataFusionError::Internal(format!(
-                        "Converted group rows expected dictionary of {v} got {actual}"
-                    )));
-                }
-                *array = cast(array.as_ref(), expected)?;
-            }
-        }
-
-        Ok(output)
-    }
-
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        let count = batch.num_rows();
-        self.group_values.clear();
-        self.map.clear();
-        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
-        self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
-        self.hashes_buffer.clear();
-        self.hashes_buffer.shrink_to(count);
-        self.group_index_lists.clear();
-        self.emit_group_index_list_buffer.clear();
-        self.vectorized_operation_buffers.clear();
-    }
-}
-
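Both `EmitTo::First(n)` branches above apply the same adjustment to the group indices that survive an emit. A self-contained sketch of that rule on a plain `Vec<usize>` (the real code applies it to map buckets and, in the non-streaming case, to group-index lists):

```rust
// After emitting the first `n` groups, surviving group indices shift down
// by `n`; indices below `n` belonged to just-emitted groups and are dropped.
fn shift_indices_after_emit(indices: &mut Vec<usize>, n: usize) {
    indices.retain_mut(|idx| match idx.checked_sub(n) {
        Some(shifted) => {
            *idx = shifted; // index was >= n: keep, shifted down
            true
        }
        None => false, // index was < n: remove
    });
}

fn main() {
    let mut indices = vec![0, 2, 5, 7];
    shift_indices_after_emit(&mut indices, 3);
    assert_eq!(indices, vec![2, 4]); // 5 - 3 and 7 - 3; 0 and 2 removed
}
```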
-/// -pub struct GroupValuesColumn { - /// The output schema - schema: SchemaRef, - - /// Logically maps group values to a group_index in - /// [`Self::group_values`] and in each accumulator - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys (group values) in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, group_index) - map: RawTable<(u64, usize)>, - - /// The size of `map` in bytes - map_size: usize, - - /// The actual group by values, stored column-wise. Compare from - /// the left to right, each column is stored as [`GroupColumn`]. - /// - /// Performance tests showed that this design is faster than using the - /// more general purpose [`GroupValuesRows`]. See the ticket for details: - /// - /// - /// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows - group_values: Vec>, - - /// reused buffer to store hashes - hashes_buffer: Vec, - - /// Random state for creating hashes - random_state: RandomState, -} - -impl GroupValuesColumn { - /// Create a new instance of GroupValuesColumn if supported for the specified schema - pub fn try_new(schema: SchemaRef) -> Result { - let map = RawTable::with_capacity(0); - Ok(Self { - schema, - map, - map_size: 0, - group_values: vec![], - hashes_buffer: Default::default(), - random_state: Default::default(), - }) - } -} - -impl GroupValues for GroupValuesColumn { - fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { - let n_rows = cols[0].len(); - - if self.group_values.is_empty() { - let mut v = Vec::with_capacity(cols.len()); - - for f in self.schema.fields().iter() { - let nullable = f.is_nullable(); - match f.data_type() { - &DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type), - &DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type), - &DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type), - &DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type), - &DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type), - &DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type), - &DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type), - &DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type), - &DataType::Float32 => { - instantiate_primitive!(v, nullable, Float32Type) - } - &DataType::Float64 => { - instantiate_primitive!(v, nullable, Float64Type) - } - &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), - &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), - &DataType::Utf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); - v.push(Box::new(b) as _) - } - &DataType::LargeUtf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); - v.push(Box::new(b) as _) - } - &DataType::Binary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); - v.push(Box::new(b) as _) - } - &DataType::LargeBinary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); - v.push(Box::new(b) as _) - } - dt => { - return not_impl_err!("{dt} not supported in GroupValuesColumn") - } - } - } - self.group_values = v; - } - - // tracks to which group each of the input rows belongs - groups.clear(); - - // 1.1 Calculate the group keys for the group values - let batch_hashes = &mut self.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); - create_hashes(cols, &self.random_state, batch_hashes)?; - - for (row, &target_hash) in batch_hashes.iter().enumerate() { - let entry = 
-                // Somewhat surprisingly, this closure can be called even if the
-                // hash doesn't match, so check the hash first with an integer
-                // comparison first avoid the more expensive comparison with
-                // group value. https://github.com/apache/datafusion/pull/11718
-                if target_hash != *exist_hash {
-                    return false;
-                }
-
-                fn check_row_equal(
-                    array_row: &dyn GroupColumn,
-                    lhs_row: usize,
-                    array: &ArrayRef,
-                    rhs_row: usize,
-                ) -> bool {
-                    array_row.equal_to(lhs_row, array, rhs_row)
-                }
-
-                for (i, group_val) in self.group_values.iter().enumerate() {
-                    if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row) {
-                        return false;
-                    }
-                }
-
-                true
-            });
-
-            let group_idx = match entry {
-                // Existing group_index for this group value
-                Some((_hash, group_idx)) => *group_idx,
-                // 1.2 Need to create new entry for the group
-                None => {
-                    // Add new entry to aggr_state and save newly created index
-                    // let group_idx = group_values.num_rows();
-                    // group_values.push(group_rows.row(row));
-
-                    let mut checklen = 0;
-                    let group_idx = self.group_values[0].len();
-                    for (i, group_value) in self.group_values.iter_mut().enumerate() {
-                        group_value.append_val(&cols[i], row);
-                        let len = group_value.len();
-                        if i == 0 {
-                            checklen = len;
-                        } else {
-                            debug_assert_eq!(checklen, len);
-                        }
-                    }
-
-                    // for hasher function, use precomputed hash value
-                    self.map.insert_accounted(
-                        (target_hash, group_idx),
-                        |(hash, _group_index)| *hash,
-                        &mut self.map_size,
-                    );
-                    group_idx
-                }
-            };
-            groups.push(group_idx);
-        }
-
-        Ok(())
-    }
-
-    fn size(&self) -> usize {
-        let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
-        group_values_size + self.map_size + self.hashes_buffer.allocated_size()
-    }
-
-    fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    fn len(&self) -> usize {
-        if self.group_values.is_empty() {
-            return 0;
-        }
-
-        self.group_values[0].len()
-    }
-
-    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
-        let mut output = match emit_to {
-            EmitTo::All => {
-                let group_values = mem::take(&mut self.group_values);
-                debug_assert!(self.group_values.is_empty());
-
-                group_values
-                    .into_iter()
-                    .map(|v| v.build())
-                    .collect::<Vec<_>>()
-            }
-            EmitTo::First(n) => {
-                let output = self
-                    .group_values
-                    .iter_mut()
-                    .map(|v| v.take_n(n))
-                    .collect::<Vec<_>>();
 
                 // SAFETY: self.map outlives iterator and is not modified concurrently
                 unsafe {
                     for bucket in self.map.iter() {
-                        // Decrement group index by n
-                        match bucket.as_ref().1.checked_sub(n) {
+                        // In the `streaming` case, the `group index view` is ensured to be `inlined`
+                        debug_assert!(!bucket.as_ref().1.is_non_inlined());
+
+                        // Inlined case, we just decrement group index by n
+                        let group_index = bucket.as_ref().1.value() as usize;
+                        match group_index.checked_sub(n) {
                             // Group index was >= n, shift value down
-                            Some(sub) => bucket.as_mut().1 = sub,
+                            Some(sub) => {
+                                bucket.as_mut().1 =
+                                    GroupIndexView::new_inlined(sub as u64)
+                            }
                             // Group index was < n, so remove from table
                             None => self.map.erase(bucket),
                         }
                     }
                 }
+                self.group_index_lists.truncate(next_new_list_offset);
+
                 output
             }
         };
@@ -1139,6 +1017,13 @@ impl GroupValues for GroupValuesColumn {
         self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
         self.hashes_buffer.clear();
         self.hashes_buffer.shrink_to(count);
+
+        // These structures are only used in the `non-streaming` case
+        if !STREAMING {
+            self.group_index_lists.clear();
+            self.emit_group_index_list_buffer.clear();
+            self.vectorized_operation_buffers.clear();
+        }
     }
 }
 
@@ -1152,12 +1037,10 @@ pub fn supported_schema(schema: &Schema) -> bool {
         .all(supported_type)
 }
 
-/// Returns true if the specified data type is supported by
-/// [`GroupValuesColumn`] or [`VectorizedGroupValuesColumn`]
+/// Returns true if the specified data type is supported by [`VectorizedGroupValuesColumn`]
 ///
 /// In order to be supported, there must be a specialized implementation of
-/// [`GroupColumn`] for the data type, instantiated in [`GroupValuesColumn::intern`]
-/// or [`VectorizedGroupValuesColumn::intern`]
+/// [`GroupColumn`] for the data type, instantiated in
+/// [`VectorizedGroupValuesColumn::intern`]
 fn supported_type(data_type: &DataType) -> bool {
     matches!(
         *data_type,
@@ -1202,7 +1085,7 @@ mod tests {
     fn test_intern_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
         let mut group_values =
-            VectorizedGroupValuesColumn::try_new(data_set.schema()).unwrap();
+            VectorizedGroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
 
         data_set.load_to_group_values(&mut group_values);
         let actual_batch = group_values.emit(EmitTo::All).unwrap();
@@ -1215,7 +1098,7 @@ mod tests {
     fn test_emit_first_n_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
         let mut group_values =
-            VectorizedGroupValuesColumn::try_new(data_set.schema()).unwrap();
+            VectorizedGroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
 
         // 1~num_rows times to emit the groups
         let num_rows = data_set.expected_batch.num_rows();
@@ -1266,7 +1149,8 @@ mod tests {
         let field = Field::new("item", DataType::Int32, true);
         let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
-        let mut group_values = VectorizedGroupValuesColumn::try_new(schema).unwrap();
+        let mut group_values =
+            VectorizedGroupValuesColumn::<false>::try_new(schema).unwrap();
 
         // Insert group index views and check if success to insert
         insert_inline_group_index_view(&mut group_values, 0, 0);
@@ -1717,7 +1601,7 @@ mod tests {
     }
 
     fn insert_inline_group_index_view(
-        group_values: &mut VectorizedGroupValuesColumn,
+        group_values: &mut VectorizedGroupValuesColumn<false>,
         hash_key: u64,
         group_index: u64,
     ) {
@@ -1730,7 +1614,7 @@ mod tests {
     }
 
     fn insert_non_inline_group_index_view(
-        group_values: &mut VectorizedGroupValuesColumn,
+        group_values: &mut VectorizedGroupValuesColumn<false>,
         hash_key: u64,
         group_indices: Vec<usize>,
     ) {
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index af1b82de6227..a678fce11eba 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -37,7 +37,7 @@ mod bytes_view;
 use bytes::GroupValuesByes;
 use datafusion_physical_expr::binary_map::OutputType;
 
-use crate::aggregates::{group_values::column::GroupValuesColumn, order::GroupOrdering};
+use crate::aggregates::order::GroupOrdering;
 
 mod group_column;
 mod null_builder;
@@ -150,9 +150,13 @@ pub fn new_group_values(
 
     if column::supported_schema(schema.as_ref()) {
         if matches!(group_ordering, GroupOrdering::None) {
-            Ok(Box::new(VectorizedGroupValuesColumn::try_new(schema)?))
+            Ok(Box::new(VectorizedGroupValuesColumn::<false>::try_new(
+                schema,
+            )?))
         } else {
-            Ok(Box::new(GroupValuesColumn::try_new(schema)?))
+            Ok(Box::new(VectorizedGroupValuesColumn::<true>::try_new(
+                schema,
+            )?))
         }
     } else {
         Ok(Box::new(GroupValuesRows::try_new(schema)?))
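For reference, the mapping that `new_group_values` above implements, as a hedged, self-contained sketch (the real `GroupOrdering` variants carry payloads; they are simplified to unit variants here):

```rust
// Only `GroupOrdering::None` (no required output order) may take the
// vectorized, non-streaming variant; any ordering requirement falls back
// to the order-preserving streaming variant.
enum GroupOrdering {
    None,
    Partial,
    Full,
}

fn needs_streaming(ordering: &GroupOrdering) -> bool {
    !matches!(ordering, GroupOrdering::None)
}

fn main() {
    assert!(!needs_streaming(&GroupOrdering::None)); // STREAMING = false
    assert!(needs_streaming(&GroupOrdering::Partial)); // STREAMING = true
    assert!(needs_streaming(&GroupOrdering::Full)); // STREAMING = true
}
```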