From d700b33e60e9cd61c027892dfe4c88aa770aa73e Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Tue, 31 Oct 2023 13:42:08 +0100 Subject: [PATCH 1/7] allow initiating peer to close stream gracefully I have a use case where I would like to remove a worker without killing it. Currently, trying to disconnect from the head node will cause the message handler loop to throw a fatal exception, so this adds a check that the connection is still open when trying to read new messages. --- src/process_messages.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/process_messages.jl b/src/process_messages.jl index 3032917..192d497 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -167,7 +167,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) readbytes!(r_stream, boundary, length(MSG_BOUNDARY)) - while true + while !(incoming && eof(r_stream)) reset_state(serializer) header = deserialize_hdr_raw(r_stream) # println("header: ", header) From 7bb2144b5fdce8f5b1ea13558533ed8ee50ededc Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Fri, 6 Sep 2024 14:25:46 +0200 Subject: [PATCH 2/7] add integration tests --- test/persistent_workers.jl | 43 ++++++++++++++++ test/runtests.jl | 1 + test/testhelpers/PersistentWorkers.jl | 70 +++++++++++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 test/persistent_workers.jl create mode 100644 test/testhelpers/PersistentWorkers.jl diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl new file mode 100644 index 0000000..cd46c7e --- /dev/null +++ b/test/persistent_workers.jl @@ -0,0 +1,43 @@ +include("testhelpers/PersistentWorkers.jl") +using .PersistentWorkers +using Test +using Random +using Distributed + +@testset "PersistentWorkers.jl" begin + cookie = randstring(16) + port = rand(9128:9999) # TODO: make sure port is available? 
+ worker = run( + `$(Base.julia_exename()) --startup=no --project=$(dirname(@__DIR__)) -L testhelpers/PersistentWorkers.jl + -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"`; + wait=false) + try + cluster_cookie(cookie) + sleep(1) + + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + + # try the same thing again for the same worker + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + finally + kill(worker) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index d34d07c..0c94a74 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -12,3 +12,4 @@ if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on end include("managers.jl") +include("persistent_workers.jl") diff --git a/test/testhelpers/PersistentWorkers.jl b/test/testhelpers/PersistentWorkers.jl new file mode 100644 index 0000000..778153f --- /dev/null +++ b/test/testhelpers/PersistentWorkers.jl @@ -0,0 +1,70 @@ +module PersistentWorkers + +using Distributed: Distributed, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED +using Sockets: InetAddr, localhost + +export PersistentWorkerManager, start_worker_loop + +struct PersistentWorkerManager{IP} <: Distributed.ClusterManager + addr::InetAddr{IP} +end + +PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port)) +PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port) + +function Distributed.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, 
launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST}) + (; host, port) = cm.addr + wc = WorkerConfig() + wc.io = nothing + wc.host = string(host) + wc.bind_addr = string(host) + wc.port = Int(port) + push!(launched, wc) + notify(launch_ntfy) + return nothing +end + +function Distributed.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end + +# don't actually kill the worker, just close the streams +function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig) + w = worker_from_id(pid) + close(w.r_stream) + close(w.w_stream) + set_worker_state(w, W_TERMINATED) + return nothing +end + +using Distributed: LPROC, init_worker, process_messages, cluster_cookie +using Sockets: IPAddr, listen, listenany, accept + +function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie()) + init_worker(cluster_cookie) + LPROC.bind_addr = string(host) + if port === nothing + port_hint = 9000 + (getpid() % 1000) + port, sock = listenany(host, UInt16(port_hint)) + else + sock = listen(host, port) + end + LPROC.bind_port = port + t = let sock=sock + @async while isopen(sock) + client = accept(sock) + process_messages(client, client, true) + end + end + errormonitor(t) + @info "Listening on $host:$port, cluster_cookie=$cluster_cookie" + return t, host, port +end + +function start_worker_loop((; host, port)::InetAddr; cluster_cookie=cluster_cookie()) + return start_worker_loop(host, port; cluster_cookie) +end + +function start_worker_loop(port::Union{Nothing, Integer}=nothing; cluster_cookie=cluster_cookie()) + return start_worker_loop(localhost, port; cluster_cookie) +end + +end From 35a095d09ed74146ac9e0083c4ec43914e3c0c92 Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Wed, 30 Oct 2024 10:58:31 +0100 Subject: [PATCH 3/7] Distributed -> DistributedNext --- test/persistent_workers.jl | 2 +- test/testhelpers/PersistentWorkers.jl | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git 
a/test/persistent_workers.jl b/test/persistent_workers.jl index cd46c7e..30487e4 100644 --- a/test/persistent_workers.jl +++ b/test/persistent_workers.jl @@ -2,7 +2,7 @@ include("testhelpers/PersistentWorkers.jl") using .PersistentWorkers using Test using Random -using Distributed +using DistributedNext @testset "PersistentWorkers.jl" begin cookie = randstring(16) diff --git a/test/testhelpers/PersistentWorkers.jl b/test/testhelpers/PersistentWorkers.jl index 778153f..c79f7bd 100644 --- a/test/testhelpers/PersistentWorkers.jl +++ b/test/testhelpers/PersistentWorkers.jl @@ -1,18 +1,18 @@ module PersistentWorkers -using Distributed: Distributed, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED +using DistributedNext: DistributedNext, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED using Sockets: InetAddr, localhost export PersistentWorkerManager, start_worker_loop -struct PersistentWorkerManager{IP} <: Distributed.ClusterManager +struct PersistentWorkerManager{IP} <: ClusterManager addr::InetAddr{IP} end PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port)) PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port) -function Distributed.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST}) +function DistributedNext.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST}) (; host, port) = cm.addr wc = WorkerConfig() wc.io = nothing @@ -24,7 +24,7 @@ function Distributed.launch(cm::PersistentWorkerManager, ::Dict, launched::Array return nothing end -function Distributed.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end +function DistributedNext.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end # don't actually kill the worker, just close the streams function 
Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig) @@ -35,7 +35,7 @@ function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig) return nothing end -using Distributed: LPROC, init_worker, process_messages, cluster_cookie +using DistributedNext: LPROC, init_worker, process_messages, cluster_cookie using Sockets: IPAddr, listen, listenany, accept function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie()) From 8a583125236ce17eb033caa6461652309eba5c72 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 31 Oct 2024 13:13:03 +0100 Subject: [PATCH 4/7] fixup! add integration tests --- test/persistent_workers.jl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl index 30487e4..39c96f3 100644 --- a/test/persistent_workers.jl +++ b/test/persistent_workers.jl @@ -7,10 +7,9 @@ using DistributedNext @testset "PersistentWorkers.jl" begin cookie = randstring(16) port = rand(9128:9999) # TODO: make sure port is available? - worker = run( - `$(Base.julia_exename()) --startup=no --project=$(dirname(@__DIR__)) -L testhelpers/PersistentWorkers.jl - -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"`; - wait=false) + helpers_path = joinpath(@__DIR__, "testhelpers", "PersistentWorkers.jl") + cmd = `$(Base.julia_exename()) --startup=no --project=$(Base.active_project()) -L $(helpers_path) -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"` + worker = run(pipeline(cmd; stdout, stderr); wait=false) try cluster_cookie(cookie) sleep(1) From d3b66554d054b6599d70fd6b6dcc07c576e3b6df Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 31 Oct 2024 13:13:37 +0100 Subject: [PATCH 5/7] fixup! 
add integration tests --- test/runtests.jl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 0c94a74..8d00864 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,15 +1,15 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -# Run the distributed test outside of the main driver since it needs its own -# set of dedicated workers. -include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl")) -disttestfile = joinpath(@__DIR__, "distributed_exec.jl") +# # Run the distributed test outside of the main driver since it needs its own +# # set of dedicated workers. +# include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl")) +# disttestfile = joinpath(@__DIR__, "distributed_exec.jl") -cmd = `$test_exename $test_exeflags $disttestfile` +# cmd = `$test_exename $test_exeflags $disttestfile` -if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0 - error("Distributed test failed, cmd : $cmd") -end +# if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0 +# error("Distributed test failed, cmd : $cmd") +# end -include("managers.jl") +# include("managers.jl") include("persistent_workers.jl") From 9bf52179e96135dff9c54540ed87fa430ec13550 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 31 Oct 2024 13:18:05 +0100 Subject: [PATCH 6/7] fixup! 
add integration tests --- test/persistent_workers.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl index 39c96f3..60cb4bc 100644 --- a/test/persistent_workers.jl +++ b/test/persistent_workers.jl @@ -38,5 +38,6 @@ using DistributedNext @everywhere 1+1 finally kill(worker) + wait(worker) end end From 719be616f6b6eff5918716d3eaaf484391d23597 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Thu, 31 Oct 2024 13:21:55 +0100 Subject: [PATCH 7/7] fixup! add integration tests --- test/persistent_workers.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl index 60cb4bc..fd5c058 100644 --- a/test/persistent_workers.jl +++ b/test/persistent_workers.jl @@ -11,8 +11,9 @@ using DistributedNext cmd = `$(Base.julia_exename()) --startup=no --project=$(Base.active_project()) -L $(helpers_path) -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"` worker = run(pipeline(cmd; stdout, stderr); wait=false) try + @show worker.cmd cluster_cookie(cookie) - sleep(1) + sleep(10) p = addprocs(PersistentWorkerManager(port))[] @test procs() == [1, p]