Skip to content

Commit

Permalink
Added timeout to RPS query
Browse files Browse the repository at this point in the history
  • Loading branch information
voidc committed Aug 27, 2020
1 parent 3487e47 commit 803abc4
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
49 changes: 36 additions & 13 deletions src/api/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use onion::{Peer, RsaPrivateKey, RsaPublicKey};
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time;
use tokio::time::Duration;

const PEER_BUFFER_SIZE: usize = 20;
const QUERY_TIMEOUT: Duration = Duration::from_secs(2);

pub enum RpsModule {
Socket(SocketRpsModule),
Expand Down Expand Up @@ -73,20 +76,40 @@ impl SocketRpsModule {

async fn query(&mut self) -> Result<Peer> {
self.socket.write(RpsRequest::Query).await?;
if let Some(msg) = self.socket.read_next().await? {
match msg {
RpsResponse::Peer(_port, portmap, peer_addr, peer_hostkey) => {
let (_, peer_port) = portmap
.iter()
.find(|(m, _)| *m == Module::Onion)
.ok_or_else(|| anyhow!("Peer does not expose onion port"))?;
let peer_addr = SocketAddr::new(peer_addr, *peer_port);
let peer_hostkey = RsaPublicKey::new(peer_hostkey.as_ref());
Ok(Peer::new(peer_addr, peer_hostkey))
}
let msg = time::timeout(QUERY_TIMEOUT, self.socket.read_next())
.await
.map_err(|_| anyhow!("RPS query timed out"))?
.map_err(|e| anyhow!("RPS query failed: {}", e))?;

match msg {
RpsResponse::Peer(_port, portmap, peer_addr, peer_hostkey) => {
let (_, peer_port) = portmap
.iter()
.find(|(m, _)| *m == Module::Onion)
.ok_or_else(|| anyhow!("Peer does not expose onion port"))?;
let peer_addr = SocketAddr::new(peer_addr, *peer_port);
let peer_hostkey = RsaPublicKey::new(peer_hostkey.as_ref());
Ok(Peer::new(peer_addr, peer_hostkey))
}
} else {
Err(anyhow!("rps query failed"))
}
}
}

#[cfg(test)]
mod tests {
use crate::api::config::RpsConfig;
use crate::api::rps::RpsModule;

#[tokio::test]
#[ignore = "requires a running RPS instance listening on 127.0.0.1:7101"]
async fn test_rps_query() {
let config = RpsConfig {
api_address: Some("127.0.0.1:7101".parse().unwrap()),
peers: None,
};

let mut rps = RpsModule::new(&config).await.unwrap();
println!("Connected to RPS");
println!("{:?}", rps.query().await);
}
}
6 changes: 3 additions & 3 deletions src/api/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<S> ApiSocket<S> {
}

impl<S: AsyncRead + Unpin> ApiSocket<S> {
pub(crate) async fn read_next<M: TryFromBytes<anyhow::Error>>(&mut self) -> Result<Option<M>> {
pub async fn read_next<M: TryFromBytes<anyhow::Error>>(&mut self) -> Result<M> {
let mut size_buf = [0u8; 2];
self.stream.read_exact(&mut size_buf).await?;
let size = u16::from_be_bytes(size_buf) as usize;
Expand All @@ -27,12 +27,12 @@ impl<S: AsyncRead + Unpin> ApiSocket<S> {
self.buf[0] = size_buf[0];
self.buf[1] = size_buf[1];
self.stream.read_exact(&mut self.buf[2..]).await?;
Ok(Some(M::try_read_from(&mut self.buf)?))
Ok(M::try_read_from(&mut self.buf)?)
}
}

impl<S: AsyncWrite + Unpin> ApiSocket<S> {
pub(crate) async fn write<M: ToBytes>(&mut self, message: M) -> Result<()> {
pub async fn write<M: ToBytes>(&mut self, message: M) -> Result<()> {
self.buf.clear();
self.buf.reserve(message.size());
message.write_to(&mut self.buf);
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ impl OnionModule {
.insert(client_addr, ApiSocket::new(write_stream));
let mut socket = ApiSocket::new(read_stream);

while let Some(msg) = socket.read_next::<OnionRequest>().await? {
loop {
let msg = socket.read_next::<OnionRequest>().await?;
trace!("Handling {:?}", msg);
let _msg_id = msg.id();
match msg {
Expand Down Expand Up @@ -128,7 +129,6 @@ impl OnionModule {
}
}
}
Ok(())
}

/// Handles P2P protocol events and notifies interested API clients
Expand Down

0 comments on commit 803abc4

Please sign in to comment.