-
Notifications
You must be signed in to change notification settings - Fork 1.5k
refactor: Use SpillManager for all spilling scenarios #15405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @2010YOUY01 lgtm
I'm also thinking if we should give a spill manager the read spilled files functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a very nice refactor to me -- thank you @2010YOUY01
@@ -981,28 +978,30 @@ impl GroupedHashAggregateStream { | |||
Ok(()) | |||
} | |||
|
|||
/// Emit all rows, sort them, and store them on disk. | |||
/// Emit all intermediate aggregation states, sort them, and store them on disk. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
fn spill(&mut self) -> Result<()> { | ||
// Emit and sort intermediate aggregation state | ||
let Some(emit) = self.emit(EmitTo::All, true)? else { | ||
return Ok(()); | ||
}; | ||
let sorted = sort_batch(&emit, self.spill_state.spill_expr.as_ref(), None)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventually it might make sense to have the spill manager handle sorting the runs too (so it could potentially merge multiple files into a single run to reduce fanout, etc
@@ -92,6 +50,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> { | |||
|
|||
/// Spill the `RecordBatch` to disk as smaller batches | |||
/// split by `batch_size_rows` | |||
#[deprecated( | |||
since = "46.0.0", | |||
note = "This method is deprecated. Use `SpillManager::spill_record_batch_by_size` instead." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Yes there is already a basic one:
In the future if there are more functions to read spilled files, I think they should also be included inside SpillManager
|
Thank you @2010YOUY01 and @comphead |
* Use SpillManager in all spilling scenarios * resolve conflict * fix ci format
* Use SpillManager in all spilling scenarios * resolve conflict * fix ci format
Which issue does this PR close?
SpillManager
inAggregateExec
andSortMergeJoinExec
#15374Rationale for this change
#15355 Introduced
SpillManager
as a new interface for spilling related operations, and updateSortExec
to use it.This PR update all spilling related operations to use the new
SpillManager
interface.What changes are included in this PR?
AggregateExec
,SortMergeJoinExec
]:SpillMetrics
spill_manager
insideSpillManager
interfaceAre these changes tested?
Existing tests.
Are there any user-facing changes?
No