Remove waits from blocking threads reading spill files. #15654

Conversation
Can you please add a test? This solves a deadlock. Can you please add the following test to make sure that reading the spill does not block:
@rluvaton I added the test, although with 1 blocking thread and not 8. Shouldn't be an issue IMO.
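For readers who want the gist of the deadlock being tested: below is a minimal, self-contained sketch of the failure mode using plain tokio (with full features) and std channels. It is not the PR's actual test; the two "spill files", the rendezvous channels, and all names are illustrative stand-ins.

```rust
use std::sync::mpsc;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(1) // fewer blocking threads than "spill files"
        .enable_all()
        .build()
        .unwrap();
    let _guard = rt.enter(); // allow spawn_blocking from this thread

    // One long-lived reader task per "spill file", as in the old push
    // design: each repeatedly produces a batch, sends it, and blocks until
    // the batch is taken by the consumer.
    let mut receivers = Vec::new();
    for file in 0..2u32 {
        let (tx, rx) = mpsc::sync_channel::<u32>(0); // rendezvous channel
        tokio::task::spawn_blocking(move || {
            for batch in 0u32.. {
                if tx.send(file * 1_000 + batch).is_err() {
                    return; // consumer hung up
                }
            }
        });
        receivers.push(rx);
    }

    // A merge-like consumer needs one batch from *every* file before it can
    // emit anything. Reader 0 occupies the only blocking thread (blocked on
    // its next send), reader 1 is queued and never gets scheduled, so the
    // second recv() below hangs forever.
    for (i, rx) in receivers.iter().enumerate() {
        println!("file {i}: batch {}", rx.recv().unwrap());
    }
}
```

Run it and the second `recv()` never returns: the push design's long-lived reader monopolizes the only blocking thread.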
@alamb and @andygrove, can you please review? It looks fine to me.
FYI @2010YOUY01
Does anyone know if we have benchmarks for sorting / spilling I could run to verify the impact of this PR on their behavior? I took a brief look but didn't find any.
I think you can tweak the TPC-H benchmark to have less memory so it will spill.
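For example, roughly like this (a sketch, not an official benchmark setup: the 64 MiB limit, the file name, and the query are placeholders, and the `RuntimeEnvBuilder` calls reflect recent DataFusion APIs):

```rust
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Cap the memory pool (here 64 MiB, fraction 1.0) so large sorts are
    // forced through the spill path; spill files go to the default
    // OS-provided temp-dir disk manager.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(64 * 1024 * 1024, 1.0)
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);

    // "lineitem.parquet" is a placeholder for a TPC-H table you generated.
    ctx.register_parquet("lineitem", "lineitem.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT * FROM lineitem ORDER BY l_orderkey, l_extendedprice")
        .await?
        .collect()
        .await?;
    Ok(())
}
```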
I can test with Comet today.
@Kontinuation fyi
If I understand this PR correctly, the
@Kontinuation indeed! Will be interesting to see if that's an actual bottleneck. |
```rust
) -> std::task::Poll<Option<Result<RecordBatch>>> {
    match &mut self.state {
        SpillReaderStreamState::Uninitialized(_) => {
            // Temporarily replace with `Done` to be able to pass the file to the task.
```
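For context, the pattern under review looks roughly like this simplified, compilable sketch; the enum, its payload types, and `start_read` are stand-ins rather than the PR's actual `SpillReaderStreamState` code:

```rust
use std::fs::File;
use std::io::Read;
use tokio::task::JoinHandle;

enum State {
    Uninitialized(File),
    Waiting(JoinHandle<std::io::Result<Vec<u8>>>),
    Done,
}

fn start_read(state: &mut State) {
    match state {
        State::Uninitialized(_) => {
            // Temporarily replace with `Done` so the file can be moved
            // out of the enum and into the blocking task.
            let State::Uninitialized(mut file) =
                std::mem::replace(state, State::Done)
            else {
                unreachable!() // we matched `Uninitialized` just above
            };
            *state = State::Waiting(tokio::task::spawn_blocking(move || {
                let mut buf = Vec::new();
                file.read_to_end(&mut buf)?; // stand-in for reading one batch
                Ok(buf)
            }));
        }
        _ => {}
    }
}

fn main() {} // compile-only sketch
```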
Another pattern for this that could avoid the `unreachable!()` might be to change the original match to something like:

```rust
// Temporarily mark as done:
let state = std::mem::replace(&mut self.state, SpillReaderStreamState::Done);
// Now you can match on an owned state
match state {
    ...
}
```
The problem with this is that it is easier to accidentally remain in that `Done` state. For example, the `futures::ready!` macro contains a `return` for the `Pending` case, so we would not be able to use it, and the pattern is prone to errors.
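To make that hazard concrete, here is a deliberately buggy, hypothetical sketch (not the PR's code) of what happens when `futures::ready!` meets the owned-state pattern:

```rust
use std::task::{Context, Poll};

use futures::{ready, FutureExt};
use tokio::task::JoinHandle;

enum State {
    Waiting(JoinHandle<Vec<u8>>),
    Done,
}

struct SpillStream {
    state: State,
}

impl SpillStream {
    // Anti-example: `ready!` hides an early `return Poll::Pending`, which
    // fires *before* the real state is written back.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        let state = std::mem::replace(&mut self.state, State::Done);
        match state {
            State::Waiting(mut handle) => {
                // BUG: on Pending this returns immediately, `handle` is
                // dropped, and the stream is stuck in `Done` forever.
                let batch = ready!(handle.poll_unpin(cx)).unwrap();
                Poll::Ready(Some(batch))
            }
            State::Done => Poll::Ready(None),
        }
    }
}

fn main() {} // compile-only sketch
```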
That is fair -- I don't have a strong preference about which pattern to use, I was just mentioning an alternate pattern as a possibility
I also filed a ticket to track adding a spilling benchmark.
I created a PR in Comet to use DataFusion from this PR (apache/datafusion-comet#1629). I did not have time to run benchmarks today but hope to tomorrow.
I tried a simple benchmark:

Result on my MacBook:

Not quite sure why; I'm trying to understand how those IO interfaces work.
@2010YOUY01 I checked your benchmark locally on my Linux machine (Ryzen 7945HX), 3 times on each version. I do worry that the benchmark might not measure the IO bottleneck accurately due to the OS caching the spill files.
I tested this PR with Comet. Here are the most relevant configs for Comet related to this testing:

With the Comet main branch, TPC-H q4 never completes due to deadlock. With the changes in this PR, the query completes with good performance.
@andygrove any chance you could check Comet's performance with this alternative implementation: https://github.com/ashdnazg/datafusion/tree/pull-batch-2 ?
Yes, I'll do that now.
I don't think Comet testing is going to help with this. Here are timings for 5 runs of q4 with this PR and with the alternate. In both cases there are tasks failing and restarting due to lack of memory.

This PR

Alternate
Thank you @andygrove!
Rebased
I'll plan to merge this tomorrow unless anyone else would like more time to review |
Really appreciate the nice fix!
```rust
/// Stream that reads spill files from disk, where each batch is read in a spawned blocking task.
/// It will read one batch at a time and will not do any buffering; to buffer data use
/// [`crate::common::spawn_buffered`].
struct SpillReaderStream {
```
I recommend adding a 'why' comment here, e.g.:

```rust
/// A simpler solution would be spawning a long-running blocking task for each
/// file read (instead of each batch). This approach does not work because when
/// the number of concurrent reads exceeds the Tokio thread pool limit,
/// deadlocks can occur and block progress.
struct SpillReaderStream {
```
Strongly agree
done
Thanks @ashdnazg!
🚀
The extended test takes longer after this change and couldn't finish within 6 hours: https://github.com/apache/datafusion/actions/runs/14419458859/job/40440288212
@jayzhan211 💩 |
I found from the log that some memory limit validation tests get stuck: this test at datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs line 48 (commit 61e8a5d).
And I am not able to reproduce this issue on my MacBook; it progresses and finishes all the tests 🤦🏼
I do reproduce it here on Ubuntu: when I run the test through the runner it takes much longer (or hangs entirely) than without it. Just to see what happens, I tried running the test in release mode; it finished very quickly in both cases.
Seems to be contention with this PR: #15702
Which issue does this PR close?

Fixes apache#15323.

Rationale for this change

The previous design of reading spill files was a `push` design, spawning long-lived blocking tasks which repeatedly read records, send them, and wait until they are received. This design had an issue where progress wasn't guaranteed (i.e., there was a deadlock) if more spill files were being waited on together than there are threads in tokio's blocking pool.

To solve this, the design is changed to a `pull` design, where a blocking task is spawned for every read, removing waiting on the IO threads and guaranteeing progress.

While there might be added overhead from repeatedly calling `spawn_blocking`, it's probably insignificant compared to the IO cost of reading from the disk.
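As an illustration of the `pull` shape described above (simplified stand-in types, not the PR's exact code):

```rust
use tokio::task::JoinHandle;

type Batch = Vec<u8>;

struct Reader; // stand-in for the spill file reader

impl Reader {
    // Blocking read of the next batch; returns the reader for reuse.
    fn read_next(self) -> (Option<Batch>, Reader) {
        (None, self) // actual disk IO elided
    }
}

enum State {
    Ready(Reader),
    Reading(JoinHandle<(Option<Batch>, Reader)>),
    Done,
}

// Pull design: each read is its own short-lived blocking task, so a task
// always runs to completion and frees its thread; nothing ever parks on a
// channel send waiting for a consumer.
fn request_next_batch(state: &mut State) {
    if let State::Ready(reader) = std::mem::replace(state, State::Done) {
        *state = State::Reading(tokio::task::spawn_blocking(move || reader.read_next()));
    }
}

fn main() {} // compile-only sketch
```

The key property is that a queued read may wait for a free thread, but no thread ever waits on a consumer, so some read always finishes and the merge keeps making progress.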
Are these changes tested?

Added a test which causes a deadlock in `main` but passes with this fix.

Are there any user-facing changes?

No.