Skip to content

Commit

Permalink
fixup! Add support for worker state callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesWrigley committed Jan 16, 2025
1 parent 8ebd9bf commit a9f5dc8
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ function _run_callbacks_concurrently(callbacks_name, callbacks_dict, warning_int
end

# Wait on the tasks so that exceptions bubble up
wait.(values(callback_tasks))
foreach(wait, values(callback_tasks))
end

function _add_callback(f, key, dict; arg_types=Tuple{Int})
Expand All @@ -928,7 +928,7 @@ _remove_callback(key, dict) = delete!(dict, key)
"""
add_worker_starting_callback(f::Base.Callable; key=nothing)
Register a callback to be called on the master process immediately before new
Register a callback to be called on the master worker immediately before new
workers are started. The callback `f` will be called with the `ClusterManager`
instance that is being used and a dictionary of parameters related to adding
workers, i.e. `f(manager, params)`. The `params` dictionary is specific to the
Expand All @@ -939,10 +939,12 @@ file for their definitions.
!!! warning
Adding workers can fail so it is not guaranteed that the workers requested
will exist.
in `manager` will exist in the future. e.g. if a worker is requested on a
node that is unreachable then the worker-starting callbacks will be called
but the worker will never be added.
The worker-starting callbacks will be executed concurrently. If one throws an
exception it will not be caught and will bubble up through [`addprocs`](@ref).
exception it will not be caught and will be rethrown by [`addprocs`](@ref).
Keep in mind that the callbacks will add to the time taken to launch workers; so
try to either keep the callbacks fast to execute, or do the actual work
Expand All @@ -961,13 +963,13 @@ remove_worker_starting_callback(key) = _remove_callback(key, worker_starting_cal
"""
add_worker_started_callback(f::Base.Callable; key=nothing)
Register a callback to be called on the master process whenever a worker is
added. The callback will be called with the added worker ID,
Register a callback to be called on the master worker whenever a worker has
been added. The callback will be called with the added worker ID,
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback if `key` is
not specified.
The worker-started callbacks will be executed concurrently. If one throws an
exception it will not be caught and will bubble up through [`addprocs()`](@ref).
exception it will not be caught and will be rethrown by [`addprocs()`](@ref).
Keep in mind that the callbacks will add to the time taken to launch workers; so
try to either keep the callbacks fast to execute, or do the actual
Expand All @@ -986,13 +988,13 @@ remove_worker_started_callback(key) = _remove_callback(key, worker_started_callb
"""
add_worker_exiting_callback(f::Base.Callable; key=nothing)
Register a callback to be called on the master process immediately before a
Register a callback to be called on the master worker immediately before a
worker is removed with [`rmprocs()`](@ref). The callback will be called with the
worker ID, e.g. `f(w::Int)`. Chooses and returns a unique key for the callback
if `key` is not specified.
All worker-exiting callbacks will be executed concurrently and if they don't
all finish before the `callback_timeout` passed to `rmprocs()` then the process
all finish before the `callback_timeout` passed to `rmprocs()` then the worker
will be removed anyway.
"""
add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exiting_callbacks)
Expand All @@ -1007,7 +1009,7 @@ remove_worker_exiting_callback(key) = _remove_callback(key, worker_exiting_callb
"""
add_worker_exited_callback(f::Base.Callable; key=nothing)
Register a callback to be called on the master process when a worker has exited
Register a callback to be called on the master worker when a worker has exited
for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
segfaulting etc). Chooses and returns a unique key for the callback if `key` is
not specified.
Expand Down

0 comments on commit a9f5dc8

Please sign in to comment.