
Commit 6aad19f

Author: mertak-synnada

add valid distinct case for aggregate.slt (#11814)

Parent: 682bc2e

File tree: 2 files changed (+33, -4 lines)


datafusion/optimizer/src/replace_distinct_aggregate.rs

Lines changed: 5 additions & 4 deletions
@@ -82,10 +82,11 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
         for dep in input.schema().functional_dependencies().iter() {
             // If distinct is exactly the same with a previous GROUP BY, we can
             // simply remove it:
-            if dep.source_indices[..field_count]
-                .iter()
-                .enumerate()
-                .all(|(idx, f_idx)| idx == *f_idx)
+            if dep.source_indices.len() >= field_count
+                && dep.source_indices[..field_count]
+                    .iter()
+                    .enumerate()
+                    .all(|(idx, f_idx)| idx == *f_idx)
             {
                 return Ok(Transformed::yes(input.as_ref().clone()));
             }
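
The new `len()` guard is the substance of the fix: range-indexing a Rust slice past its end panics at runtime, so the old `dep.source_indices[..field_count]` aborted whenever a functional dependency covered fewer columns than the DISTINCT projected. A minimal standalone sketch of the guarded prefix check (the function name and the index values below are illustrative, not taken from the DataFusion sources):

/// True when the first `field_count` source indices are exactly
/// 0, 1, 2, ... — i.e. the DISTINCT columns are the same prefix as an
/// existing GROUP BY, so the Distinct node is redundant.
fn distinct_matches_group_by(source_indices: &[usize], field_count: usize) -> bool {
    // Guard first: `source_indices[..field_count]` panics when the
    // slice is shorter than `field_count`.
    source_indices.len() >= field_count
        && source_indices[..field_count]
            .iter()
            .enumerate()
            .all(|(idx, f_idx)| idx == *f_idx)
}

fn main() {
    // Dependency covers both DISTINCT columns in order: removable.
    assert!(distinct_matches_group_by(&[0, 1], 2));
    // Dependency covers fewer columns than the DISTINCT (the case the
    // new aggregate.slt test exercises): the DISTINCT must stay.
    assert!(!distinct_matches_group_by(&[0], 2));
}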

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 28 additions & 0 deletions
@@ -4521,6 +4521,34 @@ false
 true
 NULL
 
+#
+# Add valid distinct case as aggregation plan test
+#
+
+query TT
+EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 group by c3 limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Aggregate: groupBy=[[aggregate_test_100.c3, MIN(aggregate_test_100.c1)]], aggr=[[]]
+03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[MIN(aggregate_test_100.c1)]]
+04)------TableScan: aggregate_test_100 projection=[c1, c3]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--CoalescePartitionsExec
+03)----LocalLimitExec: fetch=5
+04)------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([c3@0, MIN(aggregate_test_100.c1)@1], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[MIN(aggregate_test_100.c1)]
+09)----------------CoalesceBatchesExec: target_batch_size=8192
+10)------------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[MIN(aggregate_test_100.c1)]
+12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+
+
 #
 # Push limit into distinct group-by aggregation tests
 #
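
The new test pins down the plan for a DISTINCT whose select list (c3, min(c1)) is not a plain prefix of the GROUP BY keys, so the optimizer must keep two nested aggregates: the inner one computes MIN(c1) per c3, the outer one de-duplicates the resulting pairs. To reproduce the plan outside the sqllogictest harness, the same EXPLAIN can be run through DataFusion's public API. A minimal sketch, assuming the datafusion and tokio crates and a local copy of aggregate_test_100.csv at the path shown (exact plan text varies by version and partitioning):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The path is an assumption; point it at your local copy of
    // aggregate_test_100.csv from the arrow-testing data.
    ctx.register_csv(
        "aggregate_test_100",
        "testing/data/csv/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    // The logical plan should show two nested Aggregate nodes: the inner
    // one computes min(c1) per c3, the outer one de-duplicates the pairs.
    let df = ctx
        .sql("EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 GROUP BY c3 LIMIT 5")
        .await?;
    df.show().await?;
    Ok(())
}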
