Skip to content

Commit 626c6bc

Browse files
authored
support merge batch for distinct array aggregate (#10526)
Signed-off-by: jayzhan211 <[email protected]>
1 parent eddec8e commit 626c6bc

File tree

2 files changed

+72
-6
lines changed

2 files changed

+72
-6
lines changed

datafusion/physical-expr/src/aggregate/array_agg_distinct.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,11 @@ impl Accumulator for DistinctArrayAggAccumulator {
153153
return Ok(());
154154
}
155155

156-
let array = &states[0];
157-
158-
assert_eq!(array.len(), 1, "state array should only include 1 row!");
159-
// Unwrap outer ListArray then do update batch
160-
let inner_array = array.as_list::<i32>().value(0);
161-
self.update_batch(&[inner_array])
156+
states[0]
157+
.as_list::<i32>()
158+
.iter()
159+
.flatten()
160+
.try_for_each(|val| self.update_batch(&[val]))
162161
}
163162

164163
fn evaluate(&mut self) -> Result<ScalarValue> {

datafusion/sqllogictest/test_files/aggregate.slt

+67
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,73 @@ statement error This feature is not implemented: LIMIT not supported in ARRAY_AG
198198
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100
199199

200200

201+
# Test distinct aggregate function with merge batch
202+
query II
203+
with A as (
204+
select 1 as id, 2 as foo
205+
UNION ALL
206+
select 1, null
207+
UNION ALL
208+
select 1, null
209+
UNION ALL
210+
select 1, 3
211+
UNION ALL
212+
select 1, 2
213+
---- The order is non-deterministic, verify with length
214+
) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id;
215+
----
216+
3 1
217+
218+
# It has only AggregateExec with FinalPartitioned mode, so `merge_batch` is used
219+
# If the plan is changed, whether the `merge_batch` is used should be verified to ensure the test coverage
220+
query TT
221+
explain with A as (
222+
select 1 as id, 2 as foo
223+
UNION ALL
224+
select 1, null
225+
UNION ALL
226+
select 1, null
227+
UNION ALL
228+
select 1, 3
229+
UNION ALL
230+
select 1, 2
231+
) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id;
232+
----
233+
logical_plan
234+
01)Projection: array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1))
235+
02)--Aggregate: groupBy=[[a.id]], aggr=[[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]]
236+
03)----SubqueryAlias: a
237+
04)------SubqueryAlias: a
238+
05)--------Union
239+
06)----------Projection: Int64(1) AS id, Int64(2) AS foo
240+
07)------------EmptyRelation
241+
08)----------Projection: Int64(1) AS id, Int64(NULL) AS foo
242+
09)------------EmptyRelation
243+
10)----------Projection: Int64(1) AS id, Int64(NULL) AS foo
244+
11)------------EmptyRelation
245+
12)----------Projection: Int64(1) AS id, Int64(3) AS foo
246+
13)------------EmptyRelation
247+
14)----------Projection: Int64(1) AS id, Int64(2) AS foo
248+
15)------------EmptyRelation
249+
physical_plan
250+
01)ProjectionExec: expr=[array_length(ARRAY_AGG(DISTINCT a.foo)@1) as array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1))@2 as SUM(DISTINCT Int64(1))]
251+
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]
252+
03)----CoalesceBatchesExec: target_batch_size=8192
253+
04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
254+
05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]
255+
06)----------UnionExec
256+
07)------------ProjectionExec: expr=[1 as id, 2 as foo]
257+
08)--------------PlaceholderRowExec
258+
09)------------ProjectionExec: expr=[1 as id, NULL as foo]
259+
10)--------------PlaceholderRowExec
260+
11)------------ProjectionExec: expr=[1 as id, NULL as foo]
261+
12)--------------PlaceholderRowExec
262+
13)------------ProjectionExec: expr=[1 as id, 3 as foo]
263+
14)--------------PlaceholderRowExec
264+
15)------------ProjectionExec: expr=[1 as id, 2 as foo]
265+
16)--------------PlaceholderRowExec
266+
267+
201268
# FIX: custom absolute values
202269
# csv_query_avg_multi_batch
203270

0 commit comments

Comments
 (0)