From fd96d91fec62f384893af50972262c9481455449 Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Tue, 31 Oct 2023 13:42:08 +0100 Subject: [PATCH 1/3] allow initiating peer to close stream gracefully I have a use case where I would like to remove a worker without killing it. Currently, trying to disconnect from the head node will cause the message handler loop to throw a fatal exception, so this adds a check that the connection is still open when trying to read new messages. --- src/process_messages.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/process_messages.jl b/src/process_messages.jl index 3032917..192d497 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -167,7 +167,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) readbytes!(r_stream, boundary, length(MSG_BOUNDARY)) - while true + while !(incoming && eof(r_stream)) reset_state(serializer) header = deserialize_hdr_raw(r_stream) # println("header: ", header) From 3a4bdf44b80524503d66c9c82c7ef62c72bb5e2b Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Fri, 6 Sep 2024 14:25:46 +0200 Subject: [PATCH 2/3] add integration tests --- test/persistent_workers.jl | 43 ++++++++++++++++ test/runtests.jl | 1 + test/testhelpers/PersistentWorkers.jl | 70 +++++++++++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 test/persistent_workers.jl create mode 100644 test/testhelpers/PersistentWorkers.jl diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl new file mode 100644 index 0000000..cd46c7e --- /dev/null +++ b/test/persistent_workers.jl @@ -0,0 +1,43 @@ +include("testhelpers/PersistentWorkers.jl") +using .PersistentWorkers +using Test +using Random +using Distributed + +@testset "PersistentWorkers.jl" begin + cookie = randstring(16) + port = rand(9128:9999) # TODO: make sure port is available? 
+ worker = run( + `$(Base.julia_exename()) --startup=no --project=$(dirname(@__DIR__)) -L testhelpers/PersistentWorkers.jl + -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"`; + wait=false) + try + cluster_cookie(cookie) + sleep(1) + + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + + # try the same thing again for the same worker + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + finally + kill(worker) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index d34d07c..0c94a74 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -12,3 +12,4 @@ if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on end include("managers.jl") +include("persistent_workers.jl") diff --git a/test/testhelpers/PersistentWorkers.jl b/test/testhelpers/PersistentWorkers.jl new file mode 100644 index 0000000..778153f --- /dev/null +++ b/test/testhelpers/PersistentWorkers.jl @@ -0,0 +1,70 @@ +module PersistentWorkers + +using Distributed: Distributed, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED +using Sockets: InetAddr, localhost + +export PersistentWorkerManager, start_worker_loop + +struct PersistentWorkerManager{IP} <: Distributed.ClusterManager + addr::InetAddr{IP} +end + +PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port)) +PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port) + +function Distributed.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, 
launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST}) + (; host, port) = cm.addr + wc = WorkerConfig() + wc.io = nothing + wc.host = string(host) + wc.bind_addr = string(host) + wc.port = Int(port) + push!(launched, wc) + notify(launch_ntfy) + return nothing +end + +function Distributed.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end + +# don't actually kill the worker, just close the streams +function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig) + w = worker_from_id(pid) + close(w.r_stream) + close(w.w_stream) + set_worker_state(w, W_TERMINATED) + return nothing +end + +using Distributed: LPROC, init_worker, process_messages, cluster_cookie +using Sockets: IPAddr, listen, listenany, accept + +function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie()) + init_worker(cluster_cookie) + LPROC.bind_addr = string(host) + if port === nothing + port_hint = 9000 + (getpid() % 1000) + port, sock = listenany(host, UInt16(port_hint)) + else + sock = listen(host, port) + end + LPROC.bind_port = port + t = let sock=sock + @async while isopen(sock) + client = accept(sock) + process_messages(client, client, true) + end + end + errormonitor(t) + @info "Listening on $host:$port, cluster_cookie=$cluster_cookie" + return t, host, port +end + +function start_worker_loop((; host, port)::InetAddr; cluster_cookie=cluster_cookie()) + return start_worker_loop(host, port; cluster_cookie) +end + +function start_worker_loop(port::Union{Nothing, Integer}=nothing; cluster_cookie=cluster_cookie()) + return start_worker_loop(localhost, port; cluster_cookie) +end + +end From dcc7da82dfda5d9dc7fe6ee057c631f9c6bbdbf1 Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Fri, 6 Sep 2024 14:45:37 +0200 Subject: [PATCH 3/3] try whether giving the process more time fixes this --- test/persistent_workers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/persistent_workers.jl 
b/test/persistent_workers.jl index cd46c7e..906e4c2 100644 --- a/test/persistent_workers.jl +++ b/test/persistent_workers.jl @@ -13,7 +13,7 @@ using Distributed wait=false) try cluster_cookie(cookie) - sleep(1) + sleep(10) p = addprocs(PersistentWorkerManager(port))[] @test procs() == [1, p]