Skip to content

Fix panic in array_agg(distinct) query #10526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,11 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}

let array = &states[0];

assert_eq!(array.len(), 1, "state array should only include 1 row!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay that we just delete the assertion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because it is no longer true, this PR is dealing with the array that has len more than one

// Unwrap outer ListArray then do update batch
let inner_array = array.as_list::<i32>().value(0);
self.update_batch(&[inner_array])
states[0]
.as_list::<i32>()
.iter()
.flatten()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does the flatten do? Discard Option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

.try_for_each(|val| self.update_batch(&[val]))
}

fn evaluate(&mut self) -> Result<ScalarValue> {
Expand Down
67 changes: 67 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,73 @@ statement error This feature is not implemented: LIMIT not supported in ARRAY_AG
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100


# Test distinct aggregate function with merge batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the test without the changes in this PR and it panic'd as expected:

cargo test --test sqllogictests
...
thread 'tokio-runtime-worker' panicked at datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:158:9:
assertion `left == right` failed: state array should only include 1 row!
  left: 5
 right: 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

query II
with A as (
select 1 as id, 2 as foo
UNION ALL
select 1, null
UNION ALL
select 1, null
UNION ALL
select 1, 3
UNION ALL
select 1, 2
---- The order is non-deterministic, verify with length
) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id;
----
3 1

# It has only AggregateExec with FinalPartitioned mode, so `merge_batch` is used
# If the plan is changed, whether the `merge_batch` is used should be verified to ensure the test coverage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for the explanatory comments about why this explain is here

query TT
explain with A as (
select 1 as id, 2 as foo
UNION ALL
select 1, null
UNION ALL
select 1, null
UNION ALL
select 1, 3
UNION ALL
select 1, 2
) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id;
----
logical_plan
01)Projection: array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1))
02)--Aggregate: groupBy=[[a.id]], aggr=[[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]]
03)----SubqueryAlias: a
04)------SubqueryAlias: a
05)--------Union
06)----------Projection: Int64(1) AS id, Int64(2) AS foo
07)------------EmptyRelation
08)----------Projection: Int64(1) AS id, Int64(NULL) AS foo
09)------------EmptyRelation
10)----------Projection: Int64(1) AS id, Int64(NULL) AS foo
11)------------EmptyRelation
12)----------Projection: Int64(1) AS id, Int64(3) AS foo
13)------------EmptyRelation
14)----------Projection: Int64(1) AS id, Int64(2) AS foo
15)------------EmptyRelation
physical_plan
01)ProjectionExec: expr=[array_length(ARRAY_AGG(DISTINCT a.foo)@1) as array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1))@2 as SUM(DISTINCT Int64(1))]
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]
06)----------UnionExec
07)------------ProjectionExec: expr=[1 as id, 2 as foo]
08)--------------PlaceholderRowExec
09)------------ProjectionExec: expr=[1 as id, NULL as foo]
10)--------------PlaceholderRowExec
11)------------ProjectionExec: expr=[1 as id, NULL as foo]
12)--------------PlaceholderRowExec
13)------------ProjectionExec: expr=[1 as id, 3 as foo]
14)--------------PlaceholderRowExec
15)------------ProjectionExec: expr=[1 as id, 2 as foo]
16)--------------PlaceholderRowExec


# FIX: custom absolute values
# csv_query_avg_multi_batch

Expand Down