fix: Redundant files spilled during external sort + introduce SpillManager #15355


Merged

merged 2 commits into apache:main on Mar 24, 2025

Conversation

2010YOUY01
Contributor

@2010YOUY01 2010YOUY01 commented Mar 22, 2025

Which issue does this PR close?

Rationale for this change

What's the inefficiency

Let's walk through an example: there is one external sort query with 1 partition, and the sort exec has:

During the execution:

  1. SortExec will read in batches until the 10MB memory limit is reached.
  2. It will sort all buffered batches in place, and merge them at once. Note that only the 1MB buffer out of the 10MB total limit is pre-reserved for merging, so there is only 1MB available to store the merged output.
  3. After we have collected 1MB of merged batches, one spill will be triggered. This 1MB space will then be cleared, and the merging can continue.
    Inefficiency: ExternalSorter will now create a new spill file for each 1MB of merged batches; after spilling all intermediates, all spilled files will be merged at once, so there are too many files to merge.
    Ideal case: All batches in a single sorted run can be incrementally appended to a single file.
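As a standalone illustration of the difference (hypothetical names, using plain byte chunks and `std::fs` in place of DataFusion's actual batch and spill types):

```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

// Old behavior: every time the merge buffer fills, open a brand-new spill file.
fn spill_per_flush(dir: &Path, flushes: &[&[u8]]) -> std::io::Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    for (i, chunk) in flushes.iter().enumerate() {
        let path = dir.join(format!("spill_{i}.bin"));
        File::create(&path)?.write_all(chunk)?;
        files.push(path);
    }
    Ok(files)
}

// Fixed behavior: the whole sorted run is appended to one in-progress file.
fn spill_append_to_run(dir: &Path, flushes: &[&[u8]]) -> std::io::Result<PathBuf> {
    let path = dir.join("run_0.bin");
    let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
    for chunk in flushes {
        file.write_all(chunk)?;
    }
    Ok(path)
}
```

With the 10MB limit and 1MB merge buffer above, the first pattern produces roughly one file per 1MB flush of a single sorted run, while the second produces exactly one file per run.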

Reproducer

Execute datafusion-cli with `cargo run --profile release-nonlto -- --mem-pool-type fair -m 10M`, then run:

```sql
set datafusion.execution.sort_spill_reservation_bytes = 1000000;
set datafusion.execution.target_partitions = 4;

explain analyze select * from generate_series(1, 1000000) as t1(v1) order by v1;
```

Main: 10 spills
PR: 2 spills

Rationale for the fix

Introduced a new spill interface, SpillManager, with the ability to incrementally append batches to an already-written file.

  1. SpillManager is designed to do RecordBatch <---> raw file conversion, and configurations can be put inside SpillManager to control how the serialization is physically done, for future optimizations.
    Example configurations:
  2. SpillManager is not responsible for holding the spilled files itself, because the logical representation of those files can vary; I think it's clearer to place those raw files inside the spilling operators.
    For example, a `vec<RefCountedTempFile>` is managed inside SortExec, with the implicit rule that within each file all entries are sorted by the sort keys; also, in Comet's ShuffleWriterExec, each partition should maintain one in-progress file. If we kept those temp files inside SpillManager, it would be hard to clearly define those implicit requirements.
  3. Additionally, SpillManager is responsible for updating the related statistics. The spill-related metrics should be the same across operators, so this part of the code can also be reused. A total disk usage limit for spilled files can also be easily implemented on top of it.
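To make the division of labor concrete, here is a minimal, self-contained sketch of the idea (the names mirror the PR, but `Batch` is a stand-in for Arrow's RecordBatch and the real DataFusion API differs): the manager only turns batches into raw files and tracks spill metrics; it does not own the finished files.

```rust
use std::io::Write;
use std::path::PathBuf;

// Stand-in for Arrow's RecordBatch.
struct Batch(Vec<u8>);

// Spill-related metrics, grouped in one struct so they stay
// uniform across all spilling operators.
#[derive(Default, Debug)]
struct SpillMetrics {
    spill_file_count: usize,
    spilled_bytes: usize,
}

struct SpillManager {
    dir: PathBuf,
    metrics: SpillMetrics,
    next_id: usize,
}

// A file that is still being written; the caller appends batches
// to it and then hands it back to the manager to finish.
struct InProgressSpillFile {
    path: PathBuf,
    file: std::fs::File,
    bytes: usize,
}

impl SpillManager {
    fn new(dir: PathBuf) -> Self {
        Self { dir, metrics: SpillMetrics::default(), next_id: 0 }
    }

    // Open a new in-progress file that batches can be appended to.
    fn create_in_progress_file(&mut self) -> std::io::Result<InProgressSpillFile> {
        let path = self.dir.join(format!("spill_{}.bin", self.next_id));
        self.next_id += 1;
        Ok(InProgressSpillFile {
            file: std::fs::File::create(&path)?,
            path,
            bytes: 0,
        })
    }

    // Incrementally append one batch to an in-progress file.
    fn append_batch(&mut self, file: &mut InProgressSpillFile, batch: &Batch) -> std::io::Result<()> {
        file.file.write_all(&batch.0)?;
        file.bytes += batch.0.len();
        Ok(())
    }

    // Finish the file, update metrics, and return the path; the *caller*
    // decides what the file means (e.g. SortExec keeps a Vec of sorted runs).
    fn finish_file(&mut self, file: InProgressSpillFile) -> PathBuf {
        self.metrics.spill_file_count += 1;
        self.metrics.spilled_bytes += file.bytes;
        file.path
    }
}
```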

Why refactor and introduce SpillManager

This fix can be implemented without a major refactor. However, this change is included to prepare for supporting disk limits for spilling queries, as described in #14975

What changes are included in this PR?

  1. Group spilling related metrics into one struct
  2. Introduce SpillManager
  3. Update SortExec to use the new SpillManager interface

TODO:

- [ ] There are two extra operators that can be changed to this new interface (Aggregate and SortMergeJoin); they were planned to be included in this PR after getting some review feedback.
  It will be done as a follow-on to minimize the current patch.

Are these changes tested?

For the too-many-spills issue: one test case is updated, and more comments are added above the assertion to prevent regression.
For SpillManager: unit tests are included.

Are there any user-facing changes?

No.

@alamb
Contributor

alamb commented Mar 22, 2025

Contributor

@alamb alamb left a comment

Thanks @2010YOUY01 -- this is looking good

I left a question about the change to external sorting

I really like the idea of the SpillManager -- maybe we could start this project with a single PR to add the SpillManager and pull the common code out (in datafusion/physical-plan/src/spill/manager.rs for example), and then do a follow-on ticket to add new features

@@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
/// metrics
baseline: BaselineMetrics,

/// count of spills during the execution of the operator
Contributor

Nice

/// Arrow IPC format)
/// Within the same spill file, the data might be chunked into multiple batches,
/// and ordered by sort keys.
finished_spill_files: Vec<RefCountedTempFile>,
Contributor

It might make more sense to have the SpillManager own these files so there can't be different sets of references

Contributor Author

I think it will be hard to define the semantics of those temp files if we put them inside SpillManager, because different operators will interpret those files differently:

  • For SortExec, the `vec<RefCountedTempFile>` represents multiple sorted runs on the sort keys.
  • For ShuffleWriterExec in datafusion-comet, since Spark's shuffle operator is blocking (due to Spark's staged execution design), it might want to keep a `vec<InProgressSpillFile>` instead.
  • Similarly, if we want to spill Rows to accelerate SortExec, or implement a spilling hash join, the temp files will have very different logical meanings.

Overall, the SpillManager is designed only to do RecordBatch <-> raw file conversion, with different configurations and stat accounting. Operators have more flexibility to implement specific utilities for managing the raw files, which have diverse semantics.
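A rough sketch of that ownership split (hypothetical names, not the PR's actual code): the operator owns its finished files and their meaning, while the manager-style helper only produces raw files.

```rust
use std::path::PathBuf;

// Stand-in for the helper that only turns batches into raw files.
struct SpillHelper;

impl SpillHelper {
    // Writes bytes to a new raw file and hands the path back to the caller.
    fn write_raw_file(&self, path: PathBuf, bytes: &[u8]) -> std::io::Result<PathBuf> {
        std::fs::write(&path, bytes)?;
        Ok(path)
    }
}

// The operator owns the files and attaches its own semantics to them:
// here, the invariant is "each file is one run, sorted by the sort keys".
struct SortOperatorSketch {
    helper: SpillHelper,
    finished_sorted_runs: Vec<PathBuf>,
}

impl SortOperatorSketch {
    fn spill_sorted_run(&mut self, path: PathBuf, bytes: &[u8]) -> std::io::Result<()> {
        let file = self.helper.write_raw_file(path, bytes)?;
        self.finished_sorted_runs.push(file);
        Ok(())
    }
}
```

A shuffle-style operator could hold a different collection (e.g. one in-progress file per partition) while reusing the same helper, which is the flexibility argued for above.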

Do you see any potential issues or improvements?

Contributor

The different semantics for different operations make sense to me

I was thinking more mechanically, like just storing the `Vec` as a field on `SortManager` and allowing Sort and Hash, etc. to access / manipulate it as required. I think it is fine to consider this in a future PR as well

/// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
/// When calling, all `in_mem_batches` must be sorted, and then all of them will
/// be appended to the in-progress spill file.
Contributor

If they must all be sorted, then maybe you can put an assert/check that self.in_mem_batches_sorted is true
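For illustration, a hedged sketch of that suggestion (hypothetical field and method names): the spill routine checks its documented precondition instead of relying on the doc comment alone.

```rust
// Minimal sketch: spill_append refuses to run unless the sorted flag is set.
struct SorterSketch {
    in_mem_batches_sorted: bool,
}

impl SorterSketch {
    fn spill_append(&mut self) -> Result<(), String> {
        // Fail fast if the documented invariant is violated.
        if !self.in_mem_batches_sorted {
            return Err("in_mem_batches must be sorted before spilling".to_string());
        }
        // ... append the sorted batches to the in-progress spill file ...
        Ok(())
    }
}
```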

Contributor Author

Addressed in bf4ab62

internal_datafusion_err!("In-progress spill file should be initialized")
})?;

for batch in batches {
Contributor

I don't understand this logic -- I thought that each individual self.in_mem_batches was sorted, but they aren't sorted overall.

Thus if we write them back to back to the same spill file, the spill file itself won't be sorted.

Like if the two in-memory batches are

```
A B
1 10
2 10
2 10
```

```
A B
1 10
2 10
2 10
```

I think this code would produce a single spill file like

```
A B
1 10
2 10
2 10
1 10
2 10
2 10
```

Which is not sorted 🤔

On the other hand all the tests are passing so maybe I misunderstand what this is doing (or we have a testing gap)

Contributor Author

No, they are globally sorted. In different stages, in_mem_batches can represent either unordered input or a globally sorted run (but chunked into smaller batches).
I agree this approach has poor understandability and is error-prone; I'll try to improve it.

Contributor

Thanks -- maybe for this PR we could just add some comments

Contributor Author

Filed #15372

@@ -223,25 +229,182 @@ impl IPCStreamWriter {
}
}

/// The `SpillManager` is responsible for the following tasks:
Contributor

Love the spill manager 👍

@alamb
Contributor

alamb commented Mar 22, 2025

> There are two extra operators that can be changed to this new interface (Aggregate and SortMergeJoin); they're planned to be included in this PR. I plan to do it after getting some review feedback.

I request that we do this feature in multiple smaller PRs which will be easier to review / understand

BTW I think this PR may address some of this issue too:

Contributor

@alamb alamb left a comment

Thank you @2010YOUY01 -- I think this is a great step forward: the code is more nicely structured and I think the spilling works better.

I left several comments about potential future improvements, but most of them probably should be done as follow on PRs.

Perhaps @Kontinuation or @kazuyukitanimura has some time to review as well

}
}

pub(crate) struct InProgressSpillFile {
Contributor

I think it would help to add some high-level comments here about what an InProgressSpillFile is

Contributor Author

Addressed in bf4ab62


/// Finishes the in-progress spill file and moves it to the finished spill files.
async fn spill_finish(&mut self) -> Result<()> {
let mut in_progress_file =
Contributor

I am finding the various states of the ExternalSorter hard to track (specifically, what are the valid combinations of in_mem_batches, in_progress_spill_file, spill, and sorted_in_mem).

I wonder if we could move to some sort of state enum that would make this easier to understand.

Like

```rust
enum SortState {
    AllInMemory { /* ... */ },
    InProgressSpill { /* ... */ },
    AllOnDisk { /* ... */ },
    // ...
}
```
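A runnable version of that sketch (hypothetical payloads; the real ExternalSorter fields differ) could look like:

```rust
use std::path::PathBuf;

// Each variant bundles exactly the fields that are valid in that state,
// so invalid combinations become unrepresentable.
enum SortState {
    AllInMemory { in_mem_batches: Vec<Vec<i32>>, sorted: bool },
    InProgressSpill { in_progress_spill_file: PathBuf },
    AllOnDisk { finished_spill_files: Vec<PathBuf> },
}

// Matching on the enum forces each code path to say which state it handles.
fn describe(state: &SortState) -> &'static str {
    match state {
        SortState::AllInMemory { sorted: true, .. } => "in memory, sorted",
        SortState::AllInMemory { .. } => "in memory, unsorted",
        SortState::InProgressSpill { .. } => "spilling in progress",
        SortState::AllOnDisk { .. } => "fully spilled",
    }
}
```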

Contributor Author

@2010YOUY01 2010YOUY01 Mar 24, 2025

Filed #15372

/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, all records within the same spill file are ordered according to a specific order.
#[derive(Debug, Clone)]
pub(crate) struct SpillManager {
Contributor

As a follow on PR, I suggest starting to break up this code into multiple modules (like spill/mod.rs, spill/spill_manager.rs, etc

Contributor Author

Filed #15373

@Kontinuation
Member

> 3. After we have collected 1MB of merged batches, one spill will be triggered. And this 1MB space will be cleared, so the merging can continue.
>    Inefficiency: Now ExternalSorter will create a new spill file for those 1MB merged batches; after spilling all intermediates, all spilled files will be merged at once, then there are too many files to merge.
>    Ideal case: All batches in a single sorted run can be incrementally appended to a single file.

It seems to be a regression introduced by #14823.

@2010YOUY01
Contributor Author

> 1. After we have collected 1MB of merged batches, one spill will be triggered. And this 1MB space will be cleared, so the merging can continue.
>    Inefficiency: Now ExternalSorter will create a new spill file for those 1MB merged batches; after spilling all intermediates, all spilled files will be merged at once, then there are too many files to merge.
>    Ideal case: All batches in a single sorted run can be incrementally appended to a single file.
>
> It seems to be a regression introduced by #14823.

That's true, so I feel obligated to fix it.


Thank you for the review @alamb and @Kontinuation , I have addressed the review comments.

Contributor

@comphead comphead left a comment

Thanks @2010YOUY01, love the tests. I think we need to move other ops like sort_merge_join or row hash to the SpillManager?

I'll create tickets if so:

#15401
#15400

@alamb
Contributor

alamb commented Mar 24, 2025

> 1. After we have collected 1MB of merged batches, one spill will be triggered. And this 1MB space will be cleared, so the merging can continue.
>    Inefficiency: Now ExternalSorter will create a new spill file for those 1MB merged batches; after spilling all intermediates, all spilled files will be merged at once, then there are too many files to merge.
>    Ideal case: All batches in a single sorted run can be incrementally appended to a single file.
>
> It seems to be a regression introduced by #14823.
>
> That's true, so I feel obligated to fix it.

@2010YOUY01 is this something that should be tracked with a follow-on ticket?

@alamb
Contributor

alamb commented Mar 24, 2025

It looks to me like there are 4 approvals of this PR and a bunch of potential work stacked up on it, so let's merge it to keep the code flowing

Thank you everyone for the reviews and work. It is so exciting to see this area of DataFusion get love

@alamb alamb merged commit 0c2aa0c into apache:main Mar 24, 2025
27 checks passed
@2010YOUY01
Contributor Author

> Thanks @2010YOUY01, love the tests. I think we need to move other ops like sort_merge_join or row hash to the SpillManager?
>
> I'll create tickets if so:
>
> #15401 #15400

Thank you, I already did it by filing #15374.
I'll get to those tasks soon.

@2010YOUY01
Contributor Author

2010YOUY01 commented Mar 25, 2025

> 1. After we have collected 1MB of merged batches, one spill will be triggered. And this 1MB space will be cleared, so the merging can continue.
>    Inefficiency: Now ExternalSorter will create a new spill file for those 1MB merged batches; after spilling all intermediates, all spilled files will be merged at once, then there are too many files to merge.
>    Ideal case: All batches in a single sorted run can be incrementally appended to a single file.
>
> It seems to be a regression introduced by #14823.
>
> That's true, so I feel obligated to fix it.
>
> @2010YOUY01 is this something that should be tracked with a follow-on ticket?

@alamb Ah no, I was referring to the fix in this PR.

qstommyshu pushed a commit to qstommyshu/datafusion that referenced this pull request Mar 27, 2025
nirnayroy pushed a commit to nirnayroy/datafusion that referenced this pull request May 2, 2025