Skip to content

Commit 536082c

Browse files
committed
arrow-ord: add support for partitioning nested types
`BoundedWindowAggExec` currently assumes that partitioning supports nested types, which it does not, so partitioning on a nested type (e.g. a struct) fails at execution time with a nested comparison error. This commit makes `find_boundaries` check the data type: it keeps using `distinct` for non-nested types and falls back to `make_comparator` for nested types.
1 parent d4b9482 commit 536082c

File tree

1 file changed

+38
-4
lines changed

1 file changed

+38
-4
lines changed

arrow-ord/src/partition.rs

+38-4
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ use std::ops::Range;
2121

2222
use arrow_array::{Array, ArrayRef};
2323
use arrow_buffer::BooleanBuffer;
24-
use arrow_schema::ArrowError;
24+
use arrow_schema::{ArrowError, SortOptions};
2525

2626
use crate::cmp::distinct;
27+
use crate::ord::make_comparator;
2728

2829
/// A computed set of partitions, see [`partition`]
2930
#[derive(Debug, Clone)]
@@ -156,18 +157,24 @@ fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
156157
let slice_len = v.len() - 1;
157158
let v1 = v.slice(0, slice_len);
158159
let v2 = v.slice(1, slice_len);
159-
Ok(distinct(&v1, &v2)?.values().clone())
160+
161+
if !v.data_type().is_nested() {
162+
return Ok(distinct(&v1, &v2)?.values().clone());
163+
}
164+
// Given that we're only comparing values, null ordering in the input or
165+
// sort options do not matter.
166+
let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
167+
Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
160168
}
161169

162170
#[cfg(test)]
163171
mod tests {
164172
use std::sync::Arc;
165173

174+
use super::*;
166175
use arrow_array::*;
167176
use arrow_schema::DataType;
168177

169-
use super::*;
170-
171178
#[test]
172179
fn test_partition_empty() {
173180
let err = partition(&[]).unwrap_err();
@@ -298,4 +305,31 @@ mod tests {
298305
vec![(0..1), (1..2), (2..4), (4..5), (5..7), (7..8), (8..9)],
299306
);
300307
}
308+
309+
#[test]
310+
fn test_partition_nested() {
311+
let input = vec![
312+
Arc::new(
313+
StructArray::try_from(vec![(
314+
"f1",
315+
Arc::new(Int64Array::from(vec![
316+
None,
317+
None,
318+
Some(1),
319+
Some(2),
320+
Some(2),
321+
Some(2),
322+
Some(3),
323+
Some(4),
324+
])) as _,
325+
)])
326+
.unwrap(),
327+
) as _,
328+
Arc::new(Int64Array::from(vec![1, 1, 1, 2, 3, 3, 3, 4])) as _,
329+
];
330+
assert_eq!(
331+
partition(&input).unwrap().ranges(),
332+
vec![0..2, 2..3, 3..4, 4..6, 6..7, 7..8]
333+
)
334+
}
301335
}

0 commit comments

Comments
 (0)