
Remove waits from blocking threads reading spill files. #15654


Merged: 1 commit merged into apache:main on Apr 12, 2025

Conversation

@ashdnazg (Contributor) commented Apr 9, 2025

Which issue does this PR close?

Closes #15323 (Reduce number of tokio blocking threads in SortExec spill).

Rationale for this change

The previous design for reading spill files was a push design: long-lived blocking tasks were spawned which repeatedly read record batches, sent them over a channel, and waited until they were received. This design did not guarantee progress (i.e., it could deadlock) when the number of spill files being waited on together exceeded the size of tokio's blocking thread pool.

To solve this, the design is changed to a pull design, where a blocking task is spawned for every read, removing the waiting on the IO threads and guaranteeing progress.

While repeatedly calling spawn_blocking adds some overhead, it is probably insignificant compared to the IO cost of reading from disk.
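
For illustration only, here is a minimal, self-contained sketch of that pull pattern (this is not the actual SpillReaderStream code; read_next_batch, SketchSpillStream, and the line-based "batches" are stand-ins): each poll either spawns a short-lived spawn_blocking task for exactly one read or polls the in-flight task's JoinHandle, so no blocking thread is ever parked waiting for a consumer.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Future, Stream};
use tokio::task::JoinHandle;

/// Hypothetical blocking read of one "batch" (here just a line of text).
fn read_next_batch(
    reader: &mut std::io::BufReader<std::fs::File>,
) -> std::io::Result<Option<String>> {
    use std::io::BufRead;
    let mut line = String::new();
    let n = reader.read_line(&mut line)?;
    Ok((n > 0).then_some(line))
}

enum State {
    /// No read in flight; we own the reader.
    Idle(Option<std::io::BufReader<std::fs::File>>),
    /// A blocking read was spawned; poll its handle until it finishes.
    Reading(JoinHandle<(std::io::BufReader<std::fs::File>, std::io::Result<Option<String>>)>),
    Done,
}

struct SketchSpillStream {
    state: State,
}

impl Stream for SketchSpillStream {
    type Item = std::io::Result<String>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                State::Idle(reader) => {
                    // Spawn a short-lived blocking task for exactly one read,
                    // instead of parking a blocking thread for the whole file.
                    let mut reader = reader.take().expect("reader present in Idle state");
                    let handle = tokio::task::spawn_blocking(move || {
                        let result = read_next_batch(&mut reader);
                        (reader, result)
                    });
                    self.state = State::Reading(handle);
                }
                State::Reading(handle) => {
                    let (reader, result) = futures::ready!(Pin::new(handle).poll(cx))
                        .expect("blocking read task panicked");
                    return match result {
                        Ok(Some(batch)) => {
                            // Hand the reader back so the next poll spawns the next read.
                            self.state = State::Idle(Some(reader));
                            Poll::Ready(Some(Ok(batch)))
                        }
                        Ok(None) => {
                            self.state = State::Done;
                            Poll::Ready(None)
                        }
                        Err(e) => {
                            self.state = State::Done;
                            Poll::Ready(Some(Err(e)))
                        }
                    };
                }
                State::Done => return Poll::Ready(None),
            }
        }
    }
}

A consumer that drives such a stream (e.g. a merge of several of them) only occupies a blocking thread for the duration of a single read, which is why progress is guaranteed even when the number of streams exceeds the blocking pool size.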

Are these changes tested?

Added a test which causes a deadlock in main but passes with this fix.

Are there any user-facing changes?

No.

@rluvaton (Contributor) commented Apr 9, 2025

Can you please add a test? This solves a deadlock.

Can you please add the following test to make sure that the spill read does not block: create a tokio runtime with 8 blocking threads, create 9 spill reads, and wait for all of them to be available (same as in SortPreservingMergeStream). The current implementation should deadlock (a timeout is reached); the new implementation should not (the test finishes).
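
For illustration, a minimal, self-contained sketch of the failure mode this test guards against (using plain tokio primitives rather than DataFusion's spill reader; the runtime sizes, channel capacity, and timeout are illustrative, and it assumes tokio with the rt-multi-thread, sync, and time features): with the push design, each reader parks a blocking thread on blocking_send, so more concurrent readers than blocking threads means the merge-style wait never completes and the timeout fires.

use std::time::Duration;

fn main() {
    // One worker thread and a single blocking thread, mirroring the added test.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        // Two "spill readers", each modelled as a long-lived blocking task that
        // pushes batches into a bounded channel (the old push design).
        let mut receivers = Vec::new();
        for _ in 0..2 {
            let (tx, rx) = tokio::sync::mpsc::channel::<u64>(1);
            tokio::task::spawn_blocking(move || {
                for batch in 0..10u64 {
                    // blocking_send parks the blocking thread until the consumer
                    // reads; this is where the thread pool gets exhausted.
                    if tx.blocking_send(batch).is_err() {
                        return;
                    }
                }
            });
            receivers.push(rx);
        }

        // Wait for one batch from *every* reader, as a sort-preserving merge does.
        let wait_all = async {
            for rx in &mut receivers {
                rx.recv().await;
            }
        };

        // With the push design and one blocking thread this times out;
        // the equivalent test against the pull design completes.
        let finished = tokio::time::timeout(Duration::from_secs(2), wait_all).await;
        println!("completed before timeout: {}", finished.is_ok());
    });
}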

@ashdnazg (Contributor, Author) commented Apr 9, 2025

@rluvaton I added the test, although with 1 blocking thread rather than 8; that shouldn't be an issue IMO.
The test deadlocks on main but passes here.

@rluvaton (Contributor) commented Apr 9, 2025

@alamb and @andygrove, can you please review? It looks fine to me.

@alamb (Contributor) commented Apr 9, 2025

FYI @2010YOUY01

@alamb (Contributor) commented Apr 9, 2025

Thanks @ashdnazg and @rluvaton -- I started the CI checks

@alamb (Contributor) commented Apr 9, 2025

Does anyone know if we have benchmarks for sorting / spilling I could run to verify the impact of this PR on their behavior?

I took a brief look but didn't find any

@rluvaton (Contributor) commented Apr 9, 2025

> Does anyone know if we have benchmarks for sorting / spilling I could run to verify the impact of this PR on their behavior?
>
> I took a brief look but didn't find any

I think you can tweak the TPC benchmark to have less memory so it will spill.

@andygrove (Member) commented:

> Does anyone know if we have benchmarks for sorting / spilling I could run to verify the impact of this PR on their behavior?

I can test with Comet today.

@andygrove (Member) commented:

@Kontinuation fyi

@Kontinuation (Member) commented:

If I understand this PR correctly, the SpillReaderStream in this PR will read the next batch only when the stream is polled, so the latency of polling a batch is the time spent reading a batch plus some scheduling overhead. The original approach buffers at most 2 batches in each stream. If the batch is already buffered, the latency of polling a batch is the time spent consuming a batch from the mpsc channel, thus hiding the latency of reading files.
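
For illustration, here is a minimal sketch of the buffering idea described above (this is not DataFusion's spawn_buffered; the function name and the tokio_stream dependency are assumptions): a spawned task pulls ahead of the consumer into a small bounded channel, so polling the resulting stream usually just pops an already-read item and the read latency stays hidden.

use futures::{Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;

/// Eagerly pull up to `capacity` items ahead of the consumer.
fn buffered<T, S>(mut input: S, capacity: usize) -> impl Stream<Item = T>
where
    T: Send + 'static,
    S: Stream<Item = T> + Send + Unpin + 'static,
{
    let (tx, rx) = tokio::sync::mpsc::channel(capacity);
    tokio::spawn(async move {
        while let Some(item) = input.next().await {
            // The bounded channel applies backpressure: the task stops pulling
            // ahead once `capacity` items are waiting to be consumed.
            if tx.send(item).await.is_err() {
                break; // the consumer dropped the output stream
            }
        }
    });
    ReceiverStream::new(rx)
}

A small capacity (e.g. 2) roughly corresponds to the "at most 2 buffered batches" behaviour described above.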

@ashdnazg (Contributor, Author) commented Apr 9, 2025

@Kontinuation indeed!
I originally considered removing the Waiting state to get rid of the scheduling overhead, or even moving buffering into the stream, but it felt somewhat contrary to the lazy spirit of Futures in Rust.

It will be interesting to see whether that's an actual bottleneck.

) -> std::task::Poll<Option<Result<RecordBatch>>> {
    match &mut self.state {
        SpillReaderStreamState::Uninitialized(_) => {
            // Temporarily replace with `Done` to be able to pass the file to the task.
Review comment (Contributor):

Another pattern for this that could avoid the unreachable might be to change the original match to something like:

// temporarily mark as done:
let state = std::mem::replace(&mut self.state, SpillReaderStreamState::Done);

// Now you can match on an owned state
match state {
    ...
}

@rluvaton (Contributor) commented Apr 9, 2025:

The problem with this is that it is easy to accidentally be left in that Done state; for example, the futures::ready! macro returns early on Pending (leaving the state as Done), so we would not be able to use it, and the pattern is prone to errors.

Review comment (Contributor):

That is fair -- I don't have a strong preference about which pattern to use; I was just mentioning an alternative as a possibility.

@alamb (Contributor) commented Apr 9, 2025

> Does anyone know if we have benchmarks for sorting / spilling I could run to verify the impact of this PR on their behavior?
> I took a brief look but didn't find any
>
> I think you can tweak the TPC benchmark to have less memory so it will spill

I also filed a ticket to track adding a spilling benchmark:

@andygrove (Member) commented:

I created a PR in Comet to use DataFusion from this PR: apache/datafusion-comet#1629

I did not have time to run benchmarks today but hope to tomorrow.

@2010YOUY01 (Contributor) commented:

I tried a simple benchmark:

  1. Under datafusion/datafusion-cli, compile and run with 100M memory limit
    cargo run --profile release-nonlto -- --mem-pool-type fair -m 100M
  2. Execute a query that triggers out-of-core sorting:
    explain analyze select * from generate_series(1, 1000000000) as t1(v1) order by v1;

Result on my MacBook:
main: 35s
PR: 38s

Not quite sure why; I'm trying to understand how those IO interfaces work.

@ashdnazg (Contributor, Author) commented Apr 10, 2025

@2010YOUY01 I checked your benchmark locally on my Linux machine (Ryzen 7945HX), 3 times on each version, and got
main: ~58s
PR: ~57s
which is barely more than noise.
I also checked a version with the buffering done inside the stream using a tokio channel, which should reduce the spawn_blocking overhead; that one got ~56s.
Again, not a significant difference, but the code is significantly more complicated and fragile.
I pushed that version to https://github.com/ashdnazg/datafusion/tree/pull-batch-2. It would be interesting to see its performance on the MacBook.

I do worry that the benchmark might not measure the IO bottleneck accurately, since the OS caches the spill files.

@andygrove (Member) commented:

I tested this PR with Comet. Here are the most relevant configs for Comet related to this testing:

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --jars $COMET_JAR \
    --driver-class-path $COMET_JAR \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.cores=8 \
    --conf spark.cores.max=8 \
    --conf spark.executor.memory=8g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=1g \
    --conf spark.executorEnv.COMET_WORKER_THREADS=32 \
    --conf spark.executorEnv.COMET_MAX_BLOCKING_THREADS=80 \

With Comet main branch, TPC-H q4 never completes due to deadlock. With the changes in this PR, the query completes with good performance.

@alamb (Contributor) left a review:

Looks good to me and the benchmarks sound good

Thank you @ashdnazg

As noted, we will have to wait for #15653 to merge (I'll do that later today unless anyone else would like a chance to review) and then we'll get this one ready

@ashdnazg (Contributor, Author) commented Apr 10, 2025

@andygrove any chance you could check Comet's performance with this alternative implementation: https://github.com/ashdnazg/datafusion/tree/pull-batch-2 ?
It attempts to remove the spawn overhead and to make buffering more efficient.

@andygrove (Member) commented:

> @andygrove any chance you could check Comet's performance with this alternative implementation: https://github.com/ashdnazg/datafusion/tree/pull-batch-2 ? It attempts to remove the spawn overhead and to make buffering more efficient.

Yes, I'll do that now.

@andygrove (Member) commented:

> @andygrove any chance you could check Comet's performance with this alternative implementation: https://github.com/ashdnazg/datafusion/tree/pull-batch-2 ? It attempts to remove the spawn overhead and to make buffering more efficient.

I don't think Comet testing is going to help with this. Here are the timings for 5 runs of q4, with this PR and with the alternate branch. In both cases there are tasks failing and restarting due to lack of memory.

This PR

        14.834558725357056,
        11.173914194107056,
        11.313692808151245,
        10.791407823562622,
        11.371635913848877

Alternate

        13.932721853256226,
        12.08954644203186,
        11.981270551681519,
        12.231445550918579,
        10.979195594787598

@ashdnazg (Contributor, Author) commented Apr 10, 2025

Thank you @andygrove!
Seems clear that we should stay with the simpler approach for now.

The github-actions bot removed the common (Related to common crate) label Apr 10, 2025
@ashdnazg (Contributor, Author) commented:

Rebased

@alamb (Contributor) commented Apr 10, 2025

I'll plan to merge this tomorrow unless anyone else would like more time to review

@2010YOUY01 (Contributor) left a review:

Really appreciate the nice fix!


/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
struct SpillReaderStream {
Review comment (Contributor):

Suggested change:

-struct SpillReaderStream {
+/// A simpler solution would be spawning a long-running blocking task for each
+/// file read (instead of each batch). This approach does not work because when
+/// the number of concurrent reads exceeds the Tokio thread pool limit,
+/// deadlocks can occur and block progress.
+struct SpillReaderStream {

I recommend adding a 'why' comment here.

Review comment (Contributor):

Strongly agree

Review comment (Contributor, Author):

done

@alamb mentioned this pull request Apr 11, 2025
Fixes apache#15323.

The previous design of reading spill files was a `push` design, spawning
long lived blocking tasks which repeatedly read records, send them and
wait until they are received. This design had an issue where progress
wasn't guaranteed (i.e., there was a deadlock) if there were more spill
files than the blocking thread pool in tokio which were all waited for
together.

To solve this, the design is changed to a `pull` design, where blocking
tasks are spawned for every read, removing waiting on the IO threads and
guaranteeing progress.

While there might be an added overhead for repeatedly calling
`spawn_blocking`, it's probably insignificant compared to the IO cost of
reading from the disk.
@andygrove (Member) left a review:

Thanks @ashdnazg!

@alamb alamb merged commit b6a5174 into apache:main Apr 12, 2025
27 checks passed
@alamb (Contributor) commented Apr 12, 2025

🚀

@jayzhan211 (Contributor) commented:

The extended test takes longer and couldn't finish within 6 hours after this change:

https://github.com/apache/datafusion/actions/runs/14419458859/job/40440288212

@ashdnazg (Contributor, Author) commented:

@jayzhan211 💩 ☹️
On it.

@2010YOUY01 (Contributor) commented:

> The extended test takes longer and couldn't finish within 6 hours after this change:
>
> https://github.com/apache/datafusion/actions/runs/14419458859/job/40440288212

From the log, I found that some memory-limit validation tests get stuck: this test's outer runner got stuck, but the inner test completed.

And I am not able to reproduce this issue on my MacBook; it can progress and finish all the tests 🤦🏼

@ashdnazg (Contributor, Author) commented:

I do reproduce it here on Ubuntu: when I run the test through the runner it takes much more time (or hangs entirely) than without it.

Just to see what happens, I tried running the test in release mode; it finished very quickly in both cases.

@ashdnazg (Contributor, Author) commented:

It seems to be contention with refresh_all in the memory-monitoring task.

PR here: #15702

nirnayroy pushed a commit to nirnayroy/datafusion that referenced this pull request May 2, 2025
Successfully merging this pull request may close these issues.

Reduce number of tokio blocking threads in SortExec spill
7 participants