-
Notifications
You must be signed in to change notification settings - Fork 0
POC: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: file_group_stats
Are you sure you want to change the base?
Conversation
@@ -174,6 +175,21 @@ impl ExecutionPlan for DataSourceExec { | |||
self.data_source.statistics() | |||
} | |||
|
|||
fn statistics_by_partition(&self) -> datafusion_common::Result<Vec<Statistics>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The partition-level statistic of DataSource
will be passed to downstream nodes.
We need to implement the method for different nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think this may end up being a decent amount of work. For our use case at polygon.io we are probably going to want to impl it for FilterExec and UnionExec at a minimum, I'm probably missing some others too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have our own in-house version of ProgressiveEval, which we are currently iterating upon to make generalized. This part (extracting and ordering all partitions in the DAG below) is indeed the majority of the work thus far.
We've debugged and gotten unit tests for the majority of that code. I'll have a chat with @alamb about how much we can move upstream to the apache project.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this part will work well after this PR is merged: apache#15432 which introduces the partition-level statistics.
And after apache#15432 is merged, we can add the statistics_by_partition
API to ExecutionPlan
and implement it for all nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks mostly good to me, save for the issue about returning a SortMerge. Also seemed like there were some stray calls to unwrap
that shouldn't be there
ed95de9
to
ced059c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a really quick skim to be helpful; not sure if these comments may lead you astray (we have a lot of tech debt in our implementation which I'm still extracting).
Let me know if you want to have a chat and sync on this project. I'm on the datafusion discord at @holometabola.
let partition_groups = input | ||
.statistics_by_partition() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This partition group mapping (swapping ordering by statistics) is also the approach we are using.
Self { | ||
input, | ||
expr, | ||
metrics: ExecutionPlanMetricsSet::new(), | ||
fetch: None, | ||
cache, | ||
enable_round_robin_repartition: true, | ||
progressive_eval_exec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are instead replacing the SPM node with a ProgressEval node during an optimizer run. I assume this is a WIP and that will occur eventually?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, doing it in Optimizer makes sense to me
} | ||
|
||
/// Concat input streams until reaching the fetch limit | ||
struct ProgressiveEvalStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both of your stream structures are a close mirror to how we do it too. 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, from the PR
@@ -174,6 +175,21 @@ impl ExecutionPlan for DataSourceExec { | |||
self.data_source.statistics() | |||
} | |||
|
|||
fn statistics_by_partition(&self) -> datafusion_common::Result<Vec<Statistics>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have our own in-house version of ProgressiveEval, which we are currently iterating upon to make generalized. This part (extracting and ordering all partitions in the DAG below) is indeed the majority of the work thus far.
We've debugged and gotten unit tests for the majority of that code. I'll have a chat with @alamb about how much we can move upstream to the apache project.
.map(|min_max_stats| { | ||
let res = min_max_stats.first_fit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The binpacking into nonoverlapping chains occurs here.
We do something a bit different. We are ungrouping at the data source level such that we increase the number of non-overlapping partitions (and it's min/max range). It works for the plans we usually build, but may not be the best solution for the general approach. 🤔
Thanks, @wiedld. After reading your review comments, I think we're very close. I'll create a subgroup in DF Discord, and we can talk more next Monday. And Happy Weekend! |
I agree it will be good to have a chat -- from my perspective if we can work with @suremarc and @xudong963 to implement the more general analysis upstream that is probably preferable to doing it ourself internally. Once it is implemented upstream we could then backport it into our codebase temporarily as we work through the upgrades, perhaps |
d8090fe
to
04bb1d6
Compare
In case anyone else is interested, the subgroup in discord is: https://discord.com/channels/885562378132000778/1356122416258220114/1356122427591098378 |
04bb1d6
to
e775493
Compare
This is the POC for optimized SPM to avoid merging non-overlapping partitions.
The PR glues three PRs:
SortPreservingMergeExec
to avoid merging non-overlapping partitions apache/datafusion#13296 to optimizeSortPreservingMergeExec
to avoid merging non-overlapping partitionsFileGroup
, aka, partition-level statistics.ProgressiveEval
operator apache/datafusion#10490Finally, we get the following result!!