Skip to content

POC: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: file_group_stats
Choose a base branch
from

Conversation

xudong963
Copy link
Owner

@xudong963 xudong963 commented Mar 27, 2025

This is the POC for optimized SPM to avoid merging non-overlapping partitions.

The PR glues three PRs:

  1. The origin PR: feat: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions apache/datafusion#13296 to optimize SortPreservingMergeExec to avoid merging non-overlapping partitions
  2. The PR: Support computing statistics for FileGroup apache/datafusion#15432 which adds statistics for FileGroup, aka, partition-level statistics.
  3. Add ProgressiveEval operator: feat: Add ProgressiveEval operator apache/datafusion#10490

Finally, we get the following result!!

DataFusion CLI v46.0.1
> set datafusion.execution.collect_statistics = true;
0 row(s) fetched. 
Elapsed 0.003 seconds.

> CREATE EXTERNAL TABLE t2 (id INT not null, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);
0 row(s) fetched. 
Elapsed 0.006 seconds.

> INSERT INTO t2 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched. 
Elapsed 0.022 seconds.

> EXPLAIN SELECT * FROM t2 ORDER BY id ASC;
[datafusion/core/src/datasource/listing/table.rs:859:9] state.config().collect_statistics() = true
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t2.id ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |   TableScan: t2 projection=[id, date]                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| physical_plan | ProgressiveEvalExec: partition_groups=[[3, 2, 1, 0]]                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   DataSourceExec: file_groups={4 groups: [[Users/xudong/opensource/datafusion/data/date=2025-03-01/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-02/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-03/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-04/GE7lpLxg3gu27zCY.parquet]]}, projection=[id, date], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.012 seconds.

> SELECT * FROM t2 ORDER BY id ASC;
[datafusion/core/src/datasource/listing/table.rs:859:9] state.config().collect_statistics() = true
+----+------------+
| id | date       |
+----+------------+
| 1  | 2025-03-04 |
| 2  | 2025-03-03 |
| 3  | 2025-03-02 |
| 4  | 2025-03-01 |
+----+------------+
4 row(s) fetched. 
Elapsed 0.012 seconds.

> EXPLAIN SELECT * FROM t2 ORDER BY id ASC limit 2;
[datafusion/core/src/datasource/listing/table.rs:859:9] state.config().collect_statistics() = true
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t2.id ASC NULLS LAST, fetch=2                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |   TableScan: t2 projection=[id, date]                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| physical_plan | ProgressiveEvalExec: fetch=2, partition_groups=[[3, 2, 1, 0]]                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |   DataSourceExec: file_groups={4 groups: [[Users/xudong/opensource/datafusion/data/date=2025-03-01/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-02/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-03/GE7lpLxg3gu27zCY.parquet], [Users/xudong/opensource/datafusion/data/date=2025-03-04/GE7lpLxg3gu27zCY.parquet]]}, projection=[id, date], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.008 seconds.

> SELECT * FROM t2 ORDER BY id ASC limit 2;
[datafusion/core/src/datasource/listing/table.rs:859:9] state.config().collect_statistics() = true
+----+------------+
| id | date       |
+----+------------+
| 1  | 2025-03-04 |
| 2  | 2025-03-03 |
+----+------------+
2 row(s) fetched. 
Elapsed 0.011 seconds.

> 

@@ -174,6 +175,21 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.statistics()
}

fn statistics_by_partition(&self) -> datafusion_common::Result<Vec<Statistics>> {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The partition-level statistic of DataSource will be passed to downstream nodes.

We need to implement the method for different nodes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this may end up being a decent amount of work. For our use case at polygon.io we are probably going to want to impl it for FilterExec and UnionExec at a minimum, I'm probably missing some others too.

Copy link

@wiedld wiedld Mar 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have our own in-house version of ProgressiveEval, which we are currently iterating upon to make generalized. This part (extracting and ordering all partitions in the DAG below) is indeed the majority of the work thus far.

We've debugged and gotten unit tests for the majority of that code. I'll have a chat with @alamb about how much we can move upstream to the apache project.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this part will work well after this PR is merged: apache#15432 which introduces the partition-level statistics.

And after apache#15432 is merged, we can add the statistics_by_partition API to ExecutionPlan and implement it for all nodes.

Copy link

@suremarc suremarc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks mostly good to me, save for the issue about returning a SortMerge. Also seemed like there were some stray calls to unwrap that shouldn't be there

Copy link

@wiedld wiedld left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a really quick skim to be helpful; not sure if these comments may lead you astray (we have a lot of tech debt in our implementation which I'm still extracting).

Let me know if you want to have a chat and sync on this project. I'm on the datafusion discord at @holometabola.

Comment on lines +115 to +116
let partition_groups = input
.statistics_by_partition()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This partition group mapping (swapping ordering by statistics) is also the approach we are using.

Self {
input,
expr,
metrics: ExecutionPlanMetricsSet::new(),
fetch: None,
cache,
enable_round_robin_repartition: true,
progressive_eval_exec,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are instead replacing the SPM node with a ProgressEval node during an optimizer run. I assume this is a WIP and that will occur eventually?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, doing it in Optimizer makes sense to me

}

/// Concat input streams until reaching the fetch limit
struct ProgressiveEvalStream {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of your stream structures are a close mirror to how we do it too. 😆

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apache#10490

Yes, from the PR

@@ -174,6 +175,21 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.statistics()
}

fn statistics_by_partition(&self) -> datafusion_common::Result<Vec<Statistics>> {
Copy link

@wiedld wiedld Mar 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have our own in-house version of ProgressiveEval, which we are currently iterating upon to make generalized. This part (extracting and ordering all partitions in the DAG below) is indeed the majority of the work thus far.

We've debugged and gotten unit tests for the majority of that code. I'll have a chat with @alamb about how much we can move upstream to the apache project.

Comment on lines +122 to +123
.map(|min_max_stats| {
let res = min_max_stats.first_fit();
Copy link

@wiedld wiedld Mar 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The binpacking into nonoverlapping chains occurs here.

We do something a bit different. We are ungrouping at the data source level such that we increase the number of non-overlapping partitions (and it's min/max range). It works for the plans we usually build, but may not be the best solution for the general approach. 🤔

@xudong963
Copy link
Owner Author

Let me know if you want to have a chat and sync on this project. I'm on the datafusion discord at @holometabola.

Thanks, @wiedld. After reading your review comments, I think we're very close. I'll create a subgroup in DF Discord, and we can talk more next Monday. And Happy Weekend!

@alamb
Copy link

alamb commented Mar 29, 2025

We've debugged and gotten unit tests for the majority of that code. I'll have a chat with @alamb about how much we can move upstream to the apache project.

I agree it will be good to have a chat -- from my perspective if we can work with @suremarc and @xudong963 to implement the more general analysis upstream that is probably preferable to doing it ourself internally. Once it is implemented upstream we could then backport it into our codebase temporarily as we work through the upgrades, perhaps

@xudong963 xudong963 force-pushed the file_group_stats branch 2 times, most recently from d8090fe to 04bb1d6 Compare March 31, 2025 01:32
@alamb
Copy link

alamb commented Mar 31, 2025

Let me know if you want to have a chat and sync on this project. I'm on the datafusion discord at @holometabola.

Thanks, @wiedld. After reading your review comments, I think we're very close. I'll create a subgroup in DF Discord, and we can talk more next Monday. And Happy Weekend!

In case anyone else is interested, the subgroup in discord is: https://discord.com/channels/885562378132000778/1356122416258220114/1356122427591098378

@alamb
Copy link

alamb commented Apr 1, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants