
Revise support and worker state callbacks #17

Open · wants to merge 13 commits into master

Conversation

@JamesWrigley (Collaborator):

There are a few changes in here; I would recommend reviewing each commit individually. The big ones are:

  • Support for worker added/exiting/exited callbacks.
  • Revise support (requires callbacks to implement it properly).

Depends on timholy/Revise.jl#871.
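For readers skimming the thread, here is a minimal usage sketch of the callback API discussed in the review below, assuming the names and signatures from the quoted docstrings (`add_worker_added_callback`, `add_worker_exited_callback`, `remove_worker_added_callback`) and that an arbitrary `key` such as a `Symbol` is accepted; the final API may differ.

```julia
# A minimal sketch, assuming the callback API lands roughly as in the quoted
# docstrings; names, key types, and export status are assumptions.
using DistributedNext

# Runs on the master whenever a worker is added; receives the worker ID.
added_logger(w) = @info "Worker $w was added"
add_worker_added_callback(added_logger; key=:added_logger)

# Runs on the master whenever a worker has exited, for any reason.
add_worker_exited_callback(w -> @warn "Worker $w exited")

addprocs(2)          # the added callback fires once per new worker
rmprocs(workers())   # the exited callback fires once per removed worker

# Deregister the callback registered under :added_logger.
remove_worker_added_callback(:added_logger)
```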

@JamesWrigley self-assigned this on Dec 10, 2024.
@jpsamaroo (Member) left a comment:

Great stuff! I have some questions about the semantics of these callbacks w.r.t. error conditions, but other than that, this all seems solid.

src/cluster.jl Outdated
new_workers = @lock worker_lock addprocs_locked(manager::ClusterManager; kwargs...)
for worker in new_workers
for callback in values(worker_added_callbacks)
callback(worker)
Member:

If one of these callbacks throws, what should we do? Right now we'll just bail out of addprocs, but it might make sense to make it a non-fatal error (printed with @error)?

Collaborator (Author):

No, I think we should definitely throw the exception so that it's obvious that worker initialization somehow failed; otherwise code that runs later assuming initialization succeeded may cause even more errors. But I did change how we run them in 90f44f6 so that they execute concurrently, to avoid slowing down `addprocs()` too much and so that we can warn about slow callbacks.
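A rough sketch of the concurrent execution described above (not the actual code from 90f44f6; `worker_added_callbacks` and `new_workers` are the names from the snippet under review):

```julia
# Sketch: run every worker-added callback for every new worker on its own task,
# then wait on all tasks so any exception is rethrown inside addprocs().
callback_tasks = Task[]
for worker in new_workers
    for callback in values(worker_added_callbacks)
        push!(callback_tasks, Threads.@spawn callback(worker))
    end
end

# wait() on a failed task throws a TaskFailedException, so a broken callback
# still surfaces to the caller instead of being silently swallowed.
foreach(wait, callback_tasks)
```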

src/cluster.jl Outdated
add_worker_added_callback(f::Base.Callable; key=nothing)

Register a callback to be called on the master process whenever a worker is
added. The callback will be called with the added worker ID,
Member:

We should document the on-error behavior here.

Collaborator (Author):

Good point, fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_added_callback(key)

Remove the callback for `key`.
Member:

Suggested change:
- Remove the callback for `key`.
+ Remove the callback for `key` that was added via `add_worker_added_callback`.

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated

Register a callback to be called on the master process whenever a worker is
added. The callback will be called with the added worker ID,
e.g. `f(w::Int)`. Returns a unique key for the callback.
Member:

Suggested change:
- e.g. `f(w::Int)`. Returns a unique key for the callback.
+ e.g. `f(w::Int)`. Chooses and returns a unique key for the callback
+ if `key` is not specified.

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated
Register a callback to be called on the master process when a worker has exited
for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
segfaulting etc). The callback will be called with the worker ID,
e.g. `f(w::Int)`. Returns a unique key for the callback.
Member:

Suggested change:
- e.g. `f(w::Int)`. Returns a unique key for the callback.
+ e.g. `f(w::Int)`. Chooses and returns a unique key for the callback
+ if `key` is not specified.

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_exiting_callback(key)

Remove the callback for `key`.
Member:

Suggested change:
- Remove the callback for `key`.
+ Remove the callback for `key` that was added via `add_worker_exiting_callback`.

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated
"""
remove_worker_exited_callback(key)

Remove the callback for `key`.
Member:

Suggested change:
- Remove the callback for `key`.
+ Remove the callback for `key` that was added via `add_worker_exited_callback`.

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated
end

if timedwait(() -> all(istaskdone.(callback_tasks)), callback_timeout) === :timed_out
@warn "Some callbacks timed out, continuing to remove workers anyway"
Member:

Suggested change:
- @warn "Some callbacks timed out, continuing to remove workers anyway"
+ @warn "Some worker-exiting callbacks have not yet finished, continuing to remove workers anyway"

Collaborator (Author):

Fixed in 90f44f6.

src/cluster.jl Outdated
# Call callbacks on the master
if myid() == 1
for callback in values(worker_exited_callbacks)
callback(pid)
Member:

try/catch -> @error?

Collaborator (Author):

Yeah that's better, added in 90f44f6.

@jpsamaroo mentioned this pull request on Dec 12, 2024.
@jpsamaroo (Member):

One additional thought: should we have a way to detect a worker exit reason? If a worker exits due to a segfault or OOM, this might be important to know so that I can do some recovery actions or reporting to the user. It could also be something we add later, as its own set of callbacks, but I figure it's worth thinking about before merging this.

Previously we were not filtering out the current worker when calling
`deregister_worker()` on `workers()`.
@JamesWrigley (Collaborator, Author):

> One additional thought: should we have a way to detect a worker exit reason? If a worker exits due to a segfault or OOM, this might be important to know so that I can do some recovery actions or reporting to the user. It could also be something we add later, as its own set of callbacks, but I figure it's worth thinking about before merging this.

That is an excellent point 🤔 I think it should go with the current set of callbacks, e.g. what if the signature for the worker-exiting callbacks was `f(w::Int, reason::ExitReasonEnum)`?

@JBlaschke commented on Dec 23, 2024:

Btw, I think this is really cool stuff @JamesWrigley!

@jpsamaroo and I were discussing this in the context of advanced scheduling (i.e. your compute might be growing or shrinking) -- or you're on shared nodes where the sysadmin might kill a worker that is leaking memory. In this context I've been building "nanny workers" that will re-add dead workers, etc. But this has been a pain in the ^%#@

I am happy with this PR as is, but if there are spare cycles, I want to propose additional improvements:

  1. Instead of the {added, exiting, exited} states, I propose using {starting, started, exiting, exited}, i.e. follow the semantics of event => conclusion. So a request was made to add a worker (starting) => the worker is ready for work (started); and a request was made to shut down a worker (exiting) => the worker is gone (exited). This brings the API in line with resource managers like Slurm (which have distinct "I've started doing something" and "I'm done doing that thing" phases).
  2. Is there a way to more effectively capture the state of an exiting worker? I.e. capturing the exit code is a step in the right direction, but perhaps we also want to capture a little more context that will help an application automatically recover from an unexpected failure (e.g. "working on dataset 23", or /path/to/coredump). I.e. have the user be able to define additional optional input args to some of the callbacks?

@JamesWrigley (Collaborator, Author):

Thanks for taking a look :)

About those things:

  1. I don't have any objection to adding a started callback, but what would it be used for? I can't think of a situation where a library that isn't calling addprocs() itself (in which case it would already know which workers are being started) would need to know about a worker it can't connect to yet.

  2. In the case of the worker being killed gracefully, I think this could be implemented already by an exiting callback that extracts some status information from the worker. But if it died from e.g. a segfault/OOM it gets a bit more complicated 🤔 Checking for a coredump would involve adding another worker on the same node (which might not even be possible if its Slurm allocation has expired), and that's more side effects than I'm comfortable with. Maybe we could pass the last ~100 lines of the logs to the callback?

    Alternatively, we could support some kind of per-worker status that a worker can set for itself (e.g. setmystatus("working on dataset 23")), and pass the last status to the exited callback?

@JBlaschke:

> I don't have any objection to adding a started callback, but what would it be used for? I can't think of a situation where a library that isn't calling addprocs() itself (in which case it would already know which workers are being started) would need to know about a worker it can't connect to yet.

In general I find it harder to change an API after the fact ;) -- Anyway, the situation you're describing is one where a single worker is responsible for managing the overall workflow, as is common. However, this is not always the case at scale (e.g. what if we had >10k workers?). In those situations you often lay out your workers in a tree with several "management" nodes (e.g. think fat trees, but for workers rather than networks). In this situation you want to build callbacks that notify those manager nodes that the leaves have just changed (or are about to change).

> In the case of the worker being killed gracefully I think this could be implemented already by an exiting callback to extract some status information from the worker. But if it died from e.g. a segfault/OOM it gets a bit more complicated 🤔 Checking for a coredump would involve adding another worker on the same node (which might not even be possible if its slurm allocation is expired), and that's a bit more side-effects than I'm comfortable with. Maybe we could pass the last ~100 lines of the logs to the callback?

Slurm can be configured to send a signal N seconds before killing a process. We also have the ability to add hooks to Slurm jobs upon process startup/completion. Oh, and coredumps don't have to land in /tmp -- I recommend $SCRATCH for these things at NERSC. Things like passing back part of the Slurm log are definitely a step in the right direction. I was thinking of something that users and sysadmins can configure for a given library/application, though.

> Alternatively, we could support some kind of per-worker status that a worker can set for itself (e.g. setmystatus("working on dataset 23")), and pass the last status to the exited callback?

That's neat! I love it -- it can also help with overall workflow tooling.

All in all I am happy with this PR as is -- my comments are meant to make something that's great even better.

@JamesWrigley (Collaborator, Author):

Alrighty, switched to {starting, started, exiting, exited} callbacks in 8b04241 / 21da9fc.

> Slurm can be configured to send a signal N seconds before killing a process. We also have the ability to add hooks to Slurm jobs upon process startup/completion. Oh, and coredumps don't have to land in /tmp -- I recommend $SCRATCH for these things at NERSC. Things like passing back part of the Slurm log is definitely in the right direction. I was thinking of something that the users and sysadmins can configure for a given library / application though.

On second thought, I'm unsure about promising logs because I don't know how the RemoteLogger work will pan out: JuliaLang/Distributed.jl#94. We can always promise to send the stdout of the worker because we're guaranteed to get that, but it might not have any useful information.

Am I correct in thinking that worker statuses and the worker-exited callbacks are sufficient for the uses you're talking about? The way I think about it is that there are three possible scenarios for exits (a sketch follows the list):

  1. Worker gets a request to exit (e.g. by a Slurm signal): it sets its status and then remotecalls the master to `rmprocs()` itself. The worker-exited callback is called like `f(pid, last_status, ExitKind_graceful)`.
  2. Worker is removed by the master: it may get a chance to set a final status if there's a worker-exiting callback that will tell it to do so; either way the worker-exited callback gets called like `f(pid, last_status, ExitKind_graceful)`.
  3. Worker dies suddenly (segfault/OOM etc.): the worker-exited callback gets called like `f(pid, last_status, ExitKind_forceful)` and it can look for coredumps etc.
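As a sketch of what handling these scenarios could look like on the callback side (the `last_status` argument and the `ExitKind_*` names are proposals from this thread, not a finished API):

```julia
# Hypothetical worker-exited callback covering the three scenarios above.
function on_worker_exited(pid, last_status, kind)
    if kind == ExitKind_graceful
        @info "Worker $pid exited gracefully" last_status
    else
        # Forceful exit (segfault/OOM/external kill): report it and try to
        # recover, e.g. look for a coredump or re-add a replacement worker.
        @error "Worker $pid died unexpectedly" last_status
    end
end

add_worker_exited_callback(on_worker_exited)
```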

The new `WorkerState_exterminated` state is for indicating that a worker was
killed by something other than us.
@JamesWrigley (Collaborator, Author):

Some updates:

  • timholy/Revise.jl#871 (Implement an interface for Distributed-like libraries) is merged and released now, but with a new minimum Julia version of 1.10, so I bumped the minimum Julia version for DistributedNext too in 914e50c.
  • I took the liberty of rebasing since the commit history was getting a bit long.
  • The worker-exited callbacks now get the worker state as well as the worker ID, which will tell them whether the worker exited gracefully or not. As part of this I renamed the WorkerState instances and added a new state for forceful exits (WorkerState_exterminated) in d5fd837. The next thing to do is add support for worker statuses and pass those to the worker-exited callbacks, and then I think this will be ready to merge.

@codecov-commenter commented on Jan 2, 2025:

Codecov Report

Attention: Patch coverage is 95.37037% with 5 lines in your changes missing coverage. Please review.

Project coverage is 88.11%. Comparing base (0cca4d3) to head (ae3225e).

Files with missing lines: src/cluster.jl (patch 94.68%, 5 lines missing ⚠️)
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #17      +/-   ##
==========================================
- Coverage   88.24%   88.11%   -0.13%     
==========================================
  Files          11       12       +1     
  Lines        2068     2138      +70     
==========================================
+ Hits         1825     1884      +59     
- Misses        243      254      +11     


This should fix an exception seen in CI from the lingering timeout task:
```
 Test Summary:                                | Pass  Total  Time
Deserialization error recovery and include() |   11     11  3.9s
      From worker 4:	Unhandled Task ERROR: EOFError: read end of file
      From worker 4:	Stacktrace:
      From worker 4:	 [1] wait
      From worker 4:	   @ .\asyncevent.jl:159 [inlined]
      From worker 4:	 [2] sleep(sec::Float64)
      From worker 4:	   @ Base .\asyncevent.jl:265
      From worker 4:	 [3] (::DistributedNext.var"#34#37"{DistributedNext.Worker, Float64})()
      From worker 4:	   @ DistributedNext D:\a\DistributedNext.jl\DistributedNext.jl\src\cluster.jl:213
```
@JamesWrigley (Collaborator, Author):

Alrighty, implemented worker statuses in 64aba00. Now they'll be passed to the worker-exited callbacks.

Apologies for how big this PR is becoming 😅 I've tried to keep the commits atomic so you can review them one-by-one.

@JamesWrigley (Collaborator, Author):

Hmm, interestingly 0d5aaa3 seems to have almost entirely fixed #6. There are no more timeouts on Linux/OSX and I see only one on Windows.

The common problem with these hangs seems to be lingering tasks blocking Julia from exiting. At some point we should probably audit Distributed[Next] to remove all of them.

@JamesWrigley (Collaborator, Author):

(bump)

@jpsamaroo (Member) left a comment:

Awesome changes, I think the newest implementation is really clear! I do have concerns about setstatus/getstatus, but otherwise everything looks good.

src/cluster.jl Outdated
end

# Wait on the tasks so that exceptions bubble up
wait.(values(callback_tasks))
Member:

Suggested change:
- wait.(values(callback_tasks))
+ foreach(wait, values(callback_tasks))

Should reduce allocations a bit, maybe

Collaborator (Author):

Fixed in a9f5dc8.

src/cluster.jl Outdated

!!! warning
Adding workers can fail so it is not guaranteed that the workers requested
will exist.
Member:

What does "will exist" mean here? It feels unclear what the implied behavior is.

Collaborator (Author):

It means that if e.g. 2 workers are requested on specific nodes, they may not actually exist afterwards because adding them can fail (e.g. if the nodes are unreachable). I'll reword that to make it clearer.

Collaborator (Author):

Fixed in a9f5dc8.

src/cluster.jl Outdated
will exist.

The worker-starting callbacks will be executed concurrently. If one throws an
exception it will not be caught and will bubble up through [`addprocs`](@ref).
Member:

"bubble up" isn't the clearest language - I think we mean to say that any errors will be rethrown in addprocs?

Collaborator (Author):

Yeah agreed, reworded it in a9f5dc8.

src/cluster.jl Outdated
not specified.

The worker-started callbacks will be executed concurrently. If one throws an
exception it will not be caught and will bubble up through [`addprocs()`](@ref).
Member:

Ditto to above w.r.t "bubble up"

Collaborator (Author):

Fixed in a9f5dc8.

src/cluster.jl Outdated
"""
add_worker_started_callback(f::Base.Callable; key=nothing)

Register a callback to be called on the master process whenever a worker is
Member:

Suggested change:
- Register a callback to be called on the master process whenever a worker is
+ Register a callback to be called on the master process whenever a worker has been

Collaborator (Author):

Fixed in a9f5dc8.

src/cluster.jl Outdated
if `key` is not specified.

All worker-exiting callbacks will be executed concurrently and if they don't
all finish before the `callback_timeout` passed to `rmprocs()` then the process
Member:

Suggested change:
- all finish before the `callback_timeout` passed to `rmprocs()` then the process
+ all finish before the `callback_timeout` passed to `rmprocs()` then the worker

To keep terminology consistent

Collaborator (Author):

Agreed, I went through all the docstrings and replaced process with worker in a9f5dc8.

src/cluster.jl Outdated
"working on dataset 42"
```
"""
function setstatus(x, pid::Int=myid())
Member:

This is mutating the cluster-global status so I think a ! is warranted:

Suggested change:
- function setstatus(x, pid::Int=myid())
+ function setstatus!(x, pid::Int=myid())

etc. for all other calls. Agree or disagree?

Collaborator (Author):

To me this is in the same class of functions as the IO methods, so it's not strictly necessary, but I don't feel very strongly about it and it is more explicit this way. Renamed in 8ebd9bf.
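For context, a usage sketch of the status API being discussed in this thread, assuming `setstatus!` (after the rename) with the quoted signature and a matching `getstatus(pid)`, which is an assumption:

```julia
# Sketch only: assumes setstatus!(x, pid=myid()) and a matching getstatus(pid)
# exist roughly as discussed in this thread.
using DistributedNext
addprocs(1)
pid = first(workers())

# Record what a worker is doing, either from the worker itself or, as here,
# from the master by passing the worker ID explicitly.
setstatus!("working on dataset 42", pid)

# The master (or a worker-exited callback) can read the last reported status.
@info "Status of worker $pid" getstatus(pid)
```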

@@ -1041,14 +1209,67 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out.
"""
other_workers() = filter(!=(myid()), workers())

"""
Member:

I'm a little concerned about the composability of this mechanism. If a library wants to set a status for debugging how it interacts with workers, that would conflict with the user or another library trying to do the same thing. Maybe we can also pass a key and track multiple statuses? I've used the calling Module in the past as a key (usually automated with a macro) when a package-global key is needed for a system like this.

I know my proposal isn't as convenient for worker-exiting callbacks to handle, so I'm open to alternatives, but I do think we'll need to consider the usability of this mechanism before merging it.

Collaborator (Author):

Oooo yes, that's a very good point 😬 I'm inclined to agree about using the calling module as a key.
Right now the status is passed in explicitly, but what if we leave that up to the user and let them call e.g. `@getstatus(2)`/`getstatus(2, Dagger)` to get the statuses for the modules they're interested in?
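To make the per-module idea concrete, a purely illustrative sketch (a module-keyed `setstatus!`/`getstatus` is only a proposal in this thread, not an implemented API):

```julia
# Illustrative only: module-keyed statuses so that libraries and users don't
# clobber each other's status; none of these signatures exist yet.
using DistributedNext

module MyLib
    using DistributedNext
    # Each library stores its status under its own module key.
    report(x) = setstatus!("working on dataset $x", myid(), @__MODULE__)
end

# The user keeps a separate status under Main...
setstatus!("driver: waiting for results", myid(), Main)

# ...and queries only the module keys they care about, e.g. for worker 2:
getstatus(2, MyLib)
getstatus(2, Main)
```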

Collaborator (Author):

@JBlaschke any thoughts on this? If not then I think I'll implement per-module statuses.
