Skip to content

Commit e36d6e3

Browse files
vtjnashKristofferC
authored andcommitted
Revert "Merge pull request JuliaLang/julia#38405 from JuliaLang/vc/distributed_ts" (JuliaLang/julia#41722)
Also reverts "fixup to pull request JuliaLang/julia#38405 (JuliaLang/julia#41641)" Seems to be causing hanging in CI testing. This reverts commit 740a33a and this reverts commit 5a1680533b461471f537f5f5d4ee3c2866b21e1f, reversing changes made to 02807b279a5e6d5acaeb7095e4c0527e2a5c190e. (cherry picked from commit bbc9429)
1 parent b14100c commit e36d6e3

File tree

7 files changed

+47
-169
lines changed

7 files changed

+47
-169
lines changed

src/cluster.jl

Lines changed: 13 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,13 @@ end
9595
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
9696
mutable struct Worker
9797
id::Int
98-
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
9998
del_msgs::Array{Any,1}
10099
add_msgs::Array{Any,1}
101100
gcflag::Bool
102101
state::WorkerState
103-
c_state::Threads.Condition # wait for state changes, lock for state
104-
ct_time::Float64 # creation time
105-
conn_func::Any # used to setup connections lazily
102+
c_state::Condition # wait for state changes
103+
ct_time::Float64 # creation time
104+
conn_func::Any # used to setup connections lazily
106105

107106
r_stream::IO
108107
w_stream::IO
@@ -134,7 +133,7 @@ mutable struct Worker
134133
if haskey(map_pid_wrkr, id)
135134
return map_pid_wrkr[id]
136135
end
137-
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
136+
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
138137
w.initialized = Event()
139138
register_worker(w)
140139
w
@@ -144,16 +143,12 @@ mutable struct Worker
144143
end
145144

146145
function set_worker_state(w, state)
147-
lock(w.c_state) do
148-
w.state = state
149-
notify(w.c_state; all=true)
150-
end
146+
w.state = state
147+
notify(w.c_state; all=true)
151148
end
152149

153150
function check_worker_state(w::Worker)
154-
lock(w.c_state)
155151
if w.state === W_CREATED
156-
unlock(w.c_state)
157152
if !isclusterlazy()
158153
if PGRP.topology === :all_to_all
159154
# Since higher pids connect with lower pids, the remote worker
@@ -173,8 +168,6 @@ function check_worker_state(w::Worker)
173168
errormonitor(t)
174169
wait_for_conn(w)
175170
end
176-
else
177-
unlock(w.c_state)
178171
end
179172
end
180173

@@ -193,25 +186,13 @@ function exec_conn_func(w::Worker)
193186
end
194187

195188
function wait_for_conn(w)
196-
lock(w.c_state)
197189
if w.state === W_CREATED
198-
unlock(w.c_state)
199190
timeout = worker_timeout() - (time() - w.ct_time)
200191
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
201192

202-
T = Threads.@spawn begin
203-
sleep($timeout)
204-
lock(w.c_state) do
205-
notify(w.c_state; all=true)
206-
end
207-
end
208-
errormonitor(T)
209-
lock(w.c_state) do
210-
wait(w.c_state)
211-
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
212-
end
213-
else
214-
unlock(w.c_state)
193+
@async (sleep(timeout); notify(w.c_state; all=true))
194+
wait(w.c_state)
195+
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
215196
end
216197
nothing
217198
end
@@ -490,10 +471,6 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
490471
# The `launch` method should add an object of type WorkerConfig for every
491472
# worker launched. It provides information required on how to connect
492473
# to it.
493-
494-
# FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
495-
# but both are part of the public interface. This means we currently can't use
496-
# `Threads.@spawn` in the code below.
497474
launched = WorkerConfig[]
498475
launch_ntfy = Condition()
499476

@@ -506,10 +483,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
506483
while true
507484
if isempty(launched)
508485
istaskdone(t_launch) && break
509-
@async begin
510-
sleep(1)
511-
notify(launch_ntfy)
512-
end
486+
@async (sleep(1); notify(launch_ntfy))
513487
wait(launch_ntfy)
514488
end
515489

@@ -662,12 +636,7 @@ function create_worker(manager, wconfig)
662636
# require the value of config.connect_at which is set only upon connection completion
663637
for jw in PGRP.workers
664638
if (jw.id != 1) && (jw.id < w.id)
665-
# wait for wl to join
666-
lock(jw.c_state) do
667-
if jw.state === W_CREATED
668-
wait(jw.c_state)
669-
end
670-
end
639+
(jw.state === W_CREATED) && wait(jw.c_state)
671640
push!(join_list, jw)
672641
end
673642
end
@@ -690,12 +659,7 @@ function create_worker(manager, wconfig)
690659
end
691660

692661
for wl in wlist
693-
lock(wl.c_state) do
694-
if wl.state === W_CREATED
695-
# wait for wl to join
696-
wait(wl.c_state)
697-
end
698-
end
662+
(wl.state === W_CREATED) && wait(wl.c_state)
699663
push!(join_list, wl)
700664
end
701665
end
@@ -712,11 +676,7 @@ function create_worker(manager, wconfig)
712676
@async manage(w.manager, w.id, w.config, :register)
713677
# wait for rr_ntfy_join with timeout
714678
timedout = false
715-
@async begin
716-
sleep($timeout)
717-
timedout = true
718-
put!(rr_ntfy_join, 1)
719-
end
679+
@async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1))
720680
wait(rr_ntfy_join)
721681
if timedout
722682
error("worker did not connect within $timeout seconds")

src/macros.jl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

3-
let nextidx = Threads.Atomic{Int}(0)
3+
let nextidx = 0
44
global nextproc
55
function nextproc()
6-
idx = Threads.atomic_add!(nextidx, 1)
7-
return workers()[(idx % nworkers()) + 1]
6+
p = -1
7+
if p == -1
8+
p = workers()[(nextidx % nworkers()) + 1]
9+
nextidx += 1
10+
end
11+
p
812
end
913
end
1014

src/managers.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
160160
# Wait for all launches to complete.
161161
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
162162
let machine=machine, cnt=cnt
163-
@async try
163+
@async try
164164
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
165165
catch e
166166
print(stderr, "exception launching on machine $(machine) : $(e)\n")

src/messages.jl

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,22 @@ function flush_gc_msgs(w::Worker)
126126
if !isdefined(w, :w_stream)
127127
return
128128
end
129-
lock(w.msg_lock) do
130-
w.gcflag || return # early exit if someone else got to this
131-
w.gcflag = false
132-
msgs = w.add_msgs
133-
w.add_msgs = Any[]
134-
if !isempty(msgs)
135-
remote_do(add_clients, w, msgs)
136-
end
129+
w.gcflag = false
130+
new_array = Any[]
131+
msgs = w.add_msgs
132+
w.add_msgs = new_array
133+
if !isempty(msgs)
134+
remote_do(add_clients, w, msgs)
135+
end
137136

138-
msgs = w.del_msgs
139-
w.del_msgs = Any[]
140-
if !isempty(msgs)
141-
remote_do(del_clients, w, msgs)
142-
end
137+
# del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
138+
# XXX: threading requires this to be atomic
139+
new_array = Any[]
140+
msgs = w.del_msgs
141+
w.del_msgs = new_array
142+
if !isempty(msgs)
143+
#print("sending delete of $msgs\n")
144+
remote_do(del_clients, w, msgs)
143145
end
144146
end
145147

src/remotecall.jl

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -247,42 +247,22 @@ function del_clients(pairs::Vector)
247247
end
248248
end
249249

250-
# The task below is coalescing the `flush_gc_msgs` call
251-
# across multiple producers, see `send_del_client`,
252-
# and `send_add_client`.
253-
# XXX: Is this worth the additional complexity?
254-
# `flush_gc_msgs` has to iterate over all connected workers.
255-
const any_gc_flag = Threads.Condition()
250+
const any_gc_flag = Condition()
256251
function start_gc_msgs_task()
257-
errormonitor(
258-
Threads.@spawn begin
259-
while true
260-
lock(any_gc_flag) do
261-
wait(any_gc_flag)
262-
flush_gc_msgs() # handles throws internally
263-
end
264-
end
265-
end
266-
)
252+
errormonitor(@async while true
253+
wait(any_gc_flag)
254+
flush_gc_msgs()
255+
end)
267256
end
268257

269-
# Function can be called within a finalizer
270258
function send_del_client(rr)
271259
if rr.where == myid()
272260
del_client(rr)
273261
elseif id_in_procs(rr.where) # process only if a valid worker
274262
w = worker_from_id(rr.where)::Worker
275-
msg = (remoteref_id(rr), myid())
276-
# We cannot acquire locks from finalizers
277-
Threads.@spawn begin
278-
lock(w.msg_lock) do
279-
push!(w.del_msgs, msg)
280-
w.gcflag = true
281-
end
282-
lock(any_gc_flag) do
283-
notify(any_gc_flag)
284-
end
285-
end
263+
push!(w.del_msgs, (remoteref_id(rr), myid()))
264+
w.gcflag = true
265+
notify(any_gc_flag)
286266
end
287267
end
288268

@@ -308,13 +288,9 @@ function send_add_client(rr::AbstractRemoteRef, i)
308288
# to the processor that owns the remote ref. it will add_client
309289
# itself inside deserialize().
310290
w = worker_from_id(rr.where)
311-
lock(w.msg_lock) do
312-
push!(w.add_msgs, (remoteref_id(rr), i))
313-
w.gcflag = true
314-
end
315-
lock(any_gc_flag) do
316-
notify(any_gc_flag)
317-
end
291+
push!(w.add_msgs, (remoteref_id(rr), i))
292+
w.gcflag = true
293+
notify(any_gc_flag)
318294
end
319295
end
320296

test/distributed_exec.jl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1689,5 +1689,4 @@ include("splitrange.jl")
16891689
# Run topology tests last after removing all workers, since a given
16901690
# cluster at any time only supports a single topology.
16911691
rmprocs(workers())
1692-
include("threads.jl")
16931692
include("topology.jl")

test/threads.jl

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)