Add Sql-invoked function support for column statistics #23201

Merged


@ZacBlanco ZacBlanco commented Jul 13, 2024

Description

Adds support in the ANALYZE infrastructure for attaching projections or extra arguments to the aggregation functions that compute column statistics, which lets connectors support a wider range of functionality. The change reuses the SqlInvokedScalarFunctions infrastructure to parse the expressions that a connector returns in ColumnStatisticMetadata.
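
To make the idea concrete, here is a minimal sketch of splitting one statistic's aggregation call into a projection step and an aggregation over the projected symbols. It is not the code from this PR: `StatisticAggregationSketch`, `Planned`, `plan`, the `expr_N` symbol naming, and `some_agg` are all hypothetical, and the arguments are assumed to be already split into strings rather than parsed as SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; none of these types are Presto classes.
final class StatisticAggregationSketch
{
    // Projections to evaluate first, plus the aggregation call over their output symbols.
    static final class Planned
    {
        final Map<String, String> projections; // output symbol -> SQL expression
        final String aggregationCall;

        Planned(Map<String, String> projections, String aggregationCall)
        {
            this.projections = projections;
            this.aggregationCall = aggregationCall;
        }
    }

    static Planned plan(String functionName, List<String> arguments, Set<String> inputColumns)
    {
        Map<String, String> projections = new LinkedHashMap<>();
        List<String> symbols = new ArrayList<>();
        int nextId = 0;
        for (String argument : arguments) {
            // A bare column keeps its own name (an identity projection); anything else,
            // such as a CAST or a constant, gets a fresh symbol computed below the aggregation.
            String symbol = inputColumns.contains(argument) ? argument : "expr_" + nextId++;
            projections.put(symbol, argument);
            symbols.add(symbol);
        }
        return new Planned(projections, functionName + "(" + String.join(", ", symbols) + ")");
    }

    public static void main(String[] args)
    {
        Set<String> columns = Collections.singleton("regionkey");
        Planned simple = plan("sketch_theta", Arrays.asList("regionkey"), columns);
        System.out.println(simple.projections + " -> " + simple.aggregationCall);
        // "some_agg" is a made-up two-argument aggregation used only for illustration.
        Planned withCast = plan("some_agg", Arrays.asList("CAST(regionkey AS double)", "10"), columns);
        System.out.println(withCast.projections + " -> " + withCast.aggregationCall);
    }
}
```

In the first call every argument is a plain column, so the projection only passes columns through. In the second call the CAST and the constant each need their own projected symbol.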

Motivation and Context

The idea originally stemmed from discussion in #23114 (comment).

This is necessary to (1) support a CAST on the input argument for ANALYZE and (2) allow passing additional arguments to aggregation functions that accept more than one argument. It could also make ANALYZE functions configurable through the session. One example use case is a session variable that sets the granularity at which a histogram stores information for statistics (#22365).
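
As a rough illustration of the session-configurable case, the sketch below builds the SQL expression a connector might hand back for a histogram statistic. Everything named here is an assumption made for the example: the property name `hypothetical_histogram_granularity`, the default of 100, and the function `hypothetical_histogram_agg` do not come from Presto or from this PR, and the session is modeled as a plain `Map`. Column names are not quoted or escaped.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch; the property and function names are placeholders, not Presto identifiers.
final class AnalyzeExpressionSketch
{
    static final String GRANULARITY_PROPERTY = "hypothetical_histogram_granularity";
    static final int DEFAULT_GRANULARITY = 100;

    // Returns an aggregation expression with a CAST on the input column and
    // a constant second argument taken from the session (or the default).
    static String histogramExpression(String column, Map<String, String> sessionProperties)
    {
        String configured = sessionProperties.get(GRANULARITY_PROPERTY);
        int granularity = configured == null ? DEFAULT_GRANULARITY : Integer.parseInt(configured);
        return "hypothetical_histogram_agg(CAST(" + column + " AS double), " + granularity + ")";
    }

    public static void main(String[] args)
    {
        System.out.println(histogramExpression("regionkey", Collections.emptyMap()));
        System.out.println(histogramExpression("regionkey",
                Collections.singletonMap(GRANULARITY_PROPERTY, "250")));
    }
}
```

Both needs from the paragraph above show up in the returned string: the CAST wraps the input column, and the constant is an extra argument whose value depends on the session.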

Impact

Affects only existing connectors that use custom function names for certain column statistic types. Currently, Iceberg is the only connector that does this.

Test Plan

  • Existing tests in the Iceberg connector exercise the new code path in the StatisticsAggregationPlanner.

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== NO RELEASE NOTE ==

@ZacBlanco ZacBlanco force-pushed the upstream-connector-sqlfunction-support branch 2 times, most recently from 8664a51 to a1fe4c1 Compare July 15, 2024 15:55
@ZacBlanco ZacBlanco marked this pull request as ready for review July 15, 2024 17:08
@ZacBlanco ZacBlanco force-pushed the upstream-connector-sqlfunction-support branch from a1fe4c1 to 8951425 Compare July 15, 2024 18:27
@tdcmeehan tdcmeehan self-assigned this Jul 15, 2024
aaneja previously approved these changes Jul 16, 2024

@aaneja aaneja left a comment

LGTM. Can you post a text version of the generated ANALYZE plan (or the query info JSON) to show an example run?


@hantangwangd hantangwangd left a comment

Thanks for the enhancement, LGTM. A few small nits and one small question for discussion.

@ZacBlanco ZacBlanco force-pushed the upstream-connector-sqlfunction-support branch 3 times, most recently from 2721a1a to 14cbc5e Compare July 16, 2024 19:20

@tdcmeehan tdcmeehan left a comment

LGTM, thanks for this creative solution

presto-iceberg/pom.xml (outdated, resolved)
@ZacBlanco ZacBlanco force-pushed the upstream-connector-sqlfunction-support branch 3 times, most recently from fcd0458 to b9ac488 Compare July 16, 2024 19:41
hantangwangd previously approved these changes Jul 17, 2024

@hantangwangd hantangwangd left a comment

Thanks for the fix, lgtm.

@ZacBlanco

@aaneja here are some sample plans. Both run against tpch.region. One uses the Iceberg connector without Hive and the other uses it with Hive. The plan without Hive exercises the new code path. The two configurations support different statistics, so the number of fields in the plans differs considerably. The main difference, though, is in the function references used to compute NDVs, since that is where the sketch_theta function is introduced.

Iceberg Without Hive

 - Output[PlanNodeId 4][rows] => [rows:bigint]
     - StatisticsWriter[PlanNodeId 2][TableHandle {connectorId='iceberg', connectorHandle='region$data@Optional[788143158172084398]', layout='Optional.empty'}] => [rows:bigint]
         - LocalExchange[PlanNodeId 227][SINGLE] () => [rowcount:bigint, number_of_distinct_values_regionkey:varbinary, number_of_distinct_values_name:varbinary, total_size_in_bytes_name:bigint, number_of_distinct_values_comment:varbinary, total_size_in_bytes_comment:bigint]
             - RemoteStreamingExchange[PlanNodeId 200][GATHER] => [rowcount:bigint, number_of_distinct_values_regionkey:varbinary, number_of_distinct_values_name:varbinary, total_size_in_bytes_name:bigint, number_of_distinct_values_comment:varbinary, total_size_in_bytes_comment:bigint]
                 - Aggregate(FINAL)[PlanNodeId 3] => [number_of_distinct_values_comment:varbinary, rowcount:bigint, total_size_in_bytes_comment:bigint, total_size_in_bytes_name:bigint, number_of_distinct_values_regionkey:varbinary, number_of_distinct_values_name:varbinary]
                         number_of_distinct_values_comment := "presto.default.sketch_theta"((sketch_theta_1)) (1:21)
                         rowcount := "presto.default.count"((count))
                         total_size_in_bytes_comment := "presto.default.sum_data_size_for_stats"((sum_data_size_for_stats_2)) (1:9)
                         total_size_in_bytes_name := "presto.default.sum_data_size_for_stats"((sum_data_size_for_stats)) (1:9)
                         number_of_distinct_values_regionkey := "presto.default.sketch_theta"((sketch_theta)) (1:21)
                         number_of_distinct_values_name := "presto.default.sketch_theta"((sketch_theta_0)) (1:21)
                     - LocalExchange[PlanNodeId 241][SINGLE] () => [sketch_theta:varbinary, sketch_theta_0:varbinary, sketch_theta_1:varbinary, sum_data_size_for_stats:bigint, count:bigint, sum_data_size_for_stats_2:bigint]
                         - RemoteStreamingExchange[PlanNodeId 247][GATHER] => [sketch_theta:varbinary, sketch_theta_0:varbinary, sketch_theta_1:varbinary, sum_data_size_for_stats:bigint, count:bigint, sum_data_size_for_stats_2:bigint]
                             - Aggregate(PARTIAL)[PlanNodeId 245] => [sketch_theta:varbinary, sketch_theta_0:varbinary, sketch_theta_1:varbinary, sum_data_size_for_stats:bigint, count:bigint, sum_data_size_for_stats_2:bigint]
                                     sketch_theta := "presto.default.sketch_theta"((regionkey)) (1:21)
                                     sketch_theta_0 := "presto.default.sketch_theta"((name)) (1:21)
                                     sketch_theta_1 := "presto.default.sketch_theta"((comment)) (1:21)
                                     sum_data_size_for_stats := "presto.default.sum_data_size_for_stats"((name)) (1:9)
                                     count := "presto.default.count"(*)
                                     sum_data_size_for_stats_2 := "presto.default.sum_data_size_for_stats"((comment)) (1:9)
                                 - TableScan[PlanNodeId 0][TableHandle {connectorId='iceberg', connectorHandle='region$data@Optional[788143158172084398]', layout='Optional[region$data@Optional[788143158172084398]]'}] => [regionkey:bigint, name:varchar, comment:varchar]
                                         Estimates: {source: CostBasedSourceInfo, rows: 5 (45B), cpu: 595.00, memory: 0.00, network: 0.00}
                                         regionkey := 1:regionkey:bigint (1:9)
                                         comment := 3:comment:varchar (1:9)
                                         name := 2:name:varchar (1:9)

Iceberg With Hive

- Output[PlanNodeId 4][rows] => [rows:bigint]
     - StatisticsWriter[PlanNodeId 2][TableHandle {connectorId='iceberg', connectorHandle='region$data@Optional[5128365501455863835]', layout='Optional.empty'}] => [rows:bigint]
         - LocalExchange[PlanNodeId 227][SINGLE] () => [rowcount:bigint, min_value_regionkey:bigint, max_value_regionkey:bigint, number_of_distinct_values_regionkey:bigint, number_of_non_null_values_regionkey:bigint, number_of_non_null_values_name:bigint, number_of_distinct_values_name:bigint, total_size_in_bytes_name:bigint, max_value_size_in_bytes_name:bigint, number_of_non_null_values_comment:bigint, number_of_distinct_values_comment:bigint, total_size_in_bytes_comment:bigint, m>
             - RemoteStreamingExchange[PlanNodeId 200][GATHER] => [rowcount:bigint, min_value_regionkey:bigint, max_value_regionkey:bigint, number_of_distinct_values_regionkey:bigint, number_of_non_null_values_regionkey:bigint, number_of_non_null_values_name:bigint, number_of_distinct_values_name:bigint, total_size_in_bytes_name:bigint, max_value_size_in_bytes_name:bigint, number_of_non_null_values_comment:bigint, number_of_distinct_values_comment:bigint, total_size_in_bytes_commen>
                 - Aggregate(FINAL)[PlanNodeId 3] => [number_of_non_null_values_comment:bigint, total_size_in_bytes_comment:bigint, total_size_in_bytes_name:bigint, number_of_non_null_values_regionkey:bigint, max_value_regionkey:bigint, number_of_distinct_values_comment:bigint, number_of_non_null_values_name:bigint, rowcount:bigint, min_value_regionkey:bigint, number_of_distinct_values_regionkey:bigint, max_value_size_in_bytes_name:bigint, max_value_size_in_bytes_comment:bigint, nu>
                         number_of_non_null_values_comment := "presto.default.count"((count_3)) (1:9)
                         total_size_in_bytes_comment := "presto.default.sum_data_size_for_stats"((sum_data_size_for_stats_5)) (1:9)
                         total_size_in_bytes_name := "presto.default.sum_data_size_for_stats"((sum_data_size_for_stats)) (1:9)
                         number_of_non_null_values_regionkey := "presto.default.count"((count_0)) (1:9)
                         max_value_regionkey := "presto.default.max"((max)) (1:9)
                         number_of_distinct_values_comment := "presto.default.approx_distinct"((approx_distinct_4)) (1:9)
                         number_of_non_null_values_name := "presto.default.count"((count_1)) (1:9)
                         rowcount := "presto.default.count"((count))
                         min_value_regionkey := "presto.default.min"((min)) (1:9)
                         number_of_distinct_values_regionkey := "presto.default.approx_distinct"((approx_distinct)) (1:9)
                         max_value_size_in_bytes_name := "presto.default.max_data_size_for_stats"((max_data_size_for_stats)) (1:9)
                         max_value_size_in_bytes_comment := "presto.default.max_data_size_for_stats"((max_data_size_for_stats_6)) (1:9)
                         number_of_distinct_values_name := "presto.default.approx_distinct"((approx_distinct_2)) (1:9)
                     - LocalExchange[PlanNodeId 241][SINGLE] () => [max_data_size_for_stats_6:bigint, approx_distinct:varbinary, count:bigint, count_1:bigint, count_0:bigint, count_3:bigint, max:bigint, sum_data_size_for_stats_5:bigint, approx_distinct_2:varbinary, approx_distinct_4:varbinary, sum_data_size_for_stats:bigint, max_data_size_for_stats:bigint, min:bigint]
                         - RemoteStreamingExchange[PlanNodeId 247][GATHER] => [max_data_size_for_stats_6:bigint, approx_distinct:varbinary, count:bigint, count_1:bigint, count_0:bigint, count_3:bigint, max:bigint, sum_data_size_for_stats_5:bigint, approx_distinct_2:varbinary, approx_distinct_4:varbinary, sum_data_size_for_stats:bigint, max_data_size_for_stats:bigint, min:bigint]
                             - Aggregate(PARTIAL)[PlanNodeId 245] => [max_data_size_for_stats_6:bigint, approx_distinct:varbinary, count:bigint, count_1:bigint, count_0:bigint, count_3:bigint, max:bigint, sum_data_size_for_stats_5:bigint, approx_distinct_2:varbinary, approx_distinct_4:varbinary, sum_data_size_for_stats:bigint, max_data_size_for_stats:bigint, min:bigint]
                                     max_data_size_for_stats_6 := "presto.default.max_data_size_for_stats"((comment)) (1:9)
                                     approx_distinct := "presto.default.approx_distinct"((regionkey)) (1:9)
                                     count := "presto.default.count"(*)
                                     count_1 := "presto.default.count"((name)) (1:9)
                                     count_0 := "presto.default.count"((regionkey)) (1:9)
                                     count_3 := "presto.default.count"((comment)) (1:9)
                                     max := "presto.default.max"((regionkey)) (1:9)
                                     sum_data_size_for_stats_5 := "presto.default.sum_data_size_for_stats"((comment)) (1:9)
                                     approx_distinct_2 := "presto.default.approx_distinct"((name)) (1:9)
                                     approx_distinct_4 := "presto.default.approx_distinct"((comment)) (1:9)
                                     sum_data_size_for_stats := "presto.default.sum_data_size_for_stats"((name)) (1:9)
                                     max_data_size_for_stats := "presto.default.max_data_size_for_stats"((name)) (1:9)
                                     min := "presto.default.min"((regionkey)) (1:9)
                                 - TableScan[PlanNodeId 0][TableHandle {connectorId='iceberg', connectorHandle='region$data@Optional[5128365501455863835]', layout='Optional[region$data@Optional[5128365501455863835]]'}] => [regionkey:bigint, name:varchar, comment:varchar]
                                         Estimates: {source: CostBasedSourceInfo, rows: 5 (45B), cpu: 595.00, memory: 0.00, network: 0.00}
                                         regionkey := 1:regionkey:bigint (1:9)
                                         comment := 3:comment:varchar (1:9)
                                         name := 2:name:varchar (1:9)

IMO the big takeaway is that the additional project node we added to the plan is optimized away in a later phase after plan generation, so the result is essentially the same plan as before. When I add histograms in a later PR, which require a CAST on inputs and take other constant parameters, the project node will stay in the plan.
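
One plausible way to read this, offered as an assumption rather than something confirmed in this thread, is that a projection which only passes columns through unchanged is an identity projection that the optimizer can drop, while a projection that computes a CAST or a constant is not. The sketch below shows that check on a simplified model; `IdentityProjectionSketch` and `isIdentity` are hypothetical names, and assignments are modeled as plain strings instead of plan expressions.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of a project node's assignments (output symbol -> expression).
final class IdentityProjectionSketch
{
    // True when every output symbol is assigned the input column of the same name,
    // so the projection computes nothing new.
    static boolean isIdentity(Map<String, String> assignments)
    {
        for (Map.Entry<String, String> assignment : assignments.entrySet()) {
            if (!assignment.getKey().equals(assignment.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        Map<String, String> passThrough = new LinkedHashMap<>();
        passThrough.put("regionkey", "regionkey");
        passThrough.put("name", "name");
        System.out.println(isIdentity(passThrough));

        // A CAST introduces a real computation, so the projection has to stay.
        Map<String, String> withCast = Collections.singletonMap("expr_0", "CAST(regionkey AS double)");
        System.out.println(isIdentity(withCast));
    }
}
```

Under that reading, the sample plans above contain only pass-through assignments, which would explain why no project node appears between the table scan and the partial aggregation.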

aaneja previously approved these changes Jul 18, 2024
Adds support in the ANALYZE infrastructure for additional
projections or constant arguments to aggregation functions.
This allows connectors to support a wider range of functionality
when generating metadata for column statistic collection
@ZacBlanco ZacBlanco dismissed stale reviews from aaneja and hantangwangd via b7eb12f July 18, 2024 15:27
@ZacBlanco ZacBlanco force-pushed the upstream-connector-sqlfunction-support branch from b9ac488 to b7eb12f Compare July 18, 2024 15:27
@tdcmeehan tdcmeehan removed their assignment Jul 19, 2024
@ZacBlanco ZacBlanco merged commit d2d8642 into prestodb:master Jul 19, 2024
56 checks passed
4 participants