Skip to content

Commit

Permalink
Use tdiag_connect::receive::await_sockets in
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Aug 9, 2019
1 parent 708a3b7 commit ab412a4
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 43 deletions.
3 changes: 2 additions & 1 deletion tdiag/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ edition = "2018"
timely = "^0.10"
differential-dataflow = "^0.10"
clap = "^2.33"
tdiag-connect = "^0.2"
# tdiag-connect = "^0.2"
tdiag-connect = { path = "../connect" }
53 changes: 11 additions & 42 deletions tdiag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,51 +133,20 @@ variable pointing to tdiag's differential port (51318 by default).

match differential_args.subcommand() {
("arrangements", Some(args)) => {
use std::net::{TcpListener, ToSocketAddrs};
// It's crucial that we bind to both listening
// addresses first, before waiting for
// connections. Otherwise we will open up the
// potential for a race condition in the source
// computation.

let timely_listener = {
println!(
"Listening for {} Timely connections on {}:{}",
source_peers, ip_addr, port
);
println!("Listening for {} Timely connections on {}:{}", source_peers, ip_addr, port);
let timely_listener = tdiag_connect::receive::bind(ip_addr, port)?;

let socket_addr = (ip_addr, port).to_socket_addrs()?
.next()
.ok_or(tdiag_connect::ConnectError::Other("Invalid listening address".to_string()))?;
println!("Listening for {} Differential connections on {}:{}", source_peers, ip_addr, differential_port);
let differential_listener = tdiag_connect::receive::bind(ip_addr, differential_port)?;

TcpListener::bind(socket_addr)?
};

let differential_listener = {
println!(
"Listening for {} Differential connections on {}:{}",
source_peers,
ip_addr,
differential_port
);

let socket_addr = (ip_addr, differential_port).to_socket_addrs()?
.next()
.ok_or(tdiag_connect::ConnectError::Other("Invalid listening address".to_string()))?;

TcpListener::bind(socket_addr)?
};

let timely_sockets = (0..source_peers).map(|_| {
let socket = timely_listener.incoming().next().expect("Socket unexpectedly unavailable");
if let Ok(ref s) = &socket {
s.set_nonblocking(true)?;
}
socket.map(Some)
}).collect::<Result<Vec<_>, _>>()?;

let differential_sockets = (0..source_peers).map(|_| {
let socket = differential_listener.incoming().next().expect("Socket unexpectedly unavailable");
if let Ok(ref s) = &socket {
s.set_nonblocking(true)?;
}
socket.map(Some)
}).collect::<Result<Vec<_>, _>>()?;
let timely_sockets = tdiag_connect::receive::await_sockets(timely_listener, source_peers)?;
let differential_sockets = tdiag_connect::receive::await_sockets(differential_listener, source_peers)?;

let output_interval_ms: u64 = args.value_of("output-interval")
.expect("error parsing args")
Expand Down

0 comments on commit ab412a4

Please sign in to comment.