Skip to content

Commit ee6e544

Browse files
authored
Merge pull request #19 from JuliaParallel/multiple-workers
Support bind port hints
2 parents f9cee85 + 9550dcf commit ee6e544

File tree

4 files changed

+72
-17
lines changed

4 files changed

+72
-17
lines changed

docs/src/_changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@ This documents notable changes in DistributedNext.jl. The format is based on
1515
implementation would take a worker out of the pool and immediately put it back
1616
in without waiting for the returned [`Future`](@ref). Now it will wait for the
1717
`Future` before putting the worker back in the pool ([#20]).
18+
- Fixed cases like `addprocs([("machine 10.1.1.1:9000", 2)])` where the bind
19+
port is specified. Previously this would cause errors when the workers all
20+
tried to bind to the same port; now all additional workers will treat the bind
21+
port as a port hint ([#19]).
1822

1923
### Added
2024
- A watcher mechanism has been added to detect when both the Distributed stdlib
2125
and DistributedNext may be active and adding workers. This should help prevent
2226
incompatibilities from both libraries being used simultaneously ([#10]).
2327
- [`other_workers()`](@ref) and [`other_procs()`](@ref) were implemented and
2428
exported ([#18]).
29+
- The `SSHManager` now supports specifying a bind port hint in the machine
30+
specification ([#19], see the [`addprocs()`](@ref) docs).
2531

2632
### Changed
2733
- [`remotecall_eval`](@ref) is now exported ([#23]).

src/cluster.jl

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ end
214214
mutable struct LocalProcess
215215
id::Int
216216
bind_addr::String
217+
bind_port_hint::Int
217218
bind_port::Int
218219
cookie::String
219220
LocalProcess() = new(1)
@@ -257,8 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
257258
init_worker(cookie)
258259
interface = IPv4(LPROC.bind_addr)
259260
if LPROC.bind_port == 0
260-
port_hint = 9000 + (getpid() % 1000)
261-
(port, sock) = listenany(interface, port_hint)
261+
(port, sock) = listenany(interface, LPROC.bind_port_hint)
262262
LPROC.bind_port = Int(port)
263263
else
264264
sock = listen(interface, LPROC.bind_port)
@@ -733,8 +733,29 @@ function launch_additional(np::Integer, cmd::Cmd)
733733
io_objs = Vector{Any}(undef, np)
734734
addresses = Vector{Any}(undef, np)
735735

736+
worker_cmd = Cmd(cmd)
737+
bind_idx = findfirst(==("--bind-to"), cmd)
738+
if !isnothing(bind_idx)
739+
# The actual bind spec will be the next argument
740+
bind_idx += 1
741+
742+
bind_addr = worker_cmd[bind_idx]
743+
parts = split(bind_addr, ':')
744+
if length(parts) == 2
745+
port_str = parts[2]
746+
747+
# If the port is not specified as a port hint then we convert it
748+
# to a hint, otherwise the workers will try to bind to the same
749+
# port and error.
750+
if !startswith(port_str, '[')
751+
new_bind_addr = "$(parts[1]):[$(port_str)]"
752+
worker_cmd.exec[bind_idx] = new_bind_addr
753+
end
754+
end
755+
end
756+
736757
for i in 1:np
737-
io = open(detach(cmd), "r+")
758+
io = open(detach(worker_cmd), "r+")
738759
write_cookie(io)
739760
io_objs[i] = io.out
740761
end
@@ -1318,17 +1339,24 @@ end
13181339
# initialize the local proc network address / port
13191340
function init_bind_addr()
13201341
opts = JLOptions()
1342+
bind_port_hint = 9000 + (getpid() % 1000)
1343+
bind_port = 0
1344+
13211345
if opts.bindto != C_NULL
13221346
bind_to = split(unsafe_string(opts.bindto), ":")
13231347
bind_addr = string(parse(IPAddr, bind_to[1]))
13241348
if length(bind_to) > 1
1325-
bind_port = parse(Int,bind_to[2])
1326-
else
1327-
bind_port = 0
1349+
port_str = bind_to[2]
1350+
if startswith(port_str, '[')
1351+
if !endswith(port_str, ']')
1352+
error("Malformed bind port string, please see the addprocs documentation for the formatting rules: $(port_str)")
1353+
end
1354+
bind_port_hint = parse(Int, port_str[2:end - 1])
1355+
else
1356+
bind_port = parse(Int, port_str)
1357+
end
13281358
end
13291359
else
1330-
bind_port = 0
1331-
13321360
interfaces = _get_interfaces(IPv4)
13331361
if isempty(interfaces)
13341362
# Include IPv6 interfaces if there are no IPv4 ones
@@ -1355,6 +1383,7 @@ function init_bind_addr()
13551383
global LPROC
13561384
LPROC.bind_addr = bind_addr
13571385
LPROC.bind_port = bind_port
1386+
LPROC.bind_port_hint = bind_port_hint
13581387
end
13591388

13601389
using Random: randstring

src/managers.jl

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,32 @@ arguments (see below). In particular, the `exename` keyword can be used to speci
5656
the path to the `julia` binary on the remote machine(s).
5757
5858
`machines` is a vector of "machine specifications" which are given as strings of
59-
the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user and `port`
60-
to the standard SSH port. If `[bind_addr[:port]]` is specified, other workers will connect
61-
to this worker at the specified `bind_addr` and `port`.
59+
the form `[user@]host[:ssh_port] [bind_addr[:bind_port]]`. `user` defaults to
60+
current user and `ssh_port` to the standard SSH port. If
61+
`[bind_addr[:bind_port]]` is specified, other workers will connect to this
62+
worker at the specified `bind_addr` and `bind_port`. `bind_port` can be a
63+
specific port like in `addr:9000`, but it can also specify a port hint by
64+
enclosing it in brackets like `addr:[9000]`. Giving a port hint means that
65+
DistributedNext will try to bind to the specified port, but will fall back to
66+
another free port if it's unavailable.
6267
6368
It is possible to launch multiple processes on a remote host by using a tuple in the
6469
`machines` vector of the form `(machine_spec, count)`, where `count` is the number of
6570
workers to be launched on the specified host. Passing `:auto` as the worker count will
66-
launch as many workers as the number of CPU threads on the remote host.
71+
launch as many workers as the number of CPU threads on the remote host. If the
72+
`bind_port` is specified then the first worker will bind to `bind_port` and all
73+
other workers on the host will use `bind_port` as a port hint.
6774
6875
**Examples**:
6976
```julia
7077
addprocs([
71-
"remote1", # one worker on 'remote1' logging in with the current username
72-
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
73-
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
74-
("user@remote4", 4), # launch 4 workers on 'remote4'
75-
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
78+
"remote1", # one worker on 'remote1' logging in with the current username
79+
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
80+
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
81+
"user@remote4 10.1.1.1:8000", # specify the address for the worker to bind to on 'remote4'
82+
"user@remote5 10.1.1.1:[8000]", # same as above, but with a port hint instead of a specific port
83+
("user@remote4", 4), # launch 4 workers on 'remote4'
84+
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
7685
])
7786
```
7887

test/sshmanager.jl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ end
6161
@test length(new_pids) == 5
6262
test_n_remove_pids(new_pids)
6363

64+
print("\nssh addprocs with a port hint\n")
65+
new_pids = addprocs_with_testenv(["localhost 127.0.0.1:[8000]"]; sshflags=sshflags)
66+
worker = DistributedNext.worker_from_id(only(new_pids))
67+
# A port hint means the worker binds to 8000 if it is free, or otherwise to a
# nearby higher port, so the bound port must lie in the range [8000, 9000).
@test 8000 <= worker.config.port < 9000
68+
test_n_remove_pids(new_pids)
69+
70+
print("\nssh addprocs with multiple workers and port specified\n")
71+
new_pids = addprocs_with_testenv([("localhost 127.0.0.1:8000", 2)]; sshflags=sshflags)
72+
@test length(new_pids) == 2
73+
test_n_remove_pids(new_pids)
74+
6475
print("\nssh addprocs with tunnel\n")
6576
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags)
6677
@test length(new_pids) == num_workers

0 commit comments

Comments
 (0)