Commit 137dae6

Specialize remotecall_pool(remotecall) to wait for the remotecall
Otherwise the worker would prematurely be put back into the pool, causing oversubscription. Also added a warning about oversubscription to the docstring for `remote_do(f, ::AbstractWorkerPool)`.
1 parent 0cca4d3 commit 137dae6

3 files changed: 37 additions, 0 deletions

docs/src/_changelog.md

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,10 @@ This documents notable changes in DistributedNext.jl. The format is based on
 
 ### Fixed
 - Fixed a cause of potential hangs when exiting the process ([#16]).
+- Fixed a subtle bug in `remotecall(f, ::AbstractWorkerPool)`, previously the
+  implementation would take a worker out of the pool and immediately put it back
+  in without waiting for the returned [`Future`](@ref). Now it will wait for the
+  `Future` before putting the worker back in the pool ([#20]).
 
 ### Added
 - A watcher mechanism has been added to detect when both the Distributed stdlib

src/workerpool.jl

Lines changed: 20 additions & 0 deletions
@@ -135,6 +135,22 @@ function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
     end
 end
 
+# Specialization for remotecall. We have to wait for the Future it returns
+# before putting the worker back in the pool.
+function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, args...; kwargs...)
+    worker = take!(pool)
+    x = rc_f(f, worker, args...; kwargs...)
+
+    t = Threads.@spawn try
+        wait(x)
+    finally
+        put!(pool, worker)
+    end
+    errormonitor(t)
+
+    return x
+end
+
 # Check if pool is local or remote and forward calls if required.
 # NOTE: remotecall_fetch does it automatically, but this will be more efficient as
 # it avoids the overhead associated with a local remotecall.
@@ -242,6 +258,10 @@ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_p
 
 [`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and
 perform a `remote_do` on it.
+
+Note that it's not possible to wait for the result of a `remote_do()` to finish
+so the worker will immediately be put back in the pool (i.e. potentially causing
+oversubscription).
 """
 remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...)
 
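
The difference between releasing the worker eagerly and releasing it only after the call has finished can be illustrated without any worker processes. The following is a local analogy only, not the package's implementation: a `Channel{Int}` stands in for the worker pool, a `Task` stands in for the `Future`, and the names `call_eager`, `call_deferred`, `gate`, and `releaser` are hypothetical.

```julia
# Local analogy: a Channel plays the pool, a Task plays the Future.
# All function and variable names here are hypothetical.

# Eager release: the worker goes back as soon as the call has been *started*.
function call_eager(f, pool::Channel{Int})
    worker = take!(pool)
    try
        return Threads.@spawn f(worker)
    finally
        put!(pool, worker)
    end
end

# Deferred release: a helper task waits for the call, then releases the worker.
function call_deferred(f, pool::Channel{Int})
    worker = take!(pool)
    job = Threads.@spawn f(worker)
    releaser = Threads.@spawn try
        wait(job)
    finally
        put!(pool, worker)
    end
    errormonitor(releaser)  # surface failures of the helper task
    return job
end

pool = Channel{Int}(1)
put!(pool, 1)
gate = Channel{Nothing}(1)  # the job blocks until something is put here

job = call_eager(_ -> take!(gate), pool)
eager_ready = isready(pool)     # worker is already back while the job still blocks
put!(gate, nothing); wait(job)

job = call_deferred(_ -> take!(gate), pool)
deferred_ready = isready(pool)  # worker stays out until the job finishes
put!(gate, nothing); wait(job)
released = timedwait(() -> isready(pool), 10)
```

With eager release a second caller could take the same worker while the first job is still running, which is the oversubscription described in the commit message. With deferred release the caller still gets the job handle back immediately, at the cost of one extra task per call.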

test/distributed_exec.jl

Lines changed: 13 additions & 0 deletions
@@ -1014,6 +1014,19 @@ f16091b = () -> 1
     @test_throws RemoteException fetch(ref)
 end
 
+# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
+# keep the worker out of the pool until the underlying remotecall has
+# finished.
+remotechan = RemoteChannel(wrkr1)
+pool = WorkerPool([wrkr1])
+put_future = remotecall(() -> wait(remotechan), pool)
+@test !isready(pool)
+put!(remotechan, 1)
+wait(put_future)
+# The task that waits on the future to put it back into the pool runs
+# asynchronously so we use timedwait() to check when the worker is back in.
+@test timedwait(() -> isready(pool), 10) === :ok
+
 # Test calling @everywhere from a module not defined on the workers
 LocalBar.bar()
 for p in procs()
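
The new test polls with `timedwait()` because the worker is returned by a separate task, so there is no handle to wait on directly. A minimal sketch of that polling idiom, using a hypothetical atomic `flag` set by a background task in place of the pool:

```julia
# Hypothetical stand-in for "the worker is back in the pool".
flag = Threads.Atomic{Bool}(false)

# Background task flips the flag after a short delay.
setter = Threads.@spawn begin
    sleep(0.2)
    flag[] = true
end

# Polls the callback until it returns true or the timeout (seconds) expires.
status = timedwait(() -> flag[], 10)

# A condition that never becomes true runs into the timeout instead.
never = timedwait(() -> false, 0.5)

wait(setter)
```

Comparing the result against `:ok`, as the test does, fails the test on a timeout rather than letting it hang.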
