
Reduce number of tokio blocking threads in SortExec spill #15323


Closed
Tracked by #15271
andygrove opened this issue Mar 19, 2025 · 11 comments · Fixed by #15654
Labels
enhancement New feature or request

Comments

@andygrove
Member

Is your feature request related to a problem or challenge?

In Comet, we see some queries "hang" when running with minimal memory. The issue appears to be that we have hundreds of spill files, each spill requires its own tokio blocking thread, and Comet does not have enough threads available.

See apache/datafusion-comet#1523 (comment) for more detail.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@alamb
Contributor

alamb commented Mar 21, 2025

Do you see too many threads when writing the spill files or when reading?

@andygrove
Member Author

> Do you see too many threads when writing the spill files or when reading?

This is when reading, during the merge operation.

In the merge phase, each spill file is wrapped by a stream backed by a blocking thread (see read_spill_as_stream), so we'll spawn at least 183 blocking threads when there are 183 spill files to merge.
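The one-thread-per-spill-file pattern described above can be sketched with plain std threads and channels (a toy illustration only; `open_spill_stream` is a made-up stand-in for the behavior of `read_spill_as_stream`, with `Vec<Vec<i32>>` standing in for on-disk record batches):

```rust
use std::sync::mpsc;
use std::thread;

/// Sketch of the "push" pattern: every spill file gets its own dedicated
/// thread that reads batches and pushes them over a channel. With N spill
/// files this ties up N threads for the whole duration of the merge.
fn open_spill_stream(batches: Vec<Vec<i32>>) -> mpsc::Receiver<Vec<i32>> {
    // Bounded channel: the reader thread blocks in send() until the
    // consumer drains a batch, which is where each thread sits parked.
    let (tx, rx) = mpsc::sync_channel(1);
    thread::spawn(move || {
        for batch in batches {
            if tx.send(batch).is_err() {
                break; // consumer dropped the stream
            }
        }
    });
    rx
}

fn main() {
    // Three "spill files" -> three dedicated reader threads.
    let streams = vec![
        open_spill_stream(vec![vec![1], vec![4]]),
        open_spill_stream(vec![vec![2]]),
        open_spill_stream(vec![vec![3]]),
    ];
    let mut all: Vec<i32> = streams
        .into_iter()
        .flat_map(|rx| rx.into_iter().flatten())
        .collect();
    all.sort();
    assert_eq!(all, vec![1, 2, 3, 4]);
}
```

With a capped thread pool, 183 such parked reader threads can exhaust the pool before any consumer makes progress, which matches the hang described above.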

@alamb
Contributor

alamb commented Mar 21, 2025

Makes sense -- with 183 spill files, we probably would need to merge in stages

For example starting with 183 spill files

  1. run 10 jobs, each merging about 10 files into one (results in 10 files)
  2. run the final merge of 10 files

This results in 2x the IO (each row has to be read/written twice), but at least the merges of the earlier step could be parallelized

I think @2010YOUY01 was starting to look into a SpillFileManager -- this is the kind of behavior I would imagine being part of such a thing
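The staged-merge idea above can be sketched as a cascaded k-way merge, where each pass keeps at most `fan_in` runs open at once (a minimal sketch with in-memory `Vec`s standing in for spill files; `merge_runs` and `cascaded_merge` are made-up names, not DataFusion APIs):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several sorted runs into one sorted run with a min-heap,
/// analogous to a k-way streaming merge of spill files.
fn merge_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    // Heap entries: (next value, run index, position within run).
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Merge in stages so that no single pass opens more than `fan_in`
/// runs at once, mirroring the "merge in stages" idea above.
fn cascaded_merge(mut runs: Vec<Vec<i32>>, fan_in: usize) -> Vec<i32> {
    assert!(fan_in >= 2);
    while runs.len() > 1 {
        runs = runs
            .chunks(fan_in)
            .map(|group| merge_runs(group.to_vec()))
            .collect();
    }
    runs.pop().unwrap_or_default()
}

fn main() {
    // Five sorted runs, merged with at most two open at a time.
    let runs = vec![vec![1, 4], vec![2, 5], vec![0, 3], vec![6], vec![-1, 7]];
    let merged = cascaded_merge(runs, 2);
    assert_eq!(merged, vec![-1, 0, 1, 2, 3, 4, 5, 6, 7]);
}
```

With 183 files and a fan-in of 10 this would take two passes, bounding the number of files (and hence blocking threads) open per merge at the cost of the extra read/write pass noted above.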

@rluvaton
Contributor

rluvaton commented Apr 3, 2025

I think I have the same problem, but in AggregateExec when using row_hash, as it spills as well and uses SortPreservingMergeStream.

I think the solution should actually be in SortPreservingMergeStream rather than SpillFileManager, no? Although it does not spawn blocking threads itself, it should support multiple levels of merging

@alamb
Contributor

alamb commented Apr 3, 2025

> I think I have the same problem, but in AggregateExec when using row_hash, as it spills as well and uses SortPreservingMergeStream.
>
> I think the solution should actually be in SortPreservingMergeStream rather than SpillFileManager, no? Although it does not spawn blocking threads itself, it should support multiple levels of merging

I am not sure / familiar enough with the code to know off the top of my head.

I do think having hash and sort use the same codepath (that we can then go optimize a lot) sounds like a great idea

@rluvaton
Contributor

rluvaton commented Apr 6, 2025

I have a working version locally and will create a PR soon.

There is a problem, though: tokio doesn't expose the maximum number of blocking threads, and if you call spawn_blocking while no threads are available, no error is returned.

This is important because, for example, Comet sets this to 10 by default, while tokio's default is 512 IIRC.

The working version can be improved with optimizations like prefetching and more, but it will be good enough for now and we can iterate further
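Since tokio's blocking-pool limit is opaque, one way around it is to cap concurrency yourself, independently of the pool size. Below is a minimal sketch using a counting semaphore built from `Mutex` + `Condvar` (std has no counting semaphore; `Semaphore` and `read_spills_capped` are made-up names, and the per-file `thread::spawn` is only a stand-in for blocking reads, not tokio's mechanism):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore: at most `n` holders at a time.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap(); // park until a permit is released
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Read `num_files` "spill files", allowing at most `max_concurrent`
/// blocking reads in flight at once, and sum the bytes read.
fn read_spills_capped(num_files: usize, max_concurrent: usize) -> usize {
    let sem = Arc::new(Semaphore::new(max_concurrent));
    let handles: Vec<_> = (0..num_files)
        .map(|i| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                // Stand-in for a blocking read of spill file `i`.
                let bytes_read = i + 1;
                sem.release();
                bytes_read
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    // Twenty "spill files", at most four blocking reads in flight.
    let total = read_spills_capped(20, 4);
    assert_eq!(total, (1..=20).sum::<usize>());
}
```

In a real tokio setup the same capping role would be played by an async semaphore acquired before each spawn_blocking call, so the number of spill files no longer dictates the number of simultaneously blocked threads.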

@andygrove
Member Author

> I have a working version locally and will create a PR soon, just one problem, I don't think we can know the number of blocking threads tokio is configured with.
>
> this is important as for example Comet set this by default to 10, and tokio default is 512 IIRC.
>
> the working version can be improved with some optimization like prefetch and more, but it will be good enough for now and we can iterate further

Comet currently creates a new tokio runtime per plan but there is a proposal to move to a global tokio runtime (per executor) instead.

apache/datafusion-comet#1590

@rluvaton
Contributor

rluvaton commented Apr 6, 2025

> Comet currently creates a new tokio runtime per plan but there is a proposal to move to a global tokio runtime (per executor) instead.
>
> apache/datafusion-comet#1590

Even if you use a global tokio runtime and set the number of blocking threads to 1000, for example, there can be 1001 spill files; the problem is the same

@alamb
Contributor

alamb commented Apr 6, 2025

> Even if you use a global tokio runtime and set the number of blocking threads to 1000, for example, there can be 1001 spill files; the problem is the same

At some point the system is going to be IO bound, so having more blocking threads doing IO isn't going to help, and will likely consume non-trivial time context switching between them

I think a better solution is to more carefully manage how many files are being spilled/read at any time. This will be more complicated (we'll likely have to do multiple merge phases, etc.), but I think it is a better approach in the long run

@rluvaton
Contributor

rluvaton commented Apr 6, 2025

I created a draft PR with a solution and would appreciate your opinion:

ashdnazg added a commit to ashdnazg/datafusion that referenced this issue Apr 9, 2025
Fixes apache#15323.

The previous design of reading spill files was a `push` design, spawning
long-lived blocking tasks which repeatedly read records, send them, and
wait until they are received. This design had an issue where progress
wasn't guaranteed (i.e., there was a deadlock) if there were more spill
files than threads in tokio's blocking pool, all of which were waited
on together.

To solve this, the design is changed to a `pull` design, where blocking
tasks are spawned for every read, removing waiting on the IO threads and
guaranteeing progress.

While there might be an added overhead for repeatedly calling
`spawn_blocking`, it's probably insignificant compared to the IO cost of
reading from the disk.
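The push-vs-pull distinction in the commit message can be sketched as follows (a toy illustration, not DataFusion's actual spill-reading code; `SpillReader` and `next_batch` are made-up names standing in for a cursor over one spill file, with `Vec<Vec<i32>>` in place of on-disk record batches):

```rust
/// Sketch of the `pull` design: each batch is read on demand when the
/// consumer asks for it, so no long-lived thread is parked per file.
struct SpillReader {
    batches: Vec<Vec<i32>>, // stand-in for on-disk record batches
    pos: usize,             // read cursor into the file
}

impl SpillReader {
    fn new(batches: Vec<Vec<i32>>) -> Self {
        SpillReader { batches, pos: 0 }
    }

    /// One short blocking read, invoked per poll. In the pull design this
    /// is the unit of work handed to `spawn_blocking` on each call, rather
    /// than a long-lived task that pushes batches and waits on a channel.
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        let batch = self.batches.get(self.pos).cloned();
        self.pos += 1;
        batch
    }
}

fn main() {
    let mut reader = SpillReader::new(vec![vec![1, 2], vec![3]]);
    let mut out = Vec::new();
    while let Some(batch) = reader.next_batch() {
        out.extend(batch);
    }
    assert_eq!(out, vec![1, 2, 3]);
}
```

Because a thread is only occupied for the duration of a single read, the number of spill files no longer bounds the number of threads required, which is how the pull design guarantees progress.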
@rluvaton
Contributor

rluvaton commented Apr 9, 2025

Removed my PR in favor of @ashdnazg's better PR:

ashdnazg added a commit to ashdnazg/datafusion that referenced this issue Apr 9, 2025
ashdnazg added a commit to ashdnazg/datafusion that referenced this issue Apr 9, 2025
ashdnazg added a commit to ashdnazg/datafusion that referenced this issue Apr 10, 2025
ashdnazg added a commit to ashdnazg/datafusion that referenced this issue Apr 11, 2025
@alamb alamb closed this as completed in b6a5174 Apr 12, 2025
nirnayroy pushed a commit to nirnayroy/datafusion that referenced this issue May 2, 2025