Use queued tasks in adaptive target #8037
Conversation
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

19 files -1, 19 suites -1, 10h 16m 5s ⏱️ +1h 6m 50s

For more details on these failures, see this check. Results for commit 2da9df9. ± Comparison against base commit 145c13a.

This pull request removes 2 and adds 4 tests. Note that renamed tests count towards both.

♻️ This comment has been updated with latest results.
In case this helps Matt or reviewers: I tried out this PR and it makes my own go-to adaptive example go from scaling smaller than I expect to larger than I expect (because my workers have 4 threads each):
My back-of-the-envelope says this should give me about 42 workers. Before this PR, it gave me 17. After this PR, it gives me 167, which is roughly 42*4, because adaptive doesn't account for worker threads:
But I realize that's a separate problem that shouldn't block this PR. Just adding the context/example in case it helps.
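To make the arithmetic in the comment above concrete, here is a rough sketch with assumed numbers; the occupancy figure, target duration, and formula are illustrative stand-ins chosen to reproduce the reported blow-up, not the actual `Adaptive.target` implementation:

```python
import math

# Assumed figures, chosen so the naive estimate lands near the thread-count
# blow-up described above (42 desired workers x 4 threads each)
total_occupancy = 33_600.0   # seconds of pending work across all queued tasks
target_duration = 200.0      # seconds we would like the backlog to clear in
threads_per_worker = 4

# Naive target: occupancy is accumulated per task *slot*, but the result is
# interpreted as a number of *workers*
naive_target = math.ceil(total_occupancy / target_duration)

# Thread-aware target: divide by the slots each worker contributes
thread_aware_target = math.ceil(
    total_occupancy / (target_duration * threads_per_worker)
)

print(naive_target)         # 168 (roughly 42 * 4, the over-scaled estimate)
print(thread_aware_target)  # 42
```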
It looks like
Thanks. Fixed.
distributed/scheduler.py (Outdated)

```python
if len(self.queued) < 100:
    queued_occupancy = 0
    for ts in self.queued:
        if ts.prefix.duration_average == -1:
```
Out of scope for this PR: I think this is problematic. Durations within the same TaskPrefix can vary wildly; I would much rather use a metric that is TaskGroup-specific.
I don't disagree. I also suspect that this will be fine in most cases.
distributed/scheduler.py (Outdated)

```python
            queued_occupancy += self.UNKNOWN_TASK_DURATION
        else:
            queued_occupancy += ts.prefix.duration_average
```
Out of scope nit: this screams for encapsulation in a smart `duration_average` property.
I don't disagree
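A minimal sketch of the encapsulation suggested above: a TaskPrefix-like class whose `duration_average` property folds the "no measurement yet" sentinel (-1) into a default, so callers no longer branch on it. The class name, method names, default value, and the moving-average weighting are all assumptions for illustration, not the actual distributed implementation:

```python
UNKNOWN_TASK_DURATION = 0.5  # assumed default duration, in seconds


class TaskPrefixSketch:
    def __init__(self) -> None:
        self._duration_average = -1.0  # -1 means "no measurement yet"

    def record_duration(self, duration: float) -> None:
        # Simple exponential moving average of observed durations
        if self._duration_average < 0:
            self._duration_average = duration
        else:
            self._duration_average = 0.5 * self._duration_average + 0.5 * duration

    @property
    def duration_average(self) -> float:
        # Callers get a usable estimate even before any task has finished,
        # so the occupancy loop collapses to a single +=
        if self._duration_average < 0:
            return UNKNOWN_TASK_DURATION
        return self._duration_average
```

With something like this, the loop in the diff hunks above would reduce to `queued_occupancy += ts.prefix.duration_average` with no sentinel check at the call site.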
There is `get_task_duration`, which also handles the case of user-provided estimates.
Both this PR and main fail (as the tests demonstrate) in the use case where:

- there are 0 workers, and
- queuing is disabled (`distributed.scheduler.worker-saturation: .inf`)

In this use case, all tasks will end up in `Scheduler.unrunnable` instead of `Scheduler.queued`.

Even when queueing is enabled, this fails when:

- there are no workers, and
- the tasks require resources (I understand that the expectation in an adaptive cluster with resources is that all dynamically-started workers provide the resource, e.g. `{"GPU": 1}`).

Again, tasks will end up in `Scheduler.unrunnable`.

Please add a test for this use case.
```python
while not s.tasks:
    await asyncio.sleep(0.001)
```
Suggested change:

```diff
-while not s.tasks:
-    await asyncio.sleep(0.001)
+await async_poll_for(lambda: s.tasks, timeout=5)
```
I prefer not to use these. I find that this removes one line, but at the cost of adding a new abstraction (`async_poll_for`). The tradeoff here doesn't seem positive to me.
We are using these ubiquitously. I think this is not a design choice that should be left to the whim and taste of individual developers; if you don't like them, we should have a team discussion that results in either using them everywhere or removing them completely.
```python
while len(s.tasks) != 200:
    await asyncio.sleep(0.001)
```
Suggested change:

```diff
-while len(s.tasks) != 200:
-    await asyncio.sleep(0.001)
+await async_poll_for(lambda: len(s.tasks) == 200, timeout=5)
```
In short:

```python
from itertools import chain

from toolz import peekn

# Note: this relies on HeapSet.__iter__ and set.__iter__ to yield elements
# in pseudo-random order
queued, _ = peekn(100, chain(self.queued, self.unrunnable))
queued_occupancy = 0
for ts in queued:
    if ts.prefix.duration_average == -1:
        queued_occupancy += self.UNKNOWN_TASK_DURATION
    else:
        queued_occupancy += ts.prefix.duration_average
```
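If avoiding the `toolz` dependency were a concern, the same capped scan can be sketched with `itertools.islice` alone. The stand-in `Task`/`Prefix` classes, the sample collections, and the default-duration value below are illustrative, not scheduler code:

```python
from itertools import chain, islice

UNKNOWN_TASK_DURATION = 0.5  # assumed default duration, in seconds


# Stand-in task objects: duration_average of -1 means "not yet measured"
class Prefix:
    def __init__(self, duration_average):
        self.duration_average = duration_average


class Task:
    def __init__(self, duration_average):
        self.prefix = Prefix(duration_average)


queued = [Task(2.0), Task(-1)]
unrunnable = [Task(3.0)]

# Look at no more than 100 tasks across both collections, mirroring the
# peekn(100, chain(...)) suggestion above
queued_occupancy = 0.0
for ts in islice(chain(queued, unrunnable), 100):
    if ts.prefix.duration_average == -1:
        queued_occupancy += UNKNOWN_TASK_DURATION
    else:
        queued_occupancy += ts.prefix.duration_average

print(queued_occupancy)  # 5.5
```

Unlike `peekn`, `islice` does not hand back the unconsumed remainder, but that doesn't matter here since the loop only needs the capped prefix.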
As a heads-up I'm unlikely to spend a bunch of time on this. It's more likely that I ask folks like @fjetter to ask people around him (maybe even @crusaderky) to pick this up. I'm hopeful that this can be a small fix. I would be mildly sad/surprised if it required a large effort (not that that's what you're saying).
Never mind. The use of `take` removes much of the concern about cost here (thanks for the suggestion). I do think that the request around supporting
Fixes #8035