diff --git a/connect/src/receive/connect.rs b/connect/src/receive/connect.rs index b4e102c..516a182 100644 --- a/connect/src/receive/connect.rs +++ b/connect/src/receive/connect.rs @@ -16,9 +16,30 @@ use std::path::PathBuf; /// /// The sockets are returned in nonblocking mode. pub fn open_sockets(ip_addr: IpAddr, port: u16, source_peers: usize) -> Result>, ConnectError> { + let listener = bind(ip_addr, port)?; + await_sockets(listener, source_peers) +} + +/// Wraps `TcpListener::bind` to return `ConnectError`s. +pub fn bind(ip_addr: IpAddr, port: u16) -> Result { let socket_addr = (ip_addr, port).to_socket_addrs()? .next().ok_or(ConnectError::Other("Invalid listening address".to_string()))?; - let listener = TcpListener::bind(socket_addr)?; + + match TcpListener::bind(socket_addr) { + Err(err) => Err(ConnectError::Other(err.to_string())), + Ok(listener) => Ok(listener) + } +} + +/// Listens on the provided socket until `source_peers` connections +/// from the computations we're examining have been established (one +/// socket for every worker on the examined computation). +/// +/// The sockets are wrapped in `Some(_)` because the result is +/// commonly used as a an argument to `make_readers` in this module. +/// +/// The sockets are returned in nonblocking mode. +pub fn await_sockets(listener: TcpListener, source_peers: usize) -> Result>, ConnectError> { Ok((0..source_peers).map(|_| { let socket = listener.incoming().next().expect("Socket unexpectedly unavailable"); if let Ok(ref s) = &socket {