-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revise support and worker state callbacks #17
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great stuff! I have some questions about the semantics of these callbacks w.r.t error conditions, but other than that, this all seems solid.
src/cluster.jl
Outdated
new_workers = @lock worker_lock addprocs_locked(manager::ClusterManager; kwargs...) | ||
for worker in new_workers | ||
for callback in values(worker_added_callbacks) | ||
callback(worker) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If one of these callbacks throws, what should we do? Right now we'll just bail out of addprocs
, but it might make sense to make it a non-fatal error (printed with @error
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I think we should definitely throw the exception so that it's obvious that worker initialization somehow failed, otherwise code that runs later assuming initialization succeeded may cause even more errors. But I did change how we run them in 90f44f6 so that they execute concurrently to not slow down addprocs()
too much and so that we can have warnings about slow callbacks.
src/cluster.jl
Outdated
add_worker_added_callback(f::Base.Callable; key=nothing) | ||
|
||
Register a callback to be called on the master process whenever a worker is | ||
added. The callback will be called with the added worker ID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should document the on-error behavior here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, fixed in 90f44f6.
src/cluster.jl
Outdated
""" | ||
remove_worker_added_callback(key) | ||
|
||
Remove the callback for `key`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the callback for `key`. | |
Remove the callback for `key` that was added via `add_worker_added_callback`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
|
||
Register a callback to be called on the master process whenever a worker is | ||
added. The callback will be called with the added worker ID, | ||
e.g. `f(w::Int)`. Returns a unique key for the callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g. `f(w::Int)`. Returns a unique key for the callback. | |
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback | |
if `key` is not specified. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
Register a callback to be called on the master process when a worker has exited | ||
for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker | ||
segfaulting etc). The callback will be called with the worker ID, | ||
e.g. `f(w::Int)`. Returns a unique key for the callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g. `f(w::Int)`. Returns a unique key for the callback. | |
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback | |
if `key` is not specified. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
""" | ||
remove_worker_exiting_callback(key) | ||
|
||
Remove the callback for `key`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the callback for `key`. | |
Remove the callback for `key` that was added via `add_worker_exiting_callback`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
""" | ||
remove_worker_exited_callback(key) | ||
|
||
Remove the callback for `key`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the callback for `key`. | |
Remove the callback for `key` that was added via `add_worker_exited_callback`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
end | ||
|
||
if timedwait(() -> all(istaskdone.(callback_tasks)), callback_timeout) === :timed_out | ||
@warn "Some callbacks timed out, continuing to remove workers anyway" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@warn "Some callbacks timed out, continuing to remove workers anyway" | |
@warn "Some worker-exiting callbacks have not yet finished, continuing to remove workers anyway" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 90f44f6.
src/cluster.jl
Outdated
# Call callbacks on the master | ||
if myid() == 1 | ||
for callback in values(worker_exited_callbacks) | ||
callback(pid) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try/catch -> @error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's better, added in 90f44f6.
One additional thought: should we have a way to detect a worker exit reason? If a worker exits due to a segfault or OOM, this might be important to know so that I can do some recovery actions or reporting to the user. It could also be something we add later, as its own set of callbacks, but I figure it's worth thinking about before merging this. |
Previously we were not filtering out the current worker when calling `deregister_worker()` on `workers()`.
That is an excellent point 🤔 I think it should go with the current set of callbacks, e.g. what if the signature for the worker-exiting callbacks was |
Btw, I think this is really cool stuff @JamesWrigley! @jpsamaroo and I where discussing this in the context advanced scheduling (i.e. your compute might be growing or shrinking) -- or you're on shared nodes where the sysadmin might kill a worker which is leaking memory. I this context I've been building "nanny workers" that will re-add dead workers, etc. But this has been a pain in the ^%#@ I am happy with this PR as is, but fi there are spare cycles, I want propose additional improvements:
|
Thanks for taking a look :) About those things:
|
In general I find it harder to change an API after the fact ;) -- Anyway the situation you're describing is where one worker is responsible for managing the overall workflow, as is common. However this is not always the case at scale (eg. what if we had >10k workers?). In those situations you often lay out your workers on a tree with several "management" nodes (eg. think fat trees, but for workers and not networks). In this situation you want to build callbacks that notify those manager nodes that the leaves have just changed (or are about to change).
Slurm can be configured to send a signal
That's neat! I love it -- it can also help with overall workflow tooling. All-in-all I am happy with this PR as is -- my comments are meant to make somethings that's great even better. |
Alrighty, switched to
On second thoughts I'm unsure about promising logs because I don't know how the Am I correct in thinking that worker statuses and the worker-exited callbacks are sufficient for the uses you're talking about? The way I think about it is that there's three possible scenarios for exits:
|
The new `WorkerState_exterminated` state is for indicating that a worker was killed by something other than us.
Some updates:
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #17 +/- ##
==========================================
- Coverage 88.24% 88.11% -0.13%
==========================================
Files 11 12 +1
Lines 2068 2138 +70
==========================================
+ Hits 1825 1884 +59
- Misses 243 254 +11 ☔ View full report in Codecov by Sentry. |
This should fix an exception seen in CI from the lingering timeout task: ``` Test Summary: | Pass Total Time Deserialization error recovery and include() | 11 11 3.9s From worker 4: Unhandled Task ERROR: EOFError: read end of file From worker 4: Stacktrace: From worker 4: [1] wait From worker 4: @ .\asyncevent.jl:159 [inlined] From worker 4: [2] sleep(sec::Float64) From worker 4: @ Base .\asyncevent.jl:265 From worker 4: [3] (::DistributedNext.var"#34#37"{DistributedNext.Worker, Float64})() From worker 4: @ DistributedNext D:\a\DistributedNext.jl\DistributedNext.jl\src\cluster.jl:213 ```
Alrighty, implemented worker statuses in 64aba00. Now they'll be passed to the worker-exited callbacks. Apologies for how big this PR is becoming 😅 I've tried to keep the commits atomic so you can review them one-by-one. |
Hmm, interestingly 0d5aaa3 seems to have almost entirely fixed #6. There are no more timeouts on Linux/OSX and I see only one on Windows. The common problem with these hangs seems to be lingering tasks blocking Julia from exiting. At some point we should probably audit Distributed[Next] to remove all of them. |
(bump) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome changes, I think the newest implementation is really clear! I do have concerns about setstatus/getstatus
, but otherwise everything looks good.
src/cluster.jl
Outdated
end | ||
|
||
# Wait on the tasks so that exceptions bubble up | ||
wait.(values(callback_tasks)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait.(values(callback_tasks)) | |
foreach(wait, values(callback_tasks)) |
Should reduce allocations a bit, maybe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a9f5dc8.
src/cluster.jl
Outdated
|
||
!!! warning | ||
Adding workers can fail so it is not guaranteed that the workers requested | ||
will exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "will exist" mean here? It feels unclear what the implied behavior is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means that if e.g. 2 workers are requested on specific nodes they may not actually exist in the future since adding them may fail if the nodes are unreachable or something. I'll reword that to make it more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a9f5dc8.
src/cluster.jl
Outdated
will exist. | ||
|
||
The worker-starting callbacks will be executed concurrently. If one throws an | ||
exception it will not be caught and will bubble up through [`addprocs`](@ref). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"bubble up" isn't the clearest language - I think we mean to say that any errors will be rethrown in addprocs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agreed, reworded it in a9f5dc8.
src/cluster.jl
Outdated
not specified. | ||
|
||
The worker-started callbacks will be executed concurrently. If one throws an | ||
exception it will not be caught and will bubble up through [`addprocs()`](@ref). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto to above w.r.t "bubble up"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a9f5dc8.
src/cluster.jl
Outdated
""" | ||
add_worker_started_callback(f::Base.Callable; key=nothing) | ||
|
||
Register a callback to be called on the master process whenever a worker is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Register a callback to be called on the master process whenever a worker is | |
Register a callback to be called on the master process whenever a worker has been |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a9f5dc8.
src/cluster.jl
Outdated
if `key` is not specified. | ||
|
||
All worker-exiting callbacks will be executed concurrently and if they don't | ||
all finish before the `callback_timeout` passed to `rmprocs()` then the process |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all finish before the `callback_timeout` passed to `rmprocs()` then the process | |
all finish before the `callback_timeout` passed to `rmprocs()` then the worker |
To keep terminology consistent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I went through all the docstrings and replaced process
with worker
in a9f5dc8.
src/cluster.jl
Outdated
"working on dataset 42" | ||
``` | ||
""" | ||
function setstatus(x, pid::Int=myid()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mutating the cluster-global status so I think a !
is warranted:
function setstatus(x, pid::Int=myid()) | |
function setstatus!(x, pid::Int=myid()) |
etc. for all other calls. Agree or disagree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me this is in the same class of function as the IO methods so it's not necessary, but I don't feel very strongly about it and it is more explicit this way. Renamed in 8ebd9bf.
@@ -1041,14 +1209,67 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out. | |||
""" | |||
other_workers() = filter(!=(myid()), workers()) | |||
|
|||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little concerned about the composability of this mechanism. If a library wants to set a status for debugging how it interacts with workers, that would conflict with the user or another library trying to do the same thing. Maybe we can also pass a key and track multiple statuses? I've used the calling Module
in the past as a key (usually automated with a macro) when a package-global key is needed for a system like this.
I know my proposal isn't as convenient for worker-exiting callbacks to handle, so I'm open to alternatives, but I do think we'll need to consider the usability of this mechanism before merging it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oooo yes that's a very good point 😬 I'm inclined to agree about using the calling module as a key.
Right now the status is passed in explicitly, but what if we leave that up to the user and let them call e.g. @getstatus(2)
/getstatus(2, Dagger)
to get the statuses for the modules they're interested in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JBlaschke any thoughts on this? If not then I think I'll implement per-module statuses.
There's a few changes in here, I would recommend reviewing each commit individually. The big ones are:
Depends on timholy/Revise.jl#871.