Skip to content

Commit 30cae5b

Browse files
authored
Merge pull request JuliaLang/julia#42255 from JuliaLang/backports-release-1.7
release-1.7: Backports for 1.7-rc2
2 parents 7156ef2 + 75bd990 commit 30cae5b

File tree

6 files changed

+137
-63
lines changed

6 files changed

+137
-63
lines changed

src/Distributed.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,15 @@ function _require_callback(mod::Base.PkgId)
8383
end
8484
end
8585

86-
const REF_ID = Ref(1)
87-
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)
86+
const REF_ID = Threads.Atomic{Int}(1)
87+
next_ref_id() = Threads.atomic_add!(REF_ID, 1)
8888

8989
struct RRID
9090
whence::Int
9191
id::Int
9292

93-
RRID() = RRID(myid(),next_ref_id())
94-
RRID(whence, id) = new(whence,id)
93+
RRID() = RRID(myid(), next_ref_id())
94+
RRID(whence, id) = new(whence, id)
9595
end
9696

9797
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))

src/cluster.jl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,10 @@ end
9595
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
9696
mutable struct Worker
9797
id::Int
98-
del_msgs::Array{Any,1}
98+
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
99+
del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
99100
add_msgs::Array{Any,1}
100-
gcflag::Bool
101+
@atomic gcflag::Bool
101102
state::WorkerState
102103
c_state::Condition # wait for state changes
103104
ct_time::Float64 # creation time
@@ -133,7 +134,7 @@ mutable struct Worker
133134
if haskey(map_pid_wrkr, id)
134135
return map_pid_wrkr[id]
135136
end
136-
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
137+
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
137138
w.initialized = Event()
138139
register_worker(w)
139140
w
@@ -471,6 +472,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
471472
# The `launch` method should add an object of type WorkerConfig for every
472473
# worker launched. It provides information required on how to connect
473474
# to it.
475+
476+
# FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
477+
# but both are part of the public interface. This means we currently can't use
478+
# `Threads.@spawn` in the code below.
474479
launched = WorkerConfig[]
475480
launch_ntfy = Condition()
476481

src/macros.jl

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

3-
let nextidx = 0
3+
let nextidx = Threads.Atomic{Int}(0)
44
global nextproc
55
function nextproc()
6-
p = -1
7-
if p == -1
8-
p = workers()[(nextidx % nworkers()) + 1]
9-
nextidx += 1
10-
end
11-
p
6+
idx = Threads.atomic_add!(nextidx, 1)
7+
return workers()[(idx % nworkers()) + 1]
128
end
139
end
1410

src/messages.jl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,30 @@ function flush_gc_msgs(w::Worker)
126126
if !isdefined(w, :w_stream)
127127
return
128128
end
129-
w.gcflag = false
130-
new_array = Any[]
131-
msgs = w.add_msgs
132-
w.add_msgs = new_array
133-
if !isempty(msgs)
134-
remote_do(add_clients, w, msgs)
135-
end
129+
add_msgs = nothing
130+
del_msgs = nothing
131+
@lock w.msg_lock begin
132+
if !w.gcflag # No work needed for this worker
133+
return
134+
end
135+
@atomic w.gcflag = false
136+
if !isempty(w.add_msgs)
137+
add_msgs = w.add_msgs
138+
w.add_msgs = Any[]
139+
end
136140

137-
# del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
138-
# XXX: threading requires this to be atomic
139-
new_array = Any[]
140-
msgs = w.del_msgs
141-
w.del_msgs = new_array
142-
if !isempty(msgs)
143-
#print("sending delete of $msgs\n")
144-
remote_do(del_clients, w, msgs)
141+
if !isempty(w.del_msgs)
142+
del_msgs = w.del_msgs
143+
w.del_msgs = Any[]
144+
end
145+
end
146+
if add_msgs !== nothing
147+
remote_do(add_clients, w, add_msgs)
148+
end
149+
if del_msgs !== nothing
150+
remote_do(del_clients, w, del_msgs)
145151
end
152+
return
146153
end
147154

148155
# Boundary inserted between messages on the wire, used for recovering
@@ -187,7 +194,7 @@ end
187194
function flush_gc_msgs()
188195
try
189196
for w in (PGRP::ProcessGroup).workers
190-
if isa(w,Worker) && w.gcflag && (w.state == W_CONNECTED)
197+
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
191198
flush_gc_msgs(w)
192199
end
193200
end

src/remotecall.jl

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -84,20 +84,24 @@ end
8484

8585
function finalize_ref(r::AbstractRemoteRef)
8686
if r.where > 0 # Handle the case of the finalizer having been called manually
87-
if islocked(client_refs)
88-
# delay finalizer for later, when it's not already locked
87+
if trylock(client_refs.lock) # trylock doesn't call wait which causes yields
88+
try
89+
delete!(client_refs.ht, r) # direct removal avoiding locks
90+
if isa(r, RemoteChannel)
91+
send_del_client_no_lock(r)
92+
else
93+
# send_del_client only if the reference has not been set
94+
r.v === nothing && send_del_client_no_lock(r)
95+
r.v = nothing
96+
end
97+
r.where = 0
98+
finally
99+
unlock(client_refs.lock)
100+
end
101+
else
89102
finalizer(finalize_ref, r)
90103
return nothing
91104
end
92-
delete!(client_refs, r)
93-
if isa(r, RemoteChannel)
94-
send_del_client(r)
95-
else
96-
# send_del_client only if the reference has not been set
97-
r.v === nothing && send_del_client(r)
98-
r.v = nothing
99-
end
100-
r.where = 0
101105
end
102106
nothing
103107
end
@@ -229,13 +233,18 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
229233
del_client(id, client) = del_client(PGRP, id, client)
230234
function del_client(pg, id, client)
231235
lock(client_refs) do
232-
rv = get(pg.refs, id, false)
233-
if rv !== false
234-
delete!(rv.clientset, client)
235-
if isempty(rv.clientset)
236-
delete!(pg.refs, id)
237-
#print("$(myid()) collected $id\n")
238-
end
236+
_del_client(pg, id, client)
237+
end
238+
nothing
239+
end
240+
241+
function _del_client(pg, id, client)
242+
rv = get(pg.refs, id, false)
243+
if rv !== false
244+
delete!(rv.clientset, client)
245+
if isempty(rv.clientset)
246+
delete!(pg.refs, id)
247+
#print("$(myid()) collected $id\n")
239248
end
240249
end
241250
nothing
@@ -247,25 +256,67 @@ function del_clients(pairs::Vector)
247256
end
248257
end
249258

250-
const any_gc_flag = Condition()
259+
# The task below is coalescing the `flush_gc_msgs` call
260+
# across multiple producers, see `send_del_client`,
261+
# and `send_add_client`.
262+
# XXX: Is this worth the additional complexity?
263+
# `flush_gc_msgs` has to iterate over all connected workers.
264+
const any_gc_flag = Threads.Condition()
251265
function start_gc_msgs_task()
252-
errormonitor(@async while true
253-
wait(any_gc_flag)
254-
flush_gc_msgs()
255-
end)
266+
errormonitor(
267+
Threads.@spawn begin
268+
while true
269+
lock(any_gc_flag) do
270+
# this might miss events: a notify() sent while this task is not in wait() is dropped
271+
wait(any_gc_flag)
272+
end
273+
flush_gc_msgs() # handles throws internally
274+
end
275+
end
276+
)
256277
end
257278

279+
# Function can be called within a finalizer
258280
function send_del_client(rr)
259281
if rr.where == myid()
260282
del_client(rr)
261283
elseif id_in_procs(rr.where) # process only if a valid worker
262-
w = worker_from_id(rr.where)::Worker
263-
push!(w.del_msgs, (remoteref_id(rr), myid()))
264-
w.gcflag = true
284+
process_worker(rr)
285+
end
286+
end
287+
288+
function send_del_client_no_lock(rr)
289+
# for gc context to avoid yields
290+
if rr.where == myid()
291+
_del_client(PGRP, remoteref_id(rr), myid())
292+
elseif id_in_procs(rr.where) # process only if a valid worker
293+
process_worker(rr)
294+
end
295+
end
296+
297+
function publish_del_msg!(w::Worker, msg)
298+
lock(w.msg_lock) do
299+
push!(w.del_msgs, msg)
300+
@atomic w.gcflag = true
301+
end
302+
lock(any_gc_flag) do
265303
notify(any_gc_flag)
266304
end
267305
end
268306

307+
function process_worker(rr)
308+
w = worker_from_id(rr.where)::Worker
309+
msg = (remoteref_id(rr), myid())
310+
311+
# Needs to acquire a lock on the del_msg queue
312+
T = Threads.@spawn begin
313+
publish_del_msg!($w, $msg)
314+
end
315+
Base.errormonitor(T)
316+
317+
return
318+
end
319+
269320
function add_client(id, client)
270321
lock(client_refs) do
271322
rv = lookup_ref(id)
@@ -288,9 +339,13 @@ function send_add_client(rr::AbstractRemoteRef, i)
288339
# to the processor that owns the remote ref. it will add_client
289340
# itself inside deserialize().
290341
w = worker_from_id(rr.where)
291-
push!(w.add_msgs, (remoteref_id(rr), i))
292-
w.gcflag = true
293-
notify(any_gc_flag)
342+
lock(w.msg_lock) do
343+
push!(w.add_msgs, (remoteref_id(rr), i))
344+
@atomic w.gcflag = true
345+
end
346+
lock(any_gc_flag) do
347+
notify(any_gc_flag)
348+
end
294349
end
295350
end
296351

test/distributed_exec.jl

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,18 @@ end
132132
testf(id_me)
133133
testf(id_other)
134134

135+
function poll_while(f::Function; timeout_seconds::Integer = 60)
136+
start_time = time_ns()
137+
while f()
138+
sleep(1)
139+
if ( ( time_ns() - start_time )/1e9 ) > timeout_seconds
140+
@error "Timed out" timeout_seconds
141+
return false
142+
end
143+
end
144+
return true
145+
end
146+
135147
# Distributed GC tests for Futures
136148
function test_futures_dgc(id)
137149
f = remotecall(myid, id)
@@ -143,8 +155,7 @@ function test_futures_dgc(id)
143155
@test fetch(f) == id
144156
@test f.v !== nothing
145157
yield(); # flush gc msgs
146-
@test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == false
147-
158+
@test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
148159

149160
# if unfetched, it should be deleted after a finalize
150161
f = remotecall(myid, id)
@@ -153,7 +164,7 @@ function test_futures_dgc(id)
153164
@test f.v === nothing
154165
finalize(f)
155166
yield(); # flush gc msgs
156-
@test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == false
167+
@test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
157168
end
158169

159170
test_futures_dgc(id_me)
@@ -243,7 +254,7 @@ function test_remoteref_dgc(id)
243254
@test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == true
244255
finalize(rr)
245256
yield(); # flush gc msgs
246-
@test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == false
257+
@test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid))
247258
end
248259
test_remoteref_dgc(id_me)
249260
test_remoteref_dgc(id_other)

0 commit comments

Comments
 (0)