Skip to content

Commit 8bc00f8

Browse files
committed
Implement special min/max accumulator for Strings: MinMaxBytesAccumulator
1 parent 6d61503 commit 8bc00f8

File tree

4 files changed

+855
-69
lines changed

4 files changed

+855
-69
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl NullState {
9595
///
9696
/// When value_fn is called it also sets
9797
///
98-
/// 1. `self.seen_values[group_index]` to true for all rows that had a non null vale
98+
/// 1. `self.seen_values[group_index]` to true for all rows that had a non null value
9999
pub fn accumulate<T, F>(
100100
&mut self,
101101
group_indices: &[usize],

datafusion/functions-aggregate/src/min_max.rs

+124-68
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,7 @@
1717
//! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function
1818
//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function
1919
20-
// distributed with this work for additional information
21-
// regarding copyright ownership. The ASF licenses this file
22-
// to you under the Apache License, Version 2.0 (the
23-
// "License"); you may not use this file except in compliance
24-
// with the License. You may obtain a copy of the License at
25-
//
26-
// http://www.apache.org/licenses/LICENSE-2.0
27-
//
28-
// Unless required by applicable law or agreed to in writing,
29-
// software distributed under the License is distributed on an
30-
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
31-
// KIND, either express or implied. See the License for the
32-
// specific language governing permissions and limitations
33-
// under the License.
20+
mod min_max_bytes;
3421

3522
use arrow::array::{
3623
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
@@ -64,6 +51,7 @@ use arrow::datatypes::{
6451
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
6552
};
6653

54+
use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
6755
use datafusion_common::ScalarValue;
6856
use datafusion_expr::{
6957
function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Signature, Volatility,
@@ -116,7 +104,7 @@ impl Default for Max {
116104
/// the specified [`ArrowPrimitiveType`].
117105
///
118106
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
119-
macro_rules! instantiate_max_accumulator {
107+
macro_rules! instantiate_primitive_max_accumulator {
120108
($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
121109
Ok(Box::new(
122110
PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
@@ -135,7 +123,7 @@ macro_rules! instantiate_max_accumulator {
135123
///
136124
///
137125
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
138-
macro_rules! instantiate_min_accumulator {
126+
macro_rules! instantiate_primitive_min_accumulator {
139127
($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
140128
Ok(Box::new(
141129
PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
@@ -243,6 +231,12 @@ impl AggregateUDFImpl for Max {
243231
| Time32(_)
244232
| Time64(_)
245233
| Timestamp(_, _)
234+
| Utf8
235+
| LargeUtf8
236+
| Utf8View
237+
| Binary
238+
| LargeBinary
239+
| BinaryView
246240
)
247241
}
248242

@@ -254,58 +248,86 @@ impl AggregateUDFImpl for Max {
254248
use TimeUnit::*;
255249
let data_type = args.return_type;
256250
match data_type {
257-
Int8 => instantiate_max_accumulator!(data_type, i8, Int8Type),
258-
Int16 => instantiate_max_accumulator!(data_type, i16, Int16Type),
259-
Int32 => instantiate_max_accumulator!(data_type, i32, Int32Type),
260-
Int64 => instantiate_max_accumulator!(data_type, i64, Int64Type),
261-
UInt8 => instantiate_max_accumulator!(data_type, u8, UInt8Type),
262-
UInt16 => instantiate_max_accumulator!(data_type, u16, UInt16Type),
263-
UInt32 => instantiate_max_accumulator!(data_type, u32, UInt32Type),
264-
UInt64 => instantiate_max_accumulator!(data_type, u64, UInt64Type),
251+
Int8 => instantiate_primitive_max_accumulator!(data_type, i8, Int8Type),
252+
Int16 => instantiate_primitive_max_accumulator!(data_type, i16, Int16Type),
253+
Int32 => instantiate_primitive_max_accumulator!(data_type, i32, Int32Type),
254+
Int64 => instantiate_primitive_max_accumulator!(data_type, i64, Int64Type),
255+
UInt8 => instantiate_primitive_max_accumulator!(data_type, u8, UInt8Type),
256+
UInt16 => instantiate_primitive_max_accumulator!(data_type, u16, UInt16Type),
257+
UInt32 => instantiate_primitive_max_accumulator!(data_type, u32, UInt32Type),
258+
UInt64 => instantiate_primitive_max_accumulator!(data_type, u64, UInt64Type),
265259
Float16 => {
266-
instantiate_max_accumulator!(data_type, f16, Float16Type)
260+
instantiate_primitive_max_accumulator!(data_type, f16, Float16Type)
267261
}
268262
Float32 => {
269-
instantiate_max_accumulator!(data_type, f32, Float32Type)
263+
instantiate_primitive_max_accumulator!(data_type, f32, Float32Type)
270264
}
271265
Float64 => {
272-
instantiate_max_accumulator!(data_type, f64, Float64Type)
266+
instantiate_primitive_max_accumulator!(data_type, f64, Float64Type)
273267
}
274-
Date32 => instantiate_max_accumulator!(data_type, i32, Date32Type),
275-
Date64 => instantiate_max_accumulator!(data_type, i64, Date64Type),
268+
Date32 => instantiate_primitive_max_accumulator!(data_type, i32, Date32Type),
269+
Date64 => instantiate_primitive_max_accumulator!(data_type, i64, Date64Type),
276270
Time32(Second) => {
277-
instantiate_max_accumulator!(data_type, i32, Time32SecondType)
271+
instantiate_primitive_max_accumulator!(data_type, i32, Time32SecondType)
278272
}
279273
Time32(Millisecond) => {
280-
instantiate_max_accumulator!(data_type, i32, Time32MillisecondType)
274+
instantiate_primitive_max_accumulator!(
275+
data_type,
276+
i32,
277+
Time32MillisecondType
278+
)
281279
}
282280
Time64(Microsecond) => {
283-
instantiate_max_accumulator!(data_type, i64, Time64MicrosecondType)
281+
instantiate_primitive_max_accumulator!(
282+
data_type,
283+
i64,
284+
Time64MicrosecondType
285+
)
284286
}
285287
Time64(Nanosecond) => {
286-
instantiate_max_accumulator!(data_type, i64, Time64NanosecondType)
288+
instantiate_primitive_max_accumulator!(
289+
data_type,
290+
i64,
291+
Time64NanosecondType
292+
)
287293
}
288294
Timestamp(Second, _) => {
289-
instantiate_max_accumulator!(data_type, i64, TimestampSecondType)
295+
instantiate_primitive_max_accumulator!(
296+
data_type,
297+
i64,
298+
TimestampSecondType
299+
)
290300
}
291301
Timestamp(Millisecond, _) => {
292-
instantiate_max_accumulator!(data_type, i64, TimestampMillisecondType)
302+
instantiate_primitive_max_accumulator!(
303+
data_type,
304+
i64,
305+
TimestampMillisecondType
306+
)
293307
}
294308
Timestamp(Microsecond, _) => {
295-
instantiate_max_accumulator!(data_type, i64, TimestampMicrosecondType)
309+
instantiate_primitive_max_accumulator!(
310+
data_type,
311+
i64,
312+
TimestampMicrosecondType
313+
)
296314
}
297315
Timestamp(Nanosecond, _) => {
298-
instantiate_max_accumulator!(data_type, i64, TimestampNanosecondType)
316+
instantiate_primitive_max_accumulator!(
317+
data_type,
318+
i64,
319+
TimestampNanosecondType
320+
)
299321
}
300322
Decimal128(_, _) => {
301-
instantiate_max_accumulator!(data_type, i128, Decimal128Type)
323+
instantiate_primitive_max_accumulator!(data_type, i128, Decimal128Type)
302324
}
303325
Decimal256(_, _) => {
304-
instantiate_max_accumulator!(data_type, i256, Decimal256Type)
326+
instantiate_primitive_max_accumulator!(data_type, i256, Decimal256Type)
327+
}
328+
Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
329+
Ok(Box::new(MinMaxBytesAccumulator::new_max(data_type.clone())))
305330
}
306-
307-
// It would be nice to have a fast implementation for Strings as well
308-
// https://github.com/apache/datafusion/issues/6906
309331

310332
// This is only reached if groups_accumulator_supported is out of sync
311333
_ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
@@ -1040,6 +1062,12 @@ impl AggregateUDFImpl for Min {
10401062
| Time32(_)
10411063
| Time64(_)
10421064
| Timestamp(_, _)
1065+
| Utf8
1066+
| LargeUtf8
1067+
| Utf8View
1068+
| Binary
1069+
| LargeBinary
1070+
| BinaryView
10431071
)
10441072
}
10451073

@@ -1051,58 +1079,86 @@ impl AggregateUDFImpl for Min {
10511079
use TimeUnit::*;
10521080
let data_type = args.return_type;
10531081
match data_type {
1054-
Int8 => instantiate_min_accumulator!(data_type, i8, Int8Type),
1055-
Int16 => instantiate_min_accumulator!(data_type, i16, Int16Type),
1056-
Int32 => instantiate_min_accumulator!(data_type, i32, Int32Type),
1057-
Int64 => instantiate_min_accumulator!(data_type, i64, Int64Type),
1058-
UInt8 => instantiate_min_accumulator!(data_type, u8, UInt8Type),
1059-
UInt16 => instantiate_min_accumulator!(data_type, u16, UInt16Type),
1060-
UInt32 => instantiate_min_accumulator!(data_type, u32, UInt32Type),
1061-
UInt64 => instantiate_min_accumulator!(data_type, u64, UInt64Type),
1082+
Int8 => instantiate_primitive_min_accumulator!(data_type, i8, Int8Type),
1083+
Int16 => instantiate_primitive_min_accumulator!(data_type, i16, Int16Type),
1084+
Int32 => instantiate_primitive_min_accumulator!(data_type, i32, Int32Type),
1085+
Int64 => instantiate_primitive_min_accumulator!(data_type, i64, Int64Type),
1086+
UInt8 => instantiate_primitive_min_accumulator!(data_type, u8, UInt8Type),
1087+
UInt16 => instantiate_primitive_min_accumulator!(data_type, u16, UInt16Type),
1088+
UInt32 => instantiate_primitive_min_accumulator!(data_type, u32, UInt32Type),
1089+
UInt64 => instantiate_primitive_min_accumulator!(data_type, u64, UInt64Type),
10621090
Float16 => {
1063-
instantiate_min_accumulator!(data_type, f16, Float16Type)
1091+
instantiate_primitive_min_accumulator!(data_type, f16, Float16Type)
10641092
}
10651093
Float32 => {
1066-
instantiate_min_accumulator!(data_type, f32, Float32Type)
1094+
instantiate_primitive_min_accumulator!(data_type, f32, Float32Type)
10671095
}
10681096
Float64 => {
1069-
instantiate_min_accumulator!(data_type, f64, Float64Type)
1097+
instantiate_primitive_min_accumulator!(data_type, f64, Float64Type)
10701098
}
1071-
Date32 => instantiate_min_accumulator!(data_type, i32, Date32Type),
1072-
Date64 => instantiate_min_accumulator!(data_type, i64, Date64Type),
1099+
Date32 => instantiate_primitive_min_accumulator!(data_type, i32, Date32Type),
1100+
Date64 => instantiate_primitive_min_accumulator!(data_type, i64, Date64Type),
10731101
Time32(Second) => {
1074-
instantiate_min_accumulator!(data_type, i32, Time32SecondType)
1102+
instantiate_primitive_min_accumulator!(data_type, i32, Time32SecondType)
10751103
}
10761104
Time32(Millisecond) => {
1077-
instantiate_min_accumulator!(data_type, i32, Time32MillisecondType)
1105+
instantiate_primitive_min_accumulator!(
1106+
data_type,
1107+
i32,
1108+
Time32MillisecondType
1109+
)
10781110
}
10791111
Time64(Microsecond) => {
1080-
instantiate_min_accumulator!(data_type, i64, Time64MicrosecondType)
1112+
instantiate_primitive_min_accumulator!(
1113+
data_type,
1114+
i64,
1115+
Time64MicrosecondType
1116+
)
10811117
}
10821118
Time64(Nanosecond) => {
1083-
instantiate_min_accumulator!(data_type, i64, Time64NanosecondType)
1119+
instantiate_primitive_min_accumulator!(
1120+
data_type,
1121+
i64,
1122+
Time64NanosecondType
1123+
)
10841124
}
10851125
Timestamp(Second, _) => {
1086-
instantiate_min_accumulator!(data_type, i64, TimestampSecondType)
1126+
instantiate_primitive_min_accumulator!(
1127+
data_type,
1128+
i64,
1129+
TimestampSecondType
1130+
)
10871131
}
10881132
Timestamp(Millisecond, _) => {
1089-
instantiate_min_accumulator!(data_type, i64, TimestampMillisecondType)
1133+
instantiate_primitive_min_accumulator!(
1134+
data_type,
1135+
i64,
1136+
TimestampMillisecondType
1137+
)
10901138
}
10911139
Timestamp(Microsecond, _) => {
1092-
instantiate_min_accumulator!(data_type, i64, TimestampMicrosecondType)
1140+
instantiate_primitive_min_accumulator!(
1141+
data_type,
1142+
i64,
1143+
TimestampMicrosecondType
1144+
)
10931145
}
10941146
Timestamp(Nanosecond, _) => {
1095-
instantiate_min_accumulator!(data_type, i64, TimestampNanosecondType)
1147+
instantiate_primitive_min_accumulator!(
1148+
data_type,
1149+
i64,
1150+
TimestampNanosecondType
1151+
)
10961152
}
10971153
Decimal128(_, _) => {
1098-
instantiate_min_accumulator!(data_type, i128, Decimal128Type)
1154+
instantiate_primitive_min_accumulator!(data_type, i128, Decimal128Type)
10991155
}
11001156
Decimal256(_, _) => {
1101-
instantiate_min_accumulator!(data_type, i256, Decimal256Type)
1157+
instantiate_primitive_min_accumulator!(data_type, i256, Decimal256Type)
1158+
}
1159+
Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
1160+
Ok(Box::new(MinMaxBytesAccumulator::new_min(data_type.clone())))
11021161
}
1103-
1104-
// It would be nice to have a fast implementation for Strings as well
1105-
// https://github.com/apache/datafusion/issues/6906
11061162

11071163
// This is only reached if groups_accumulator_supported is out of sync
11081164
_ => internal_err!("GroupsAccumulator not supported for min({})", data_type),

0 commit comments

Comments
 (0)