Skip to content

Commit 962583e

Browse files
authored
fix rmprocs/addprocs locking issue. (JuliaLang#19716)
1 parent 07b7a7f commit 962583e

File tree

4 files changed

+82
-31
lines changed

4 files changed

+82
-31
lines changed

NEWS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ This section lists changes that do not have deprecation warnings.
6060
* The array-scalar operations `div`, `mod`, `rem`, `&`, `|`, `xor`, `/`, `\`, `*`, `+`, and `-`
6161
now follow broadcast promotion rules ([#19692]).
6262

63+
* `rmprocs` now throws an exception if requested workers have not been completely
64+
removed before `waitfor` seconds. With a `waitfor=0`, `rmprocs` returns immediately
65+
without waiting for worker exits.
66+
6367
Library improvements
6468
--------------------
6569

base/multi.jl

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -505,52 +505,72 @@ function workers()
505505
end
506506

507507
"""
508-
rmprocs(pids...; waitfor=0.0)
508+
rmprocs(pids...; waitfor=typemax(Int))
509509
510-
Removes the specified workers. Note that only
511-
process 1 can add or remove workers - if another
512-
worker tries to call `rmprocs`, an error will be
513-
thrown. The optional argument `waitfor` determines
514-
how long the first process will wait for the workers
515-
to shut down.
510+
Removes the specified workers. Note that only process 1 can add or remove
511+
workers.
512+
513+
Argument `waitfor` specifies how long to wait for the workers to shut down:
514+
- If unspecified, `rmprocs` will wait until all requested `pids` are removed.
515+
- An `ErrorException` is raised if all workers cannot be terminated before
516+
the requested `waitfor` seconds.
517+
- With a `waitfor` value of 0, the call returns immediately with the workers
518+
scheduled for removal in a different task. The scheduled `Task` object is
519+
returned. The user should call `wait` on the task before invoking any other
520+
parallel calls.
516521
"""
517-
function rmprocs(pids...; waitfor = 0.0)
522+
function rmprocs(pids...; waitfor=typemax(Int))
518523
# Only pid 1 can add and remove processes
519524
if myid() != 1
520-
error("only process 1 can add and remove processes")
525+
throw(ErrorException("only process 1 can add and remove processes"))
521526
end
522527

528+
pids = vcat(pids...)
529+
if waitfor == 0
530+
t = @schedule _rmprocs(pids, typemax(Int))
531+
yield()
532+
return t
533+
else
534+
_rmprocs(pids, waitfor)
535+
# return a dummy task object that user code can wait on.
536+
return @schedule nothing
537+
end
538+
end
539+
540+
function _rmprocs(pids, waitfor)
523541
lock(worker_lock)
524542
try
525543
rmprocset = []
526-
for i in vcat(pids...)
527-
if i == 1
544+
for p in vcat(pids...)
545+
if p == 1
528546
warn("rmprocs: process 1 not removed")
529547
else
530-
if haskey(map_pid_wrkr, i)
531-
w = map_pid_wrkr[i]
548+
if haskey(map_pid_wrkr, p)
549+
w = map_pid_wrkr[p]
532550
set_worker_state(w, W_TERMINATING)
533-
kill(w.manager, i, w.config)
551+
kill(w.manager, p, w.config)
534552
push!(rmprocset, w)
535553
end
536554
end
537555
end
538556

539557
start = time()
540558
while (time() - start) < waitfor
541-
if all(w -> w.state == W_TERMINATED, rmprocset)
542-
break
543-
else
544-
sleep(0.1)
545-
end
559+
all(w -> w.state == W_TERMINATED, rmprocset) && break
560+
sleep(min(0.1, waitfor - (time() - start)))
546561
end
547562

548-
((waitfor > 0) && any(w -> w.state != W_TERMINATED, rmprocset)) ? :timed_out : :ok
563+
unremoved = [wrkr.id for wrkr in filter(w -> w.state != W_TERMINATED, rmprocset)]
564+
if length(unremoved) > 0
565+
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
566+
throw(ErrorException(estr))
567+
end
549568
finally
550569
unlock(worker_lock)
551570
end
552571
end
553572

573+
554574
"""
555575
ProcessExitedException()
556576
@@ -2238,18 +2258,18 @@ function check_same_host(pids)
22382258
end
22392259

22402260
function terminate_all_workers()
2241-
if myid() != 1
2242-
return
2243-
end
2261+
myid() != 1 && return
22442262

22452263
if nprocs() > 1
2246-
ret = rmprocs(workers(); waitfor=0.5)
2247-
if ret !== :ok
2264+
try
2265+
rmprocs(workers(); waitfor=5.0)
2266+
catch _ex
22482267
warn("Forcibly interrupting busy workers")
22492268
# Might be computation bound, interrupt them and try again
22502269
interrupt(workers())
2251-
ret = rmprocs(workers(); waitfor=0.5)
2252-
if ret !== :ok
2270+
try
2271+
rmprocs(workers(); waitfor=5.0)
2272+
catch _ex2
22532273
warn("Unable to terminate all workers")
22542274
end
22552275
end

test/parallel_exec.jl

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -953,9 +953,7 @@ if is_unix() # aka have ssh
953953
end
954954
end
955955

956-
@test :ok == remotecall_fetch(1, new_pids) do p
957-
rmprocs(p; waitfor=5.0)
958-
end
956+
remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
959957
end
960958

961959
print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
@@ -1250,3 +1248,30 @@ function test_add_procs_threaded_blas()
12501248
rmprocs(processes_added)
12511249
end
12521250
test_add_procs_threaded_blas()
1251+
1252+
#19687
1253+
# ensure no race conditions between rmprocs and addprocs
1254+
for i in 1:5
1255+
p = addprocs(1)[1]
1256+
@spawnat p sleep(5)
1257+
rmprocs(p; waitfor=0)
1258+
end
1259+
1260+
# Test if a wait has been called on rmprocs(...;waitfor=0), further remotecalls
1261+
# don't throw errors.
1262+
for i in 1:5
1263+
p = addprocs(1)[1]
1264+
np = nprocs()
1265+
@spawnat p sleep(5)
1266+
wait(rmprocs(p; waitfor=0))
1267+
for pid in procs()
1268+
@test pid == remotecall_fetch(myid, pid)
1269+
end
1270+
@test nprocs() == np - 1
1271+
end
1272+
1273+
# Test that an exception is thrown if workers are unable to be removed within requested time.
1274+
if DoFullTest
1275+
pids=addprocs(4);
1276+
@test_throws ErrorException rmprocs(pids; waitfor=0.001);
1277+
end

test/topology.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
include("testdefs.jl")
44
addprocs(4; topology="master_slave")
5+
using Base.Test
6+
57
@test_throws RemoteException remotecall_fetch(()->remotecall_fetch(myid, 3), 2)
68

79
function test_worker_counts()
@@ -19,7 +21,7 @@ end
1921

2022
function remove_workers_and_test()
2123
while nworkers() > 0
22-
@test :ok == rmprocs(workers()[1]; waitfor=2.0)
24+
rmprocs(workers()[1]; waitfor=2.0)
2325
test_worker_counts()
2426
if nworkers() == nprocs()
2527
break

0 commit comments

Comments
 (0)