Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skipped_aggregation_rows metric to aggregate operator #11706

Merged
merged 1 commit into from
Aug 7, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jul 29, 2024

Which issue does this PR close?

Rationale for this change

#11627 from @korowa adds a "partial aggregation skipping" mode to the hash aggregate exec that switches aggregate behavior dynamically at runtime,

It would be very nice to know if the path is being executed or not, and the way to do this in DataFusion is metrics.

What changes are included in this PR?

Add a "skipped_aggregation_rows" counter which records the number of rows

Are these changes tested?

I tested them manually

For example, this line shows skipped_aggregation_rows=98293561:

|                   |               AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(*)], metrics=[output_rows=99997497, elapsed_compute=190.246833ms, skipped_aggregation_rows=98293561]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |

Here is the entire output of testing with a query:

$ ./datafusion-cli-skip-partial-metrics -c "EXPLAIN ANALYZE SELECT \"WatchID\", \"ClientIP\", COUNT(*) AS c FROM 'hits.parquet' GROUP BY \"WatchID\", \"ClientIP\" ORDER BY c DESC LIMIT 10;"
DataFusion CLI v40.0.0
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=10, elapsed_compute=14.916µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |   SortPreservingMergeExec: [c@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=3.667µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |     SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=764.776988ms, row_replacements=164]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|                   |       ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, count(*)@2 as c], metrics=[output_rows=99997493, elapsed_compute=1.960225ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |         AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(*)], metrics=[output_rows=99997493, elapsed_compute=17.934170583s]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |           CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=99997497, elapsed_compute=598.045136ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|                   |             RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 16), input_partitions=16, metrics=[repart_time=1.018381539s, send_time=8.695161456s, fetch_time=1.684569996s]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|    here-->        |               AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(*)], metrics=[output_rows=99997497, elapsed_compute=190.246833ms, skipped_aggregation_rows=98293561]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |                 ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, ClientIP], metrics=[output_rows=99997497, elapsed_compute=16ns, page_index_rows_filtered=0, row_groups_matched_statistics=0, row_groups_matched_bloom_filter=0, file_scan_errors=0, row_groups_pruned_bloom_filter=0, predicate_evaluation_errors=0, bytes_scanned=1050612937, pushdown_rows_filtered=0, file_open_errors=0, num_predicate_creation_errors=0, row_groups_pruned_statistics=0, pushdown_eval_time=32ns, time_elapsed_processing=978.395665ms, page_index_eval_time=32ns, time_elapsed_scanning_until_data=68.642586ms, time_elapsed_scanning_total=16.706024614s, time_elapsed_opening=380.053666ms] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 1.485 seconds.

Are there any user-facing changes?

Another metric in a plan if partial aggregation is skipped

@github-actions github-actions bot added documentation Improvements or additions to documentation logical-expr Logical plan and expressions sqllogictest SQL Logic Tests (.slt) labels Jul 29, 2024
@alamb alamb changed the title Add metrics for skipped rows Add skipped_aggregation_rowsto aggregate operator Jul 29, 2024
@alamb alamb force-pushed the alamb/aggregation_metrics branch from a9a34db to ae6f4d6 Compare August 5, 2024 13:04
@github-actions github-actions bot removed documentation Improvements or additions to documentation logical-expr Logical plan and expressions sqllogictest SQL Logic Tests (.slt) labels Aug 5, 2024
@alamb alamb marked this pull request as ready for review August 5, 2024 13:06
@alamb alamb force-pushed the alamb/aggregation_metrics branch from ae6f4d6 to 223b959 Compare August 5, 2024 13:07
@@ -611,6 +629,9 @@ impl Stream for GroupedHashAggregateStream {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let _timer = elapsed_compute.timer();
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
probe.record_skipped(&batch);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual call that records the metrics . The rest of the PR is comments and plumbing

options.skip_partial_aggregation_probe_rows_threshold;
let probe_ratio_threshold =
options.skip_partial_aggregation_probe_ratio_threshold;
let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that it should be better not to add this counter into all group bys (as another baseline metric) by default 👍

@alamb
Copy link
Contributor Author

alamb commented Aug 6, 2024

FYI @Dandandan would you have time to review this PR (to help performance profiling of the group by aggregation skipping code)?

@alamb alamb changed the title Add skipped_aggregation_rowsto aggregate operator Add skipped_aggregation_rows metric to aggregate operator Aug 6, 2024
Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @alamb

@alamb
Copy link
Contributor Author

alamb commented Aug 7, 2024

Thanks for the review @andygrove and @korowa

@alamb alamb merged commit 679a85f into apache:main Aug 7, 2024
25 checks passed
@alamb alamb deleted the alamb/aggregation_metrics branch August 7, 2024 16:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Metrics for when partial aggregation mode is hit
3 participants