From b451abd2758440ccc41b4af7f06d3bd44f262c07 Mon Sep 17 00:00:00 2001 From: dmitrybugakov Date: Thu, 3 Oct 2024 13:06:47 +0200 Subject: [PATCH] avoid code duplication --- .../physical-expr-common/src/binary_map.rs | 285 ++++-------------- .../src/binary_view_map.rs | 219 ++++---------- .../src/aggregates/group_values/bytes.rs | 1 + .../src/aggregates/group_values/bytes_view.rs | 1 + 4 files changed, 108 insertions(+), 398 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index 9ff43a6c51564..2b053846aed6d 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -67,9 +67,14 @@ impl ArrowBytesSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { fn make_payload_fn(_value: Option<&[u8]>) {} + fn update_payload_fn(_payload: &mut ()) {} fn observe_payload_fn(_payload: ()) {} - self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + self.0.insert_if_new( + values, + make_payload_fn, + update_payload_fn, + observe_payload_fn, + ); } /// Converts this set into a `StringArray`/`LargeStringArray` or @@ -277,8 +282,12 @@ where /// `make_payload_fn`: invoked for each value that is not already present /// to create the payload, in order of the values in `values` /// - /// `observe_payload_fn`: invoked once, for each value in `values`, that was - /// already present in the map, with corresponding payload value. + /// `update_payload_fn`: invoked when the value is already present in the map. + /// It receives a mutable reference to the existing payload, allowing it to modify it. + /// + /// `observe_payload_fn`: invoked for each value in `values` that was + /// already present in the map. If `update_payload_fn` is defined, it will be called + /// **after** the payload is updated. It is called with the corresponding payload value. /// /// # Returns /// @@ -287,15 +296,17 @@ where /// /// # Safety: /// - /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked + /// Note that `make_payload_fn`, `update_payload_fn` and `observe_payload_fn` are only invoked /// with valid values from `values`, not for the `NULL` value. - pub fn insert_if_new( + pub fn insert_if_new( &mut self, values: &ArrayRef, make_payload_fn: MP, + update_payload_fn: UP, observe_payload_fn: OP, ) where MP: FnMut(Option<&[u8]>) -> V, + UP: FnMut(&mut V), OP: FnMut(V), { // Sanity array type @@ -305,9 +316,10 @@ where values.data_type(), DataType::Binary | DataType::LargeBinary )); - self.insert_if_new_inner::>( + self.insert_if_new_inner::>( values, make_payload_fn, + update_payload_fn, observe_payload_fn, ) } @@ -316,9 +328,10 @@ where values.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); - self.insert_if_new_inner::>( + self.insert_if_new_inner::>( values, make_payload_fn, + update_payload_fn, observe_payload_fn, ) } @@ -334,13 +347,15 @@ where /// simpler and understand and reducing code bloat due to duplication. /// /// See comments on `insert_if_new` for more details - fn insert_if_new_inner( + fn insert_if_new_inner( &mut self, values: &ArrayRef, mut make_payload_fn: MP, + mut update_payload_fn: UP, mut observe_payload_fn: OP, ) where MP: FnMut(Option<&[u8]>) -> V, + UP: FnMut(&mut V), OP: FnMut(V), B: ByteArrayType, { @@ -362,8 +377,10 @@ where for (value, &hash) in values.iter().zip(batch_hashes.iter()) { // handle null value let Some(value) = value else { - let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { - payload + let payload = if let Some((ref mut payload, _)) = self.null { + // Update the existing null payload + update_payload_fn(payload); + *payload } else { let payload = make_payload_fn(None); let null_index = self.offsets.len() - 1; @@ -397,6 +414,7 @@ where }); if let Some(entry) = entry { + update_payload_fn(&mut entry.payload); entry.payload } // if no existing entry, make a new one @@ -437,6 +455,7 @@ where }); if let Some(entry) = entry { + update_payload_fn(&mut entry.payload); entry.payload } // if no existing entry, make a new one @@ -475,200 +494,43 @@ where } } - /// Inserts each value from `values` into the map, invoking `make_payload_fn` for - /// each value if not already present, or `update_payload_fn` if the value already exists. + /// Retrieves the payloads for each value from `values`, either by using + /// small value optimizations or larger value handling. /// - /// This function handles both the insert and update cases. + /// This function will compute hashes for each value and attempt to retrieve + /// the corresponding payload from the map. If the value is not found, it will return `None`. /// /// # Arguments: /// - /// `values`: The array whose values are inserted or updated in the map. + /// `values`: The array whose payloads need to be retrieved. /// - /// `make_payload_fn`: Invoked for each value that is not already present - /// to create the payload, in the order of the values in `values`. + /// # Returns /// - /// `update_payload_fn`: Invoked for each value that is already present, - /// allowing the payload to be updated in-place. - pub fn insert_or_update( - &mut self, - values: &ArrayRef, - make_payload_fn: MP, - update_payload_fn: UP, - ) where - MP: FnMut(Option<&[u8]>) -> V, - UP: FnMut(&mut V), - { - // Check the output type and dispatch to the appropriate internal function + /// A vector of payloads for each value, or `None` if the value is not found. + pub fn get_payloads(self, values: &ArrayRef) -> Vec> { match self.output_type { OutputType::Binary => { assert!(matches!( values.data_type(), DataType::Binary | DataType::LargeBinary )); - self.insert_or_update_inner::>( - values, - make_payload_fn, - update_payload_fn, - ) + self.get_payloads_inner::>(values) } OutputType::Utf8 => { assert!(matches!( values.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); - self.insert_or_update_inner::>( - values, - make_payload_fn, - update_payload_fn, - ) + self.get_payloads_inner::>(values) } _ => unreachable!("View types should use `ArrowBytesViewMap`"), - }; - } - - /// Generic version of [`Self::insert_or_update`] that handles `ByteArrayType` - /// (both String and Binary). - /// - /// This is the only function that is generic on [`ByteArrayType`], which avoids having - /// to template the entire structure, simplifying the code and reducing code bloat due - /// to duplication. - /// - /// See comments on `insert_or_update` for more details. - fn insert_or_update_inner( - &mut self, - values: &ArrayRef, - mut make_payload_fn: MP, - mut update_payload_fn: UP, - ) where - MP: FnMut(Option<&[u8]>) -> V, // Function to create a new entry - UP: FnMut(&mut V), // Function to update an existing entry - B: ByteArrayType, - { - // Step 1: Compute hashes - let batch_hashes = &mut self.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(values.len(), 0); - create_hashes(&[values.clone()], &self.random_state, batch_hashes).unwrap(); // Compute the hashes for the values - - // Step 2: Insert or update each value - let values = values.as_bytes::(); - - assert_eq!(values.len(), batch_hashes.len()); // Ensure hash count matches value count - - for (value, &hash) in values.iter().zip(batch_hashes.iter()) { - // Handle null value - let Some(value) = value else { - if let Some((ref mut payload, _)) = self.null { - // If null is already present, update the payload - update_payload_fn(payload); - } else { - // Null value doesn't exist, so create a new one - let payload = make_payload_fn(None); - let null_index = self.offsets.len() - 1; - // Nulls need a zero length in the offset buffer - let offset = self.buffer.len(); - self.offsets.push(O::usize_as(offset)); - self.null = Some((payload, null_index)); - } - continue; - }; - - let value: &[u8] = value.as_ref(); - let value_len = O::usize_as(value.len()); - - // Small value optimization - if value.len() <= SHORT_VALUE_LEN { - let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize); - - // Check if the value is already present in the set - let entry = self.map.get_mut(hash, |header| { - if header.len != value_len { - return false; - } - inline == header.offset_or_inline - }); - - if let Some(entry) = entry { - update_payload_fn(&mut entry.payload); - } else { - // Insert a new value if not found - self.buffer.append_slice(value); - self.offsets.push(O::usize_as(self.buffer.len())); - let payload = make_payload_fn(Some(value)); - let new_entry = Entry { - hash, - len: value_len, - offset_or_inline: inline, - payload, - }; - self.map.insert_accounted( - new_entry, - |header| header.hash, - &mut self.map_size, - ); - } - } else { - // Handle larger values - let entry = self.map.get_mut(hash, |header| { - if header.len != value_len { - return false; - } - let existing_value = - unsafe { self.buffer.as_slice().get_unchecked(header.range()) }; - value == existing_value - }); - - if let Some(entry) = entry { - update_payload_fn(&mut entry.payload); - } else { - // Insert a new large value if not found - let offset = self.buffer.len(); - self.buffer.append_slice(value); - self.offsets.push(O::usize_as(self.buffer.len())); - let payload = make_payload_fn(Some(value)); - let new_entry = Entry { - hash, - len: value_len, - offset_or_inline: offset, - payload, - }; - self.map.insert_accounted( - new_entry, - |header| header.hash, - &mut self.map_size, - ); - } - }; - } - - // Ensure no overflow in offsets - if O::from_usize(self.buffer.len()).is_none() { - panic!( - "Put {} bytes in buffer, more than can be represented by a {}", - self.buffer.len(), - type_name::() - ); } } /// Generic version of [`Self::get_payloads`] that handles `ByteArrayType` /// (both String and Binary). /// - /// This function computes the hashes for each value and retrieves the payloads - /// stored in the map, leveraging small value optimizations when possible. - /// - /// # Arguments: - /// - /// `values`: The array whose payloads are being retrieved. - /// - /// # Returns - /// - /// A vector of payloads for each value, or `None` if the value is not found. - /// - /// # Safety: - /// - /// This function ensures that small values are handled using inline optimization - /// and larger values are safely retrieved from the buffer. + /// See [`get_payloads`] for more details. fn get_payloads_inner(self, values: &ArrayRef) -> Vec> where B: ByteArrayType, @@ -732,44 +594,6 @@ where payloads } - /// Retrieves the payloads for each value from `values`, either by using - /// small value optimizations or larger value handling. - /// - /// This function will compute hashes for each value and attempt to retrieve - /// the corresponding payload from the map. If the value is not found, it will return `None`. - /// - /// # Arguments: - /// - /// `values`: The array whose payloads need to be retrieved. - /// - /// # Returns - /// - /// A vector of payloads for each value, or `None` if the value is not found. - /// - /// # Safety: - /// - /// This function handles both small and large values in a safe manner, though `unsafe` code is - /// used internally for performance optimization. - pub fn get_payloads(self, values: &ArrayRef) -> Vec> { - match self.output_type { - OutputType::Binary => { - assert!(matches!( - values.data_type(), - DataType::Binary | DataType::LargeBinary - )); - self.get_payloads_inner::>(values) - } - OutputType::Utf8 => { - assert!(matches!( - values.data_type(), - DataType::Utf8 | DataType::LargeUtf8 - )); - self.get_payloads_inner::>(values) - } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), - } - } - /// Converts this set into a `StringArray`, `LargeStringArray`, /// `BinaryArray`, or `LargeBinaryArray` containing each distinct value /// that was inserted. This is done without copying the values. @@ -1167,7 +991,7 @@ mod tests { } #[test] - fn test_insert_or_update_count_u8() { + fn test_update_payload_count_u8() { let input = vec![ Some("A"), Some("bcdefghijklmnop"), @@ -1187,12 +1011,13 @@ mod tests { let string_array = StringArray::from(input.clone()); let arr: ArrayRef = Arc::new(string_array); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let expected_counts = [ @@ -1213,14 +1038,11 @@ mod tests { let mut result_payload: Option = None; - map.insert_or_update( + map.insert_if_new( &arr, - |_| { - panic!("Unexpected new entry during verification"); - }, - |count| { - result_payload = Some(*count); - }, + |_| panic!("Unexpected new entry during verification"), + |count| result_payload = Some(*count), + |_| {}, ); if let Some(expected_count) = @@ -1233,7 +1055,7 @@ mod tests { } #[test] - fn test_insert_if_new_after_insert_or_update() { + fn test_observe_after_update() { let initial_values = StringArray::from(vec![ Some("A"), Some("B"), @@ -1245,18 +1067,19 @@ mod tests { let mut map: ArrowBytesMap = ArrowBytesMap::new(OutputType::Utf8); let arr: ArrayRef = Arc::new(initial_values); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let additional_values = StringArray::from(vec![Some("A"), Some("D"), Some("E")]); let arr_additional: ArrayRef = Arc::new(additional_values); - map.insert_if_new(&arr_additional, |_| 5u8, |_| {}); + map.insert_if_new(&arr_additional, |_| 5u8, |_| {}, |_| {}); let combined_arr = StringArray::from(vec![ Some("A"), @@ -1295,12 +1118,13 @@ mod tests { let string_array = StringArray::from(input.clone()); let arr: ArrayRef = Arc::new(string_array); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let expected_payloads = [ @@ -1418,6 +1242,7 @@ mod tests { seen_new_strings.push(value); TestPayload { index } }, + |_| {}, |payload| { seen_indexes.push(payload.index); }, diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index e7cc9c7724597..4d93848a146c8 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -43,9 +43,14 @@ impl ArrowBytesViewSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { fn make_payload_fn(_value: Option<&[u8]>) {} + fn update_payload_fn(_payload: &mut ()) {} fn observe_payload_fn(_payload: ()) {} - self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + self.0.insert_if_new( + values, + make_payload_fn, + update_payload_fn, + observe_payload_fn, + ); } /// Return the contents of this map and replace it with a new empty map with @@ -178,8 +183,12 @@ where /// `make_payload_fn`: invoked for each value that is not already present /// to create the payload, in order of the values in `values` /// - /// `observe_payload_fn`: invoked once, for each value in `values`, that was - /// already present in the map, with corresponding payload value. + /// `update_payload_fn`: invoked when the value is already present in the map. + /// It receives a mutable reference to the existing payload, allowing it to modify it. + /// + /// `observe_payload_fn`: invoked for each value in `values` that was + /// already present in the map. If `update_payload_fn` is defined, it will be called + /// **after** the payload is updated. It is called with the corresponding payload value. /// /// # Returns /// @@ -188,32 +197,36 @@ where /// /// # Safety: /// - /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked + /// Note that `make_payload_fn`, `update_payload_fn` and `observe_payload_fn` are only invoked /// with valid values from `values`, not for the `NULL` value. - pub fn insert_if_new( + pub fn insert_if_new( &mut self, values: &ArrayRef, make_payload_fn: MP, + update_payload_fn: UP, observe_payload_fn: OP, ) where MP: FnMut(Option<&[u8]>) -> V, + UP: FnMut(&mut V), OP: FnMut(V), { // Sanity check array type match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView)); - self.insert_if_new_inner::( + self.insert_if_new_inner::( values, make_payload_fn, + update_payload_fn, observe_payload_fn, ) } OutputType::Utf8View => { assert!(matches!(values.data_type(), DataType::Utf8View)); - self.insert_if_new_inner::( + self.insert_if_new_inner::( values, make_payload_fn, + update_payload_fn, observe_payload_fn, ) } @@ -229,13 +242,15 @@ where /// simpler and understand and reducing code bloat due to duplication. /// /// See comments on `insert_if_new` for more details - fn insert_if_new_inner( + fn insert_if_new_inner( &mut self, values: &ArrayRef, mut make_payload_fn: MP, + mut update_payload_fn: UP, mut observe_payload_fn: OP, ) where MP: FnMut(Option<&[u8]>) -> V, + UP: FnMut(&mut V), OP: FnMut(V), B: ByteViewType, { @@ -257,8 +272,9 @@ where for (value, &hash) in values.iter().zip(batch_hashes.iter()) { // handle null value let Some(value) = value else { - let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { - payload + let payload = if let Some((ref mut payload, _)) = self.null { + update_payload_fn(payload); + *payload } else { let payload = make_payload_fn(None); let null_index = self.builder.len(); @@ -284,6 +300,7 @@ where }); let payload = if let Some(entry) = entry { + update_payload_fn(&mut entry.payload); entry.payload } else { // no existing value, make a new one. @@ -306,149 +323,37 @@ where } } - /// Inserts each value from `values` into the map, invoking `make_payload_fn` for - /// each value if not already present, or `update_payload_fn` if the value already exists. + /// Retrieves the payloads for each value from `values`, either by using + /// small value optimizations or larger value handling. /// - /// This function handles both the insert and update cases. + /// This function will compute hashes for each value and attempt to retrieve + /// the corresponding payload from the map. If the value is not found, it will return `None`. /// /// # Arguments: /// - /// `values`: The array whose values are inserted or updated in the map. + /// `values`: The array whose payloads need to be retrieved. /// - /// `make_payload_fn`: Invoked for each value that is not already present - /// to create the payload, in the order of the values in `values`. + /// # Returns /// - /// `update_payload_fn`: Invoked for each value that is already present, - /// allowing the payload to be updated in-place. - pub fn insert_or_update( - &mut self, - values: &ArrayRef, - make_payload_fn: MP, - update_payload_fn: UP, - ) where - MP: FnMut(Option<&[u8]>) -> V, - UP: FnMut(&mut V), - { - // Check the output type and dispatch to the appropriate internal function + /// A vector of payloads for each value, or `None` if the value is not found. + pub fn get_payloads(self, values: &ArrayRef) -> Vec> { match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView)); - self.insert_or_update_inner::( - values, - make_payload_fn, - update_payload_fn, - ) + self.get_payloads_inner::(values) } OutputType::Utf8View => { assert!(matches!(values.data_type(), DataType::Utf8View)); - self.insert_or_update_inner::( - values, - make_payload_fn, - update_payload_fn, - ) + self.get_payloads_inner::(values) } _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"), - }; - } - - /// Generic version of [`Self::insert_or_update`] that handles `ByteViewType` - /// (both StringView and BinaryView). - /// - /// This is the only function that is generic on [`ByteViewType`], which avoids having - /// to template the entire structure, simplifying the code and reducing code bloat due - /// to duplication. - /// - /// See comments on `insert_or_update` for more details. - fn insert_or_update_inner( - &mut self, - values: &ArrayRef, - mut make_payload_fn: MP, - mut update_payload_fn: UP, - ) where - MP: FnMut(Option<&[u8]>) -> V, - UP: FnMut(&mut V), - B: ByteViewType, - { - // step 1: compute hashes - let batch_hashes = &mut self.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(values.len(), 0); - create_hashes(&[values.clone()], &self.random_state, batch_hashes) - // hash is supported for all types and create_hashes only - // returns errors for unsupported types - .unwrap(); - - // step 2: insert each value into the set, if not already present - let values = values.as_byte_view::(); - - // Ensure lengths are equivalent - assert_eq!(values.len(), batch_hashes.len()); - - for (value, &hash) in values.iter().zip(batch_hashes.iter()) { - // Handle null value - let Some(value) = value else { - if let Some((ref mut payload, _)) = self.null { - update_payload_fn(payload); - } else { - let payload = make_payload_fn(None); - let null_index = self.builder.len(); - self.builder.append_null(); - self.null = Some((payload, null_index)); - } - continue; - }; - - let value: &[u8] = value.as_ref(); - - let entry = self.map.get_mut(hash, |header| { - let v = self.builder.get_value(header.view_idx); - - if v.len() != value.len() { - return false; - } - - v == value - }); - - if let Some(entry) = entry { - update_payload_fn(&mut entry.payload); - } else { - // no existing value, make a new one. - let payload = make_payload_fn(Some(value)); - - let inner_view_idx = self.builder.len(); - let new_header = Entry { - view_idx: inner_view_idx, - hash, - payload, - }; - - self.builder.append_value(value); - - self.map - .insert_accounted(new_header, |h| h.hash, &mut self.map_size); - }; } } /// Generic version of [`Self::get_payloads`] that handles `ByteViewType` /// (both StringView and BinaryView). /// - /// This function computes the hashes for each value and retrieves the payloads - /// stored in the map, leveraging small value optimizations when possible. - /// - /// # Arguments: - /// - /// `values`: The array whose payloads are being retrieved. - /// - /// # Returns - /// - /// A vector of payloads for each value, or `None` if the value is not found. - /// - /// # Safety: - /// - /// This function ensures that small values are handled using inline optimization - /// and larger values are safely retrieved from the builder. + /// See [`get_payloads`] for more details. fn get_payloads_inner(self, values: &ArrayRef) -> Vec> where B: ByteViewType, @@ -488,33 +393,6 @@ where payloads } - /// Retrieves the payloads for each value from `values`, either by using - /// small value optimizations or larger value handling. - /// - /// This function will compute hashes for each value and attempt to retrieve - /// the corresponding payload from the map. If the value is not found, it will return `None`. - /// - /// # Arguments: - /// - /// `values`: The array whose payloads need to be retrieved. - /// - /// # Returns - /// - /// A vector of payloads for each value, or `None` if the value is not found. - pub fn get_payloads(self, values: &ArrayRef) -> Vec> { - match self.output_type { - OutputType::BinaryView => { - assert!(matches!(values.data_type(), DataType::BinaryView)); - self.get_payloads_inner::(values) - } - OutputType::Utf8View => { - assert!(matches!(values.data_type(), DataType::Utf8View)); - self.get_payloads_inner::(values) - } - _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"), - } - } - /// Converts this set into a `StringViewArray`, or `BinaryViewArray`, /// containing each distinct value /// that was inserted. This is done without copying the values. @@ -791,7 +669,7 @@ mod tests { } #[test] - fn test_insert_or_update_count_u8() { + fn test_update_payload_count_u8() { let values = GenericByteViewArray::from(vec![ Some("a"), Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"), @@ -805,12 +683,13 @@ mod tests { let mut map: ArrowBytesViewMap = ArrowBytesViewMap::new(OutputType::Utf8View); let arr: ArrayRef = Arc::new(values); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let expected_counts = [ @@ -827,7 +706,7 @@ mod tests { let mut result_payload: Option = None; - map.insert_or_update( + map.insert_if_new( &arr, |_| { panic!("Unexpected new entry during verification"); @@ -835,6 +714,7 @@ mod tests { |count| { result_payload = Some(*count); }, + |_| {}, ); assert_eq!(result_payload.unwrap(), value.1); @@ -842,7 +722,7 @@ mod tests { } #[test] - fn test_insert_if_new_after_insert_or_update() { + fn test_observe_after_update() { let initial_values = GenericByteViewArray::from(vec![ Some("A"), Some("B"), @@ -854,19 +734,20 @@ mod tests { let mut map: ArrowBytesViewMap = ArrowBytesViewMap::new(OutputType::Utf8View); let arr: ArrayRef = Arc::new(initial_values); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let additional_values = GenericByteViewArray::from(vec![Some("A"), Some("D"), Some("E")]); let arr_additional: ArrayRef = Arc::new(additional_values); - map.insert_if_new(&arr_additional, |_| 5u8, |_| {}); + map.insert_if_new(&arr_additional, |_| 5u8, |_| {}, |_| {}); let expected_payloads = [Some(1u8), Some(2u8), Some(2u8), Some(5u8), Some(5u8)]; @@ -903,12 +784,13 @@ mod tests { let mut map: ArrowBytesViewMap = ArrowBytesViewMap::new(OutputType::Utf8View); let arr: ArrayRef = Arc::new(values); - map.insert_or_update( + map.insert_if_new( &arr, |_| 1u8, |count| { *count += 1; }, + |_| {}, ); let expected_payloads = [ @@ -994,6 +876,7 @@ mod tests { seen_new_strings.push(value); TestPayload { index } }, + |_| {}, |payload| { seen_indexes.push(payload.index); }, diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs index f789af8b8a024..557c78fc36bac 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs @@ -61,6 +61,7 @@ impl GroupValues for GroupValuesByes { self.num_groups += 1; group_idx }, + |_| {}, // called for each group |group_idx| { groups.push(group_idx); diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs index 1a0cb90a16d47..e0068889faaca 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs @@ -62,6 +62,7 @@ impl GroupValues for GroupValuesBytesView { self.num_groups += 1; group_idx }, + |_| {}, // called for each group |group_idx| { groups.push(group_idx);