Add Optimizer Sanity Checker, improve sortedness equivalence properties #11196

Merged
merged 34 commits into from
Jul 3, 2024

Conversation

mustafasrepo
Contributor

@mustafasrepo mustafasrepo commented Jul 1, 2024

Rationale for this change

Most of the contribution in this PR comes from @yfy- . Thanks @yfy- for working on this issue.

We extend the existing PipelineChecker with additional checks of ordering and distribution requirements. In a physical plan, each operator should be pipeline friendly (previously checked by PipelineChecker), and each child's output ordering and partitioning should satisfy its parent's respective requirements. We combine these checks into the SanityCheckPlan rule proposed in this PR.

With the SanityCheckPlan, as the optimizer steps change and grow, we ensure that the ordering and distribution requirements of the final physical plan are always satisfied, so that it yields correct results.
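For readers new to the rule, here is a minimal sketch of the ordering half of the check described above. It is a toy model and not the code in this PR: `SortKey`, `PlanNode`, `asc`, `check_ordering`, and `window_over_scan` are hypothetical names, and plain prefix matching is assumed in place of DataFusion's equivalence-aware comparison.

```rust
/// Toy sort key: a column name plus a direction (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

/// Toy plan node: what it outputs and what it requires from each child.
struct PlanNode {
    name: &'static str,
    output_ordering: Vec<SortKey>,
    /// One entry per child; `None` means "no ordering requirement".
    required_input_ordering: Vec<Option<Vec<SortKey>>>,
    children: Vec<PlanNode>,
}

fn asc(column: &'static str) -> SortKey {
    SortKey { column, descending: false }
}

/// Assumption: an ordering satisfies a requirement when the requirement
/// is a prefix of it. The real rule also consults equivalence properties.
fn ordering_satisfies(provided: &[SortKey], required: &[SortKey]) -> bool {
    provided.starts_with(required)
}

/// Walks the plan top-down and reports the first violated requirement.
fn check_ordering(node: &PlanNode) -> Result<(), String> {
    for (idx, child) in node.children.iter().enumerate() {
        if let Some(Some(required)) = node.required_input_ordering.get(idx) {
            if !ordering_satisfies(&child.output_ordering, required) {
                return Err(format!(
                    "{} requires {:?} from child {} ({}), which provides {:?}",
                    node.name, required, idx, child.name, child.output_ordering
                ));
            }
        }
        check_ordering(child)?;
    }
    Ok(())
}

/// Builds a window-like node that requires `[a ASC]` over a scan with
/// the given output ordering.
fn window_over_scan(scan_ordering: Vec<SortKey>) -> PlanNode {
    let scan = PlanNode {
        name: "Scan",
        output_ordering: scan_ordering.clone(),
        required_input_ordering: vec![],
        children: vec![],
    };
    PlanNode {
        name: "Window",
        output_ordering: scan_ordering,
        required_input_ordering: vec![Some(vec![asc("a")])],
        children: vec![scan],
    }
}
```

With this model, `check_ordering(&window_over_scan(vec![asc("a"), asc("b")]))` passes, while a scan ordered only by `b` is rejected with a message naming the offending child.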

What changes are included in this PR?

  1. Add the SanityCheckPlan step as the last step of the physical plan optimizations.
  2. SanityCheckPlan subsumes the former PipelineChecker, so PipelineChecker is deleted.
  3. Add a ConstExpr API to track constant expressions.

Are these changes tested?

We reuse the existing test cases of the former PipelineChecker. In addition, the following cases are added:

Using BoundedWindowAggExec

2 cases: one where the child order requirement is satisfied and one where it is not.

Using GlobalLimitExec

2 cases: one where the child distribution requirement is satisfied and one where it is not.

Using LocalLimitExec

We verify that the check passes when there are no requirements at all.

Using SortMergeJoinExec

3 cases: first, a case where both children satisfy both requirements; second, a case where the second child does not satisfy the order requirement; finally, a case where the second child does not satisfy the distribution requirement.
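The distribution cases listed above can be pictured with a small sketch. This is an illustration under stated assumptions, not the PR's test code: the `Distribution` and `Partitioning` enums here are simplified stand-ins that use column names instead of physical expressions, `first_violation` is a hypothetical helper, and the rule that a single partition satisfies a hash requirement is an assumption of this model.

```rust
/// Simplified stand-in for what a parent may require from a child.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
    HashPartitioned(Vec<&'static str>),
}

/// Simplified stand-in for how a child's output is partitioned.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(Vec<&'static str>, usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobin(n) | Partitioning::Hash(_, n) => *n,
        }
    }
}

/// Returns true when the child's partitioning meets the parent's requirement.
fn partitioning_satisfies(provided: &Partitioning, required: &Distribution) -> bool {
    match required {
        Distribution::Unspecified => true,
        Distribution::SinglePartition => provided.partition_count() == 1,
        Distribution::HashPartitioned(keys) => match provided {
            Partitioning::Hash(cols, _) => cols == keys,
            // Assumption of this sketch: one partition satisfies any hash requirement.
            _ => provided.partition_count() == 1,
        },
    }
}

/// Index of the first child whose partitioning violates its requirement, if any.
fn first_violation(required: &[Distribution], children: &[Partitioning]) -> Option<usize> {
    required
        .iter()
        .zip(children)
        .position(|(req, child)| !partitioning_satisfies(child, req))
}
```

A limit-like operator that requires `SinglePartition` passes over `RoundRobin(1)` and fails over `RoundRobin(2)`. A join-like operator with two hash requirements reports index 1 when only its second child is round-robin partitioned.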

yfy- and others added 30 commits June 9, 2024 20:36
Only includes sort reqs, docs will be added.
@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jul 1, 2024
Contributor

@ozankabak ozankabak left a comment

I reviewed this PR carefully and it looks good to me. Thank you @yfy- and @mustafasrepo for collaborating on this.

Hopefully this rule will make us discover plan bugs much earlier, prevent regressions and increase developer discipline on operator and rule implementations.

@alamb alamb changed the title Optimizer Sanity Checker Add Optimizer Sanity Checker, improve sortedness equivalence properties Jul 2, 2024
Contributor

@alamb alamb left a comment

Thank you @mustafasrepo (and @yfy- ) - I found this PR very well written, documented and tested.

I had a few small comment suggestions as I read this PR, but overall I thought it looked quite nice

I also added a few notes to the description / title of the PR to make it clear this also improves some of the sortedness matching calculations in addition to adding a new sanity check.

datafusion/core/src/physical_optimizer/sanity_checker.rs (outdated, resolved)
datafusion/physical-expr/src/equivalence/class.rs (outdated, resolved)
datafusion/physical-expr/src/equivalence/class.rs (outdated, resolved)
@@ -173,6 +174,12 @@ impl EquivalenceProperties {
self.oeq_class.clear();
}

/// Removes constant expressions that may change across partitions.
/// This method should be used when different partitions are fused.
Contributor

What does a "fused" partition mean? That is not a term I have run into previously

Contributor Author

What I mean is that data from multiple partitions ends up in another partition (operators that do this: CoalescePartitionsExec, SortPreservingMergeExec, InterleaveExec, RepartitionExec). I changed the term to "merge" in 747b69b. However, if you have other suggestions that communicate the intent better in this context (maybe more common vocabulary in the literature), I can update it to that term.

Contributor

merge sounds good -- thank you
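To make the "merge" point concrete, here is a rough sketch of why per-partition constants must be dropped when partitions are merged. It is a toy model: this `ConstExpr` is a simplified stand-in with a string expression and an assumed `across_partitions` flag, and `constants_after_merge` is a hypothetical helper, not the method from the diff.

```rust
/// Toy constant expression. `across_partitions` is true when the expression
/// has the same value in every partition, false when it is only constant
/// within each partition.
#[derive(Debug, Clone, PartialEq)]
struct ConstExpr {
    expr: String,
    across_partitions: bool,
}

/// After data from several partitions lands in one output partition, an
/// expression that was only constant per partition may take several values,
/// so only constants that hold across partitions survive the merge.
fn constants_after_merge(constants: &[ConstExpr]) -> Vec<ConstExpr> {
    constants
        .iter()
        .filter(|c| c.across_partitions)
        .cloned()
        .collect()
}
```

For example, if `region` is constant within each partition but differs between partitions, it stops being a constant once those partitions are coalesced, while a literal-like constant that holds in every partition is kept.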

// First, project existing constants. For example, assume that `a + b`
// is known to be constant. If the projection were `a as a_new`, `b as b_new`,
// then we would project constant `a + b` as `a_new + b_new`.
let mut projected_constants = self
.constants
.iter()
.flat_map(|expr| self.eq_group.project_expr(mapping, expr))
.flat_map(|const_expr| {
self.eq_group
Contributor

I wonder if you could add an API like ConstExpr::map that applied a function to the contained Expr. That way this could look like

            .flat_map(|expr| expr.map(|expr| self.eq_group.project_expr(mapping, expr))))

Contributor Author

Changed in 747b69b.
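As an illustration of the shape of the suggested `map` helper, here is a minimal sketch. It assumes a simplified `ConstExpr` that holds a string expression and a boolean flag; `project` and `project_constants` are hypothetical helpers standing in for the projection logic, and the signature in the actual commit may differ.

```rust
/// Simplified stand-in for a constant expression wrapper.
#[derive(Debug, Clone, PartialEq)]
struct ConstExpr {
    expr: String,
    across_partitions: bool,
}

impl ConstExpr {
    /// Applies `f` to the wrapped expression and keeps the flag unchanged.
    /// Returns `None` when `f` cannot map the expression.
    fn map<F>(&self, f: F) -> Option<ConstExpr>
    where
        F: Fn(&str) -> Option<String>,
    {
        f(&self.expr).map(|expr| ConstExpr {
            expr,
            across_partitions: self.across_partitions,
        })
    }
}

/// Hypothetical projection: renames `a` and `b`, and fails for anything else.
fn project(expr: &str) -> Option<String> {
    match expr {
        "a" => Some("a_new".to_string()),
        "b" => Some("b_new".to_string()),
        _ => None,
    }
}

/// Projects every constant, dropping those that cannot be projected.
fn project_constants(constants: &[ConstExpr]) -> Vec<ConstExpr> {
    constants.iter().flat_map(|c| c.map(project)).collect()
}
```

The benefit of this shape is that callers can chain `flat_map` over the constants without unpacking and repacking the wrapper, and the flag travels with the mapped expression.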

12)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
13)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
14)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
03)----CoalesceBatchesExec: target_batch_size=2
Contributor

Are these results due to changing the setting

set datafusion.optimizer.prefer_existing_sort = true;

Or are they due to the changes in the sortedness calculations?

The plan seems better to me (there is no sort)

Contributor Author

@mustafasrepo mustafasrepo Jul 3, 2024

Actually, the previous plan was wrong (another case this new rule helped us discover), since one of the children of the SortMergeJoin doesn't have the ordering [a ASC]. After the fix, the generated plan was quite different. The intent of this test was (as far as I can tell) to check whether an ORDER BY on top of the SortMergeJoin can be pushed down through it. Hence, I changed the setting to preserve this behaviour.

The result change (where the sort is removed) does come from the setting change. However, as I said, the plan also changed with set datafusion.optimizer.prefer_existing_sort = false; in effect. Hence, I decided to change the flag to preserve the intent of the test.

@alamb alamb merged commit b76c1b7 into apache:main Jul 3, 2024
23 checks passed
@alamb
Contributor

alamb commented Jul 3, 2024

Thanks @mustafasrepo !

@alamb
Contributor

alamb commented Jul 7, 2024

FYI this check is now failing on one of the sql planning benchmarks: #11322

comphead pushed a commit to comphead/arrow-datafusion that referenced this pull request Jul 8, 2024
…es (apache#11196)

* Initial optimizer sanity checker.

Only includes sort reqs, docs will be added.

* Add distro and pipeline friendly checks

* Also check the plans we create are correct.

* Add distribution test cases using global limit exec.

* Add test for multiple children using SortMergeJoinExec.

* Move PipelineChecker to SanityCheckPlan

* Fix some tests and add docs

* Add some test docs and fix clippy diagnostics.

* Fix some failing tests

* Replace PipelineChecker with SanityChecker in .slt files.

* Initial commit

* Slt tests pass

* Resolve linter errors

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Sort PreservingMerge clear per partition

* Minor changes

* Update output_requirements.rs

* Address reviews

* Update datafusion/core/src/physical_optimizer/optimizer.rs

Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Update datafusion/core/src/physical_optimizer/sanity_checker.rs

Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Address reviews

* Minor changes

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Update comment

* Add map implementation

---------

Co-authored-by: Erman Yafay <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…es (apache#11196)

@alamb
Contributor

alamb commented Jul 16, 2024

We found an issue (that I think is exposed by this change, not caused by it) while upgrading InfluxDB 3.0: #11492

appletreeisyellow pushed a commit to influxdata/arrow-datafusion that referenced this pull request Jul 17, 2024
…es (apache#11196)

appletreeisyellow pushed a commit to influxdata/arrow-datafusion that referenced this pull request Jul 17, 2024
* fix(11397): do not surface errors for closed channels, and instead let the task join errors be surfaced

* fix(11397): terminate early on channel send failure

Add Optimizer Sanity Checker, improve sortedness equivalence properties (apache#11196)

@alamb
Contributor

alamb commented Jul 26, 2024

This check was being triggered in our downstream tests, but I think it actually found a real bug: #11675

@alamb
Contributor

alamb commented Sep 12, 2024

I believe this check also found another bug / limitation: #12414

Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
5 participants