Skip to content

Commit

Permalink
Improved OnionListener
Browse files Browse the repository at this point in the history
  • Loading branch information
voidc committed Oct 13, 2020
1 parent b3a05f4 commit 782d92f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 26 deletions.
85 changes: 61 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::onion::tunnel::{self, Target, TunnelBuilder, TunnelHandler};
use anyhow::anyhow;
use bytes::Bytes;
use futures::stream::StreamExt;
use log::{info, trace, warn};
use log::{debug, info, trace, warn};
use ring::rand;
use std::collections::{hash_map, HashMap};
use std::net::SocketAddr;
Expand Down Expand Up @@ -90,19 +90,24 @@ pub struct OnionTunnel {
tunnel_id: TunnelId,
data_tx: mpsc::UnboundedSender<Bytes>,
data_rx: mpsc::Receiver<Bytes>,
counted: bool,
}

impl OnionTunnel {
pub(crate) fn new(
tunnel_id: TunnelId,
counted: bool,
) -> (Self, mpsc::Sender<Bytes>, mpsc::UnboundedReceiver<Bytes>) {
TUNNEL_COUNT.fetch_add(1, Ordering::Relaxed);
if counted {
TUNNEL_COUNT.fetch_add(1, Ordering::Relaxed);
}
let (data_tx, data_rx2) = mpsc::unbounded_channel();
let (data_tx2, data_rx) = mpsc::channel(DATA_BUFFER_SIZE);
let tunnel = Self {
tunnel_id,
data_tx,
data_rx,
counted,
};
(tunnel, data_tx2, data_rx2)
}
Expand Down Expand Up @@ -134,6 +139,21 @@ impl OnionTunnel {
data_tx: self.data_tx.clone(),
}
}

async fn forward_data(
mut self,
mut tunnel_rx: mpsc::Receiver<OnionTunnel>,
mut data_tx: mpsc::Sender<Bytes>,
mut data_rx: mpsc::UnboundedReceiver<Bytes>,
) -> Option<()> {
loop {
tokio::select! {
t = tunnel_rx.recv() => self = t?,
d = self.read() => data_tx.send(d.ok()?).await.ok()?,
d = data_rx.recv() => self.write(d?).ok()?,
}
}
}
}

impl fmt::Debug for OnionTunnel {
Expand All @@ -146,7 +166,12 @@ impl fmt::Debug for OnionTunnel {

impl Drop for OnionTunnel {
fn drop(&mut self) {
TUNNEL_COUNT.fetch_sub(1, Ordering::Relaxed);
if self.counted {
let c = TUNNEL_COUNT.fetch_sub(1, Ordering::Relaxed);
debug!("Dropping tunnel with ID {}, count: {}", self.id(), c);
} else {
debug!("Dropping tunnel with ID {}", self.id());
}
}
}

Expand Down Expand Up @@ -384,34 +409,46 @@ impl OnionListener {
}

async fn handle_incoming(&mut self, tunnel: OnionTunnel) {
let mut tunnels = self.tunnels.lock().await;
let tunnels = self.tunnels.clone();
let mut tunnels = tunnels.lock().await;

match tunnels.entry(tunnel.id()) {
hash_map::Entry::Occupied(mut e) => {
// FIXME replace entry if send fails
e.get_mut().send(tunnel).await;
if let Err(t) = e.get_mut().send(tunnel).await {
if let Ok(tunnel_tx) = self.handle_new_tunnel(t.0).await {
e.insert(tunnel_tx);
}
}
}
hash_map::Entry::Vacant(e) => {
let (tunnel_tx, mut tunnel_rx) = mpsc::channel(1);
// FIXME tunnels are never removed from the map
e.insert(tunnel_tx);

let (e_tunnel, mut e_data_tx, mut e_data_rx) = OnionTunnel::new(tunnel.id());
self.incoming.send(e_tunnel).await;

tokio::spawn(async move {
let mut i_tunnel = tunnel;
loop {
tokio::select! {
Some(t) = tunnel_rx.recv() => i_tunnel = t,
Ok(d) = i_tunnel.read() => e_data_tx.send(d).await.unwrap(),
Some(d) = e_data_rx.recv() => i_tunnel.write(d).unwrap(),
else => break,
}
}
});
if let Ok(tunnel_tx) = self.handle_new_tunnel(tunnel).await {
e.insert(tunnel_tx);
}
}
}
}

async fn handle_new_tunnel(
&mut self,
tunnel: OnionTunnel,
) -> Result<mpsc::Sender<OnionTunnel>> {
let (tunnel_tx, tunnel_rx) = mpsc::channel(1);
let (e_tunnel, e_data_tx, e_data_rx) = OnionTunnel::new(tunnel.id(), true);
self.incoming.send(e_tunnel).await?;

tokio::spawn({
let tunnels = self.tunnels.clone();
async move {
let tunnel_id = tunnel.id();
debug!("Handling incoming tunnel {}", tunnel_id);
let _ = tunnel.forward_data(tunnel_rx, e_data_tx, e_data_rx).await;
tunnels.lock().await.remove(&tunnel_id);
debug!("Finished handling incoming tunnel {}", tunnel_id);
}
});

Ok(tunnel_tx)
}
}

/// Tunnels created in one period should be torn down and rebuilt for the next period.
Expand Down
3 changes: 2 additions & 1 deletion src/onion/circuit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ impl CircuitHandler {
state
}
(TunnelRequest::Begin(tunnel_id), State::Default) => {
let (tunnel, tx, rx) = OnionTunnel::new(tunnel_id);
// counted = false because these tunnels will be mapped to counted tunnels by the OnionListener
let (tunnel, tx, rx) = OnionTunnel::new(tunnel_id, false);
if self.incoming.try_send(tunnel).is_ok() {
State::Endpoint {
tunnel_id,
Expand Down
2 changes: 1 addition & 1 deletion src/onion/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl TunnelHandler {
self.state = match (evt, state) {
(Event::Switchover, State::Building { ready }) => {
self.tunnel.begin(&self.builder.rng).await?;
let (tunnel, data_tx, data_rx) = OnionTunnel::new(self.tunnel.id);
let (tunnel, data_tx, data_rx) = OnionTunnel::new(self.tunnel.id, true);
let _ = ready.send(Ok(tunnel)); // TODO handle closed
self.spawn_next_tunnel_task();
State::Ready { data_tx, data_rx }
Expand Down

0 comments on commit 782d92f

Please sign in to comment.