From 64aba0095e2178011fe33dc8b5988f0dea701dc5 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Sun, 5 Jan 2025 16:08:09 +0100 Subject: [PATCH] Add support for worker statuses --- docs/src/_changelog.md | 2 ++ docs/src/index.md | 2 ++ src/cluster.jl | 71 +++++++++++++++++++++++++++++++++++++--- test/distributed_exec.jl | 32 +++++++++++++++--- 4 files changed, 97 insertions(+), 10 deletions(-) diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index b297e3e..304a1f0 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -20,6 +20,8 @@ This documents notable changes in DistributedNext.jl. The format is based on exported ([#18]). - Implemented callback support for workers being added/removed etc ([#17]). - Added a package extension to support Revise.jl ([#17]). +- Added support for setting worker statuses with [`setstatus`](@ref) and + [`getstatus`](@ref) ([#17]). ## [v1.0.0] - 2024-12-02 diff --git a/docs/src/index.md b/docs/src/index.md index e0cf6cf..c3cc5b1 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -14,6 +14,8 @@ DistributedNext.rmprocs DistributedNext.interrupt DistributedNext.myid DistributedNext.pmap +DistributedNext.getstatus +DistributedNext.setstatus DistributedNext.RemoteException DistributedNext.ProcessExitedException DistributedNext.Future diff --git a/src/cluster.jl b/src/cluster.jl index 9a29114..b1d9c7a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -870,6 +870,8 @@ const LPROC = LocalProcess() const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN=16 const HDR_COOKIE_LEN=16 +const map_pid_statuses = Dict{Int, Any}() +const map_pid_statuses_lock = ReentrantLock() const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() @@ -1010,15 +1012,16 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker segfaulting etc). Chooses and returns a unique key for the callback if `key` is not specified. -The callback will be called with the worker ID and the final -`Distributed.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an +The callback will be called with the worker ID, the final +`Distributed.WorkerState` of the worker, and the last status of the worker as +set by [`setstatus`](@ref), e.g. `f(w::Int, state, status)`. `state` is an enum, a value of `WorkerState_terminated` means a graceful exit and a value of `WorkerState_exterminated` means the worker died unexpectedly. If the callback throws an exception it will be caught and printed. """ add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks; - arg_types=Tuple{Int, WorkerState}) + arg_types=Tuple{Int, WorkerState, Any}) """ remove_worker_exited_callback(key) @@ -1206,6 +1209,59 @@ Identical to [`workers()`](@ref) except that the current worker is filtered out. """ other_workers() = filter(!=(myid()), workers()) +""" + setstatus(x, pid::Int=myid()) + +Set the status for worker `pid` to `x`. `x` may be any serializable object but +it's recommended to keep it small enough to cheaply send over a network. The +status will be passed to the worker-exited callbacks (see +[`add_worker_exited_callback`](@ref)) when the worker exits. + +This can be handy if you want a way to know what a worker is doing at any given +time, or (in combination with a worker-exited callback) for knowing what a +worker was last doing before it died. 
+ +# Examples +```julia-repl +julia> DistributedNext.setstatus("working on dataset 42") +"working on dataset 42" + +julia> DistributedNext.getstatus() +"working on dataset 42" +``` +""" +function setstatus(x, pid::Int=myid()) + if pid ∉ procs() + throw(ArgumentError("Worker $(pid) does not exist, cannot set its status")) + end + + if myid() == 1 + @lock map_pid_statuses_lock map_pid_statuses[pid] = x + else + remotecall_fetch(setstatus, 1, x, myid()) + end +end + +_getstatus(pid) = @lock map_pid_statuses_lock get!(map_pid_statuses, pid, nothing) + +""" + getstatus(pid::Int=myid()) + +Get the status for worker `pid`. If one was never explicitly set with +[`setstatus`](@ref) this will return `nothing`. +""" +function getstatus(pid::Int=myid()) + if pid ∉ procs() + throw(ArgumentError("Worker $(pid) does not exist, cannot get its status")) + end + + if myid() == 1 + _getstatus(pid) + else + remotecall_fetch(getstatus, 1, pid) + end +end + function cluster_mgmt_from_master_check() if myid() != 1 throw(ErrorException("Only process 1 can add and remove workers")) @@ -1425,15 +1481,20 @@ function deregister_worker(pg, pid) end end - # Call callbacks on the master if myid() == 1 + status = _getstatus(pid) + + # Call callbacks on the master for (name, callback) in worker_exited_callbacks try - callback(pid, w.state) + callback(pid, w.state, status) catch ex @error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace()) end end + + # Delete its status + @lock map_pid_statuses_lock delete!(map_pid_statuses, pid) end return diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index a235a16..b15c426 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -3,7 +3,7 @@ import Revise using DistributedNext, Random, Serialization, Sockets import DistributedNext -import DistributedNext: launch, manage +import DistributedNext: launch, manage, getstatus, setstatus @test cluster_cookie() isa String @@ -1940,6 +1940,24 @@ include("splitrange.jl") end end +@testset "Worker statuses" begin + rmprocs(other_workers()) + + # Test with the local worker + @test isnothing(getstatus()) + setstatus("foo") + @test getstatus() == "foo" + @test_throws ArgumentError getstatus(2) + + # Test with a remote worker + pid = only(addprocs(1)) + @test isnothing(getstatus(pid)) + remotecall_wait(setstatus, pid, "bar", pid) + @test remotecall_fetch(getstatus, pid) == "bar" + + rmprocs(pid) +end + @testset "Worker state callbacks" begin rmprocs(other_workers()) @@ -1954,7 +1972,7 @@ end starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager)) started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo"))) exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid)) - exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state))) + exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> push!(exited_workers, (pid, state, status))) # Test that the worker-started exception bubbles up @test_throws TaskFailedException addprocs(1) @@ -1964,7 +1982,7 @@ end @test started_workers == [pid] rmprocs(workers()) @test exiting_workers == [pid] - @test exited_workers == [(pid, DistributedNext.WorkerState_terminated)] + @test exited_workers == [(pid, DistributedNext.WorkerState_terminated, nothing)] # Trying to reset an existing callback should fail @test_throws ArgumentError 
DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key) @@ -1997,16 +2015,20 @@ end @test length(exiting_workers) == 1 @test length(exited_workers) == 1 - # Test that workers that were killed forcefully are detected as such + # Test that workers that were killed forcefully are detected as such, and + # that statuses are passed properly. exit_state = nothing - exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state) + last_status = nothing + exited_key = DistributedNext.add_worker_exited_callback((pid, state, status) -> (exit_state = state; last_status = status)) pid = only(addprocs(1)) + setstatus("foo", pid) redirect_stderr(devnull) do remote_do(exit, pid) timedwait(() -> !isnothing(exit_state), 10) end @test exit_state == DistributedNext.WorkerState_exterminated + @test last_status == "foo" DistributedNext.remove_worker_exited_callback(exited_key) end
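
Usage sketch (illustrative, not part of the patch): assuming the API above lands as written, a status can be set either from the master on a worker's behalf or from the worker itself, and the last status set is handed to worker-exited callbacks. The status strings and the `@warn` handler below are examples only.

```julia
using DistributedNext

# Log the last known status of any worker that dies unexpectedly.
DistributedNext.add_worker_exited_callback() do pid, state, status
    if state == DistributedNext.WorkerState_exterminated
        @warn "Worker $pid died unexpectedly" last_status=status
    end
end

pid = only(addprocs(1))

# Set a status from the master on behalf of the worker...
DistributedNext.setstatus("loading data", pid)

# ...or from the worker itself (statuses are stored on the master).
remotecall_wait(pid) do
    DistributedNext.setstatus("processing data")
end

DistributedNext.getstatus(pid)  # "processing data"

rmprocs(pid)
```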