fix: Redundant files spilled during external sort + introduce `SpillManager` #15355
Conversation
Thanks @2010YOUY01 -- this is looking good.

I left a question about the change to external sorting.

I really like the idea of the SpillManager -- maybe we could start this project with a single PR to add the SpillManager and pull the common code out (in datafusion/physical-plan/src/spill/manager.rs for example) and then do a follow-on ticket to add new features.
```
@@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
    /// metrics
    baseline: BaselineMetrics,

    /// count of spills during the execution of the operator
```
Nice
```
/// Arrow IPC format)
/// Within the same spill file, the data might be chunked into multiple batches,
/// and ordered by sort keys.
finished_spill_files: Vec<RefCountedTempFile>,
```
It might make more sense to have the `SpillManager` own these files so there can't be different sets of references.
I think it will be hard to define the semantics of those temp files if we put them inside `SpillManager`, because different operators will interpret those files differently:

- For `SortExec`, `Vec<RefCountedTempFile>` represents multiple sorted runs on sort keys.
- For `ShuffleWriterExec` in `datafusion-comet`, since `Spark`'s shuffle operator is blocking (due to Spark's staged execution design), it might want to keep `Vec<InProgressSpillFile>` instead.
- Similarly, if we want to spill `Row`s to accelerate `SortExec`, or we want to implement a spilling hash join, the temp files will have very different logical meanings.

Overall, the `SpillManager` is designed only to do `RecordBatch <-> raw file` with different configurations and stat accounting. Operators have more flexibility to implement specific utilities for managing raw files, which have diverse semantics.

Do you see any potential issues or improvements?
The different semantics for different operations make sense to me.

I was thinking more mechanically, like just storing the `Vec` as a field on `SpillManager` and allowing Sort and Hash, etc. to access / manipulate it as required. I think it is fine to consider this in a future PR as well.
```
/// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
/// When calling, all `in_mem_batches` must be sorted, and then all of them will
/// be appended to the in-progress spill file.
```
If they must all be sorted, then maybe you can put an assert/check that `self.in_mem_batches_sorted` is true.
Addressed in bf4ab62
```
    internal_datafusion_err!("In-progress spill file should be initialized")
})?;

for batch in batches {
```
I don't understand this logic -- I thought that each individual `self.in_mem_batches` was sorted, but they aren't sorted overall. Thus if we write them back to back to the same spill file, the spill file itself won't be sorted.

Like if the two in-memory batches are

| A | B |
|---|----|
| 1 | 10 |
| 2 | 10 |
| 2 | 10 |

| A | B |
|---|----|
| 1 | 10 |
| 2 | 10 |
| 2 | 10 |

I think this code would produce a single spill file like

| A | B |
|---|----|
| 1 | 10 |
| 2 | 10 |
| 2 | 10 |
| 1 | 10 |
| 2 | 10 |
| 2 | 10 |

Which is not sorted 🤔

On the other hand, all the tests are passing, so maybe I misunderstand what this is doing (or we have a testing gap)
No, they are globally sorted. In different stages, `in_mem_batches` can either represent unordered input, or a globally sorted run (but chunked into smaller batches).

I agree this approach has poor understandability and is error-prone; I'll try to improve it.
Thanks -- maybe for this PR we could just add some comments
Filed #15372
```
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
    }
}

/// The `SpillManager` is responsible for the following tasks:
```
Love the spill manager 👍

I request that we do this feature in multiple smaller PRs, which will be easier to review / understand. BTW I think this PR may address some of this issue too:
Thank you @2010YOUY01 -- I think this is a great step forward: the code is more nicely structured and I think the spilling works better.
I left several comments about potential future improvements, but most of them probably should be done as follow on PRs.
Perhaps @Kontinuation or @kazuyukitanimura has some time to review as well
```
    }
}

pub(crate) struct InProgressSpillFile {
```
I think it would help to add some high-level comments here about what an `InProgressSpillFile` is
Addressed in bf4ab62
```
/// Finishes the in-progress spill file and moves it to the finished spill files.
async fn spill_finish(&mut self) -> Result<()> {
    let mut in_progress_file =
```
I am finding the various states of the ExternalSorter hard to track (specifically, what are the valid combinations of `in_mem_batches`, `in_progress_spill_file`, `spill`, and `sorted_in_mem`).

I wonder if we could move to some sort of state enum that would make this easier to understand. Like

```rust
enum SortState {
    AllInMemory { /* ... */ },
    InProgressSpill { /* ... */ },
    AllOnDisk { /* ... */ },
    // ...
}
```
Filed #15372
```
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, all records within the same spill file are ordered according to a specific order.
#[derive(Debug, Clone)]
pub(crate) struct SpillManager {
```
As a follow-on PR, I suggest starting to break up this code into multiple modules (like `spill/mod.rs`, `spill/spill_manager.rs`, etc.)
Filed #15373
It seems to be a regression introduced by #14823.

That's true, so I feel obligated to fix it. Thank you for the review @alamb and @Kontinuation, I have addressed the review comments.
Thanks @2010YOUY01, love the tests. I think we need to move other ops like sort_merge_join or row hash to the spill manager?

I'll create tickets if so
@2010YOUY01 is this something that should be tracked with a follow-on ticket?

It looks to me like there are 4 approvals of this PR and a bunch of potential work stacked up on it, so let's merge it to keep the code flowing. Thank you everyone for the reviews and work. It is so exciting to see this area of DataFusion get love.

Thank you, I already did it by filing #15374

@alamb Ah no, I was referring to the fix in this PR.
## Which issue does this PR close?

- `max_temp_directory_size` to limit max disk usage for spilling queries #14975

## Rationale for this change

### What's the inefficiency
Let's walk through an example: there is one external sort query with 1 partition, and the sort exec has `sort_spill_reservation_bytes` configured (see the configuration explanation in https://datafusion.apache.org/user-guide/configs.html).

During the execution, `SortExec` will read in batches, and the 10MB memory limit is reached.

**Inefficiency:** Now `ExternalSorter` will create a new spill file for those 1MB merged batches; after spilling all intermediates, all spilled files will be merged at once, and then there are too many files to merge.

**Ideal case:** All batches in a single sorted run can be incrementally appended to a single file.
### Reproducer

Execute datafusion-cli with `cargo run --profile release-nonlto -- --mem-pool-type fair -m 10M`

- Main: 10 spills
- PR: 2 spills
### Rationale for the fix

Introduced a new spill interface `SpillManager` with the ability of incrementally appending batches to a written file.

`SpillManager` is designed to do `RecordBatch <---> raw file`, and configurations can be put inside `SpillManager` to control how the serialization is done physically, for future optimizations. Example configurations:

- Compress with `lz4` when writing `Arrow IPC`, or configurations to change the `IPC Writer` behavior
- Switch to `datafusion-comet`'s proprietary serde implementation in "feat: Implement custom RecordBatch serde for shuffle for improved performance" datafusion-comet#1190

`SpillManager` is not responsible for holding spilled files inside, because the logical representation of those files can vary; I think it's clearer to place those raw files inside spilling operators. For example, `Vec<RefCountedTempFile>` is managed inside `SortExec`, and the implicit rule is that within each file all entries are sorted by the sort keys; also, in `Comet`'s `ShuffleWriterExec`, each partition should maintain one in-progress file. If we keep those temp files inside `SpillManager`, it's hard to clearly define those implicit requirements.

`SpillManager` is responsible for updating related statistics: the spill-related metrics should be the same across operators, so this part of the code can also be reused. Also, a total disk usage limit for spilled files can easily be implemented upon it.

### Why refactor and introduce `SpillManager`

This fix can be implemented without a major refactor. However, this change is included to prepare for supporting disk limits for spilling queries, as described in #14975
## What changes are included in this PR?

- Introduced `SpillManager`
- Refactored `SortExec` to use the new `SpillManager` interface

TODO:
- [ ] There are two extra operators that can be changed to this new interface (`Aggregate` and `SortMergeJoin`); they were planned to be included in this PR after some review feedback, but will be done as a follow-on to minimize the current patch.
## Are these changes tested?

For the too-many-spills issue: one test case is updated, and more comments are added above the assertion to prevent regression.

For `SpillManager`: unit tests are included.

## Are there any user-facing changes?

No.