Skip to content

Commit 8233e64

Browse files
authored
Implement specialized group values for single Uft8/LargeUtf8/Binary/LargeBinary column (#8827)
* Implement GroupValuesBinary special case for for handling single column string grouping compiles. add test for null * Avoid overflow checking * avoid offsest validation * Update datafusion/physical-plan/src/aggregates/group_values/bytes.rs
1 parent 336dc66 commit 8233e64

File tree

9 files changed

+1307
-500
lines changed

9 files changed

+1307
-500
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values
19+
20+
use crate::binary_map::{ArrowBytesSet, OutputType};
21+
use arrow_array::{ArrayRef, OffsetSizeTrait};
22+
use datafusion_common::cast::as_list_array;
23+
use datafusion_common::utils::array_into_list_array;
24+
use datafusion_common::ScalarValue;
25+
use datafusion_expr::Accumulator;
26+
use std::fmt::Debug;
27+
use std::sync::Arc;
28+
29+
/// Specialized implementation of
30+
/// `COUNT DISTINCT` for [`StringArray`] [`LargeStringArray`],
31+
/// [`BinaryArray`] and [`LargeBinaryArray`].
32+
///
33+
/// [`StringArray`]: arrow::array::StringArray
34+
/// [`LargeStringArray`]: arrow::array::LargeStringArray
35+
/// [`BinaryArray`]: arrow::array::BinaryArray
36+
/// [`LargeBinaryArray`]: arrow::array::LargeBinaryArray
37+
#[derive(Debug)]
38+
pub(super) struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
39+
40+
impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
41+
pub(super) fn new(output_type: OutputType) -> Self {
42+
Self(ArrowBytesSet::new(output_type))
43+
}
44+
}
45+
46+
impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
47+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
48+
let set = self.0.take();
49+
let arr = set.into_state();
50+
let list = Arc::new(array_into_list_array(arr));
51+
Ok(vec![ScalarValue::List(list)])
52+
}
53+
54+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
55+
if values.is_empty() {
56+
return Ok(());
57+
}
58+
59+
self.0.insert(&values[0]);
60+
61+
Ok(())
62+
}
63+
64+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
65+
if states.is_empty() {
66+
return Ok(());
67+
}
68+
assert_eq!(
69+
states.len(),
70+
1,
71+
"count_distinct states must be single array"
72+
);
73+
74+
let arr = as_list_array(&states[0])?;
75+
arr.iter().try_for_each(|maybe_list| {
76+
if let Some(list) = maybe_list {
77+
self.0.insert(&list);
78+
};
79+
Ok(())
80+
})
81+
}
82+
83+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
84+
Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
85+
}
86+
87+
fn size(&self) -> usize {
88+
std::mem::size_of_val(self) + self.0.size()
89+
}
90+
}

datafusion/physical-expr/src/aggregate/count_distinct/mod.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
mod bytes;
1819
mod native;
19-
mod strings;
2020

2121
use std::any::Any;
2222
use std::collections::HashSet;
@@ -37,11 +37,12 @@ use arrow_array::types::{
3737
use datafusion_common::{Result, ScalarValue};
3838
use datafusion_expr::Accumulator;
3939

40+
use crate::aggregate::count_distinct::bytes::BytesDistinctCountAccumulator;
4041
use crate::aggregate::count_distinct::native::{
4142
FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator,
4243
};
43-
use crate::aggregate::count_distinct::strings::StringDistinctCountAccumulator;
4444
use crate::aggregate::utils::down_cast_any_ref;
45+
use crate::binary_map::OutputType;
4546
use crate::expressions::format_state_name;
4647
use crate::{AggregateExpr, PhysicalExpr};
4748

@@ -144,8 +145,16 @@ impl AggregateExpr for DistinctCount {
144145
Float32 => Box::new(FloatDistinctCountAccumulator::<Float32Type>::new()),
145146
Float64 => Box::new(FloatDistinctCountAccumulator::<Float64Type>::new()),
146147

147-
Utf8 => Box::new(StringDistinctCountAccumulator::<i32>::new()),
148-
LargeUtf8 => Box::new(StringDistinctCountAccumulator::<i64>::new()),
148+
Utf8 => Box::new(BytesDistinctCountAccumulator::<i32>::new(OutputType::Utf8)),
149+
LargeUtf8 => {
150+
Box::new(BytesDistinctCountAccumulator::<i64>::new(OutputType::Utf8))
151+
}
152+
Binary => Box::new(BytesDistinctCountAccumulator::<i32>::new(
153+
OutputType::Binary,
154+
)),
155+
LargeBinary => Box::new(BytesDistinctCountAccumulator::<i64>::new(
156+
OutputType::Binary,
157+
)),
149158

150159
_ => Box::new(DistinctCountAccumulator {
151160
values: HashSet::default(),
@@ -175,7 +184,7 @@ impl PartialEq<dyn Any> for DistinctCount {
175184
/// General purpose distinct accumulator that works for any DataType by using
176185
/// [`ScalarValue`]. Some types have specialized accumulators that are (much)
177186
/// more efficient such as [`PrimitiveDistinctCountAccumulator`] and
178-
/// [`StringDistinctCountAccumulator`]
187+
/// [`BytesDistinctCountAccumulator`]
179188
#[derive(Debug)]
180189
struct DistinctCountAccumulator {
181190
values: HashSet<ScalarValue, RandomState>,

0 commit comments

Comments
 (0)