Add support for worker statuses
JamesWrigley committed Jan 5, 2025
1 parent e23c490 commit 64aba00
Showing 4 changed files with 97 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/src/_changelog.md
@@ -20,6 +20,8 @@ This documents notable changes in DistributedNext.jl. The format is based on
exported ([#18]).
- Implemented callback support for workers being added/removed etc ([#17]).
- Added a package extension to support Revise.jl ([#17]).
- Added support for setting worker statuses with [`setstatus`](@ref) and
  [`getstatus`](@ref) ([#17]).

## [v1.0.0] - 2024-12-02

2 changes: 2 additions & 0 deletions docs/src/index.md
@@ -14,6 +14,8 @@ DistributedNext.rmprocs
DistributedNext.interrupt
DistributedNext.myid
DistributedNext.pmap
DistributedNext.getstatus
DistributedNext.setstatus
DistributedNext.RemoteException
DistributedNext.ProcessExitedException
DistributedNext.Future
71 changes: 66 additions & 5 deletions src/cluster.jl
@@ -870,6 +870,8 @@ const LPROC = LocalProcess()
const LPROCROLE = Ref{Symbol}(:master)
const HDR_VERSION_LEN=16
const HDR_COOKIE_LEN=16
const map_pid_statuses = Dict{Int, Any}()
const map_pid_statuses_lock = ReentrantLock()
const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
const map_sock_wrkr = IdDict()
const map_del_wrkr = Set{Int}()
@@ -1010,15 +1012,16 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
segfaulting etc). Chooses and returns a unique key for the callback if `key` is
not specified.
The callback will be called with the worker ID and the final
`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an
The callback will be called with the worker ID, the final
`Distributed.WorkerState` of the worker, and the last status of the worker as
set by [`setstatus`](@ref), e.g. `f(w::Int, state, status)`. `state` is an
enum, a value of `WorkerState_terminated` means a graceful exit and a value of
`WorkerState_exterminated` means the worker died unexpectedly.
If the callback throws an exception it will be caught and printed.
"""
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks;
arg_types=Tuple{Int, WorkerState})
arg_types=Tuple{Int, WorkerState, Any})

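For illustration, a minimal sketch (the callback and key names here are made up, not part of the diff) of registering a callback with the new three-argument signature:

```julia
using DistributedNext

# Matches the new `Tuple{Int, WorkerState, Any}` callback signature: `pid` is
# the worker ID, `state` its final WorkerState, and `status` is whatever was
# last set with `setstatus` (or `nothing` if it never was).
function log_worker_exit(pid, state, status)
    @info "Worker exited" pid state status
end

key = DistributedNext.add_worker_exited_callback(log_worker_exit)

# ...and later, once the callback is no longer needed:
DistributedNext.remove_worker_exited_callback(key)
```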
"""
remove_worker_exited_callback(key)
@@ -1206,6 +1209,59 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out.
"""
other_workers() = filter(!=(myid()), workers())

"""
    setstatus(x, pid::Int=myid())

Set the status for worker `pid` to `x`. `x` may be any serializable object but
it's recommended to keep it small enough to cheaply send over a network. The
status will be passed to the worker-exited callbacks (see
[`add_worker_exited_callback`](@ref)) when the worker exits.

This can be handy if you want a way to know what a worker is doing at any given
time, or (in combination with a worker-exited callback) for knowing what a
worker was last doing before it died.

# Examples
```julia-repl
julia> DistributedNext.setstatus("working on dataset 42")
"working on dataset 42"

julia> DistributedNext.getstatus()
"working on dataset 42"
```
"""
function setstatus(x, pid::Int=myid())
    if pid ∉ procs()
        throw(ArgumentError("Worker $(pid) does not exist, cannot set its status"))
    end

    if myid() == 1
        @lock map_pid_statuses_lock map_pid_statuses[pid] = x
    else
        remotecall_fetch(setstatus, 1, x, pid)
    end
end
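Since statuses travel to the master over the wire, keeping them small pays off. As a purely illustrative sketch (the field names are made up), a compact NamedTuple still carries useful context:

```julia
using Dates  # for now()

# Hypothetical status payload: small, serializable, and self-describing.
DistributedNext.setstatus((stage = :preprocess, item = 42, since = now()))
```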

_getstatus(pid) = @lock map_pid_statuses_lock get!(map_pid_statuses, pid, nothing)

"""
    getstatus(pid::Int=myid())

Get the status for worker `pid`. If one was never explicitly set with
[`setstatus`](@ref) this will return `nothing`.
"""
function getstatus(pid::Int=myid())
    if pid ∉ procs()
        throw(ArgumentError("Worker $(pid) does not exist, cannot get its status"))
    end

    if myid() == 1
        _getstatus(pid)
    else
        remotecall_fetch(getstatus, 1, pid)
    end
end
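Putting the pieces together, a hedged end-to-end sketch (the worker count, chunk numbers, and `last_seen` dictionary are all made up for illustration) of tracking progress with statuses and capturing the final one through a worker-exited callback:

```julia
using DistributedNext

addprocs(2)

# Remember the last status reported by any worker that exits.
last_seen = Dict{Int, Any}()
DistributedNext.add_worker_exited_callback((pid, state, status) -> last_seen[pid] = status)

pmap(1:4) do i
    DistributedNext.setstatus("processing chunk $i")
    sum(rand(1_000))  # stand-in for the real work
end

# Back on the master, ask what each worker reported most recently.
for pid in workers()
    @show pid DistributedNext.getstatus(pid)
end
```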

function cluster_mgmt_from_master_check()
    if myid() != 1
        throw(ErrorException("Only process 1 can add and remove workers"))
@@ -1425,15 +1481,20 @@ function deregister_worker(pg, pid)
end
end

    # Call callbacks on the master
    if myid() == 1
        status = _getstatus(pid)

        # Call callbacks on the master
        for (name, callback) in worker_exited_callbacks
            try
                callback(pid, w.state)
                callback(pid, w.state, status)
            catch ex
                @error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace())
            end
        end

        # Delete its status
        @lock map_pid_statuses_lock delete!(map_pid_statuses, pid)
    end

return
32 changes: 27 additions & 5 deletions test/distributed_exec.jl
@@ -3,7 +3,7 @@
import Revise
using DistributedNext, Random, Serialization, Sockets
import DistributedNext
import DistributedNext: launch, manage
import DistributedNext: launch, manage, getstatus, setstatus


@test cluster_cookie() isa String
@@ -1940,6 +1940,24 @@ include("splitrange.jl")
end
end

@testset "Worker statuses" begin
rmprocs(other_workers())

# Test with the local worker
@test isnothing(getstatus())
setstatus("foo")
@test getstatus() == "foo"
@test_throws ArgumentError getstatus(2)

# Test with a remote worker
pid = only(addprocs(1))
@test isnothing(getstatus(pid))
remotecall_wait(setstatus, pid, "bar", pid)
@test remotecall_fetch(getstatus, pid) == "bar"

rmprocs(pid)
end

@testset "Worker state callbacks" begin
    rmprocs(other_workers())

@@ -1954,7 +1972,7 @@ end
    starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager))
    started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo")))
    exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid))
    exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state)))
    exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> push!(exited_workers, (pid, state, status)))

    # Test that the worker-started exception bubbles up
    @test_throws TaskFailedException addprocs(1)
Expand All @@ -1964,7 +1982,7 @@ end
    @test started_workers == [pid]
    rmprocs(workers())
    @test exiting_workers == [pid]
    @test exited_workers == [(pid, DistributedNext.WorkerState_terminated)]
    @test exited_workers == [(pid, DistributedNext.WorkerState_terminated, nothing)]

    # Trying to reset an existing callback should fail
    @test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key)
@@ -1997,16 +2015,20 @@ end
    @test length(exiting_workers) == 1
    @test length(exited_workers) == 1

    # Test that workers that were killed forcefully are detected as such
    # Test that workers that were killed forcefully are detected as such, and
    # that statuses are passed properly.
    exit_state = nothing
    exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state)
    last_status = nothing
    exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> (exit_state = state; last_status = status))
    pid = only(addprocs(1))
    setstatus("foo", pid)

    redirect_stderr(devnull) do
        remote_do(exit, pid)
        timedwait(() -> !isnothing(exit_state), 10)
    end
    @test exit_state == DistributedNext.WorkerState_exterminated
    @test last_status == "foo"
    DistributedNext.remove_worker_exited_callback(exited_key)
end

