Skip to content

Commit

Permalink
Avoid race condition on differential logging connection
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Aug 8, 2019
1 parent 60e37dc commit a549f1b
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions tdiag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,59 @@ variable pointing to tdiag's differential port (51318 by default).
crate::commands::profile::listen_and_profile(timely_configuration, sockets)
}
("differential", Some(differential_args)) => {

let differential_port: u16 = differential_args.value_of("port")
.expect("error parsing args")
.parse()
.map_err(|e| DiagError(format!("Invalid --port: {}", e)))?;

match differential_args.subcommand() {
("arrangements", Some(_args)) => {
println!(
"Listening for {} Timely connections on {}:{}",
source_peers, ip_addr, port
);
let timely_sockets =
tdiag_connect::receive::open_sockets(ip_addr.clone(), port, source_peers)?;

println!(
"Listening for {} Differential connections on {}:{}",
source_peers,
ip_addr,
port + 1
);
let differential_sockets =
tdiag_connect::receive::open_sockets(ip_addr.clone(), port + 1, source_peers)?;
use std::net::{TcpListener, ToSocketAddrs};

let timely_listener = {
println!(
"Listening for {} Timely connections on {}:{}",
source_peers, ip_addr, port
);

let socket_addr = (ip_addr, port).to_socket_addrs()?
.next()
.ok_or(tdiag_connect::ConnectError::Other("Invalid listening address".to_string()))?;

TcpListener::bind(socket_addr)?
};

let differential_listener = {
println!(
"Listening for {} Differential connections on {}:{}",
source_peers,
ip_addr,
differential_port
);

let socket_addr = (ip_addr, differential_port).to_socket_addrs()?
.next()
.ok_or(tdiag_connect::ConnectError::Other("Invalid listening address".to_string()))?;

TcpListener::bind(socket_addr)?
};

let timely_sockets = (0..source_peers).map(|_| {
let socket = timely_listener.incoming().next().expect("Socket unexpectedly unavailable");
if let Ok(ref s) = &socket {
s.set_nonblocking(true)?;
}
socket.map(Some)
}).collect::<Result<Vec<_>, _>>()?;

let differential_sockets = (0..source_peers).map(|_| {
let socket = differential_listener.incoming().next().expect("Socket unexpectedly unavailable");
if let Ok(ref s) = &socket {
s.set_nonblocking(true)?;
}
socket.map(Some)
}).collect::<Result<Vec<_>, _>>()?;

println!("Trace sources connected");
crate::commands::arrangements::listen(
Expand Down

0 comments on commit a549f1b

Please sign in to comment.