Skip to content

refactor: Use SpillManager for all spilling scenarios #15405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 26, 2025

Conversation

2010YOUY01
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

#15355 Introduced SpillManager as a new interface for spilling related operations, and update SortExec to use it.
This PR update all spilling related operations to use the new SpillManager interface.

What changes are included in this PR?

  1. for exeuctor in [AggregateExec, SortMergeJoinExec]:
    • Group all spilling-related metrics to SpillMetrics
    • Include a new field spill_manager inside
    • Change all spilling operations to use the SpillManager interface
  2. Remove/deprecate old spilling utility functions

Are these changes tested?

Existing tests.

Are there any user-facing changes?

No

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @2010YOUY01 lgtm
I'm also thinking if we should give a spill manager the read spilled files functions?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a very nice refactor to me -- thank you @2010YOUY01

@@ -981,28 +978,30 @@ impl GroupedHashAggregateStream {
Ok(())
}

/// Emit all rows, sort them, and store them on disk.
/// Emit all intermediate aggregation states, sort them, and store them on disk.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

fn spill(&mut self) -> Result<()> {
// Emit and sort intermediate aggregation state
let Some(emit) = self.emit(EmitTo::All, true)? else {
return Ok(());
};
let sorted = sort_batch(&emit, self.spill_state.spill_expr.as_ref(), None)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventually it might make sense to have the spill manager handle sorting the runs too (so it could potentially merge multiple files into a single run to reduce fanout, etc

@@ -92,6 +50,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {

/// Spill the `RecordBatch` to disk as smaller batches
/// split by `batch_size_rows`
#[deprecated(
since = "46.0.0",
note = "This method is deprecated. Use `SpillManager::spill_record_batch_by_size` instead."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@2010YOUY01
Copy link
Contributor Author

2010YOUY01 commented Mar 26, 2025

Thanks @2010YOUY01 lgtm I'm also thinking if we should give a spill manager the read spilled files functions?

Yes there is already a basic one:


In the future if there are more functions to read spilled files, I think they should also be included inside SpillManager

@2010YOUY01
Copy link
Contributor Author

Thank you @alamb and @comphead for the review.
I pushed another commit to resolve the conflict caused by a related refactor (split SpillManager to a separate file)

@alamb alamb merged commit e1a8e9d into apache:main Mar 26, 2025
27 checks passed
@alamb
Copy link
Contributor

alamb commented Mar 26, 2025

Thank you @2010YOUY01 and @comphead

qstommyshu pushed a commit to qstommyshu/datafusion that referenced this pull request Mar 27, 2025
* Use SpillManager in all spilling scenarios

* resolve conflict

* fix ci format
nirnayroy pushed a commit to nirnayroy/datafusion that referenced this pull request May 2, 2025
* Use SpillManager in all spilling scenarios

* resolve conflict

* fix ci format
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants