Skip to content

Commit

Permalink
fix: reset tunnel state on reconnection
Browse files Browse the repository at this point in the history
Specifically, tunnels in the router configuration are now stamped with the handshake time. Applying a config with a new handshake timestamp for a tunnel wipes its state.
  • Loading branch information
max-niederman committed Apr 23, 2024
1 parent 209ec4c commit 4dbe326
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 8 deletions.
4 changes: 4 additions & 0 deletions packages/centipede_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
self.router_config.recv_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::RecvTunnel {
initialized_at: *handshake_timestamp,
cipher: cipher.clone(),
},
);
Expand All @@ -471,6 +472,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
self.router_config.send_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::SendTunnel {
initialized_at: *handshake_timestamp,
cipher: cipher.clone(),
links: HashSet::new(),
},
Expand Down Expand Up @@ -529,6 +531,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
self.router_config.recv_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::RecvTunnel {
initialized_at: handshake_timestamp,
cipher: cipher.clone(),
},
);
Expand All @@ -545,6 +548,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
self.router_config.send_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::SendTunnel {
initialized_at: handshake_timestamp,
cipher: cipher.clone(),
links: local_addrs
.iter()
Expand Down
152 changes: 146 additions & 6 deletions packages/centipede_router/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
collections::{HashMap, HashSet}, net::SocketAddr, sync::Arc, time::SystemTime
};

use chacha20poly1305::ChaCha20Poly1305;
Expand All @@ -27,13 +25,21 @@ pub struct Router {
/// Configuration of a receiving tunnel.
#[derive(Clone)]
pub struct RecvTunnel {
// TODO: should this really be a time type or just an opaque identifier?
// arguably the router shouldn't care about the time
/// Timestamp at which the tunnel was initialized. Used to reset the memory.
pub initialized_at: SystemTime,

/// Cipher with which to decrypt messages.
pub cipher: ChaCha20Poly1305,
}

/// Configuration of a sending tunndfel.
#[derive(Clone)]
pub struct SendTunnel {
/// Timestamp at which the tunnel was initialized. Used to reset the sequence number.
pub initialized_at: SystemTime,

/// Cipher with which to encrypt messages.
pub cipher: ChaCha20Poly1305,

Expand All @@ -55,11 +61,13 @@ pub(crate) fn apply(config: &Router, state: &crate::ConfiguredRouter) -> crate::
(
*id,
crate::RecvTunnel {
initialized_at: tun.initialized_at,
cipher: tun.cipher.clone(),
memory: state
.recv_tunnels
.get(id)
.map(|tun| tun.memory.clone())
.filter(|old| old.initialized_at == tun.initialized_at)
.map(|old| old.memory.clone())
.unwrap_or_default(),
},
)
Expand All @@ -72,13 +80,15 @@ pub(crate) fn apply(config: &Router, state: &crate::ConfiguredRouter) -> crate::
(
*id,
crate::SendTunnel {
initialized_at: tun.initialized_at,
links: tun.links.iter().copied().collect(),
cipher: tun.cipher.clone(),
next_sequence_number: state
.send_tunnels
.get(id)
.map(|tun| tun.next_sequence_number.clone())
.unwrap_or(Arc::new(AtomicU64::new(0))),
.filter(|old| old.initialized_at == tun.initialized_at)
.map(|old| old.next_sequence_number.clone())
.unwrap_or_default()
},
)
})
Expand Down Expand Up @@ -107,6 +117,8 @@ impl<'r> ConfiguratorHandle<'r> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use chacha20poly1305::KeyInit;

use super::*;
Expand Down Expand Up @@ -154,6 +166,7 @@ mod tests {
map.insert(
[2; 8],
super::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
links: [Link {
local: SocketAddr::from(([127, 0, 0, 1], 0)),
Expand Down Expand Up @@ -199,6 +212,7 @@ mod tests {
map.insert(
[2; 8],
super::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
links: [Link {
local: SocketAddr::from(([127, 0, 0, 1], 0)),
Expand All @@ -221,6 +235,74 @@ mod tests {
);
}

#[test]
fn reinitializing_send_tunnel_resets_sequence() {
let mut state = crate::ConfiguredRouter::default();

state = apply(
&super::Router {
local_id: [1; 8],
recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))]
.into_iter()
.collect(),
recv_tunnels: HashMap::new(),
send_tunnels: {
let mut map = HashMap::new();
map.insert(
[2; 8],
super::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
links: [Link {
local: SocketAddr::from(([127, 0, 0, 1], 0)),
remote: SocketAddr::from(([127, 0, 0, 1], 0)),
}]
.into_iter()
.collect(),
},
);
map
},
},
&state,
);
let sequence = state.send_tunnels[&[2; 8]].next_sequence_number.as_ref() as *const _;

state = apply(
&super::Router {
local_id: [1; 8],
recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))]
.into_iter()
.collect(),
recv_tunnels: HashMap::new(),
send_tunnels: {
let mut map = HashMap::new();
map.insert(
[2; 8],
super::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1),
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
links: [Link {
local: SocketAddr::from(([127, 0, 0, 1], 0)),
remote: SocketAddr::from(([127, 0, 0, 1], 0)),
}]
.into_iter()
.collect(),
},
);
map
},
},
&state,
);

assert_ne!(
state.send_tunnels[&[2; 8]].next_sequence_number.as_ref() as *const _,
sequence,
"sequence number generator was preserved"
);
}

#[test]
fn updating_recv_tunnel_preserves_memory() {
let mut state = crate::ConfiguredRouter::default();
Expand All @@ -236,6 +318,7 @@ mod tests {
map.insert(
[2; 8],
super::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
},
);
Expand Down Expand Up @@ -268,6 +351,7 @@ mod tests {
map.insert(
[2; 8],
super::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[1; 32]).into()),
},
);
Expand All @@ -285,4 +369,60 @@ mod tests {
"packet memory was not preserved"
);
}

#[test]
fn reinitializing_recv_tunnel_resets_memory() {
let mut state = crate::ConfiguredRouter::default();

state = apply(
&super::Router {
local_id: [1; 8],
recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))]
.into_iter()
.collect(),
recv_tunnels: {
let mut map = HashMap::new();
map.insert(
[2; 8],
super::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: ChaCha20Poly1305::new((&[0; 32]).into()),
},
);
map
},
send_tunnels: HashMap::new(),
},
&state,
);
let memory = state.recv_tunnels[&[2; 8]].memory.as_ref() as *const _;

state = apply(
&super::Router {
local_id: [1; 8],
recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))]
.into_iter()
.collect(),
recv_tunnels: {
let mut map = HashMap::new();
map.insert(
[2; 8],
super::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1),
cipher: ChaCha20Poly1305::new((&[1; 32]).into()),
},
);
map
},
send_tunnels: HashMap::new(),
},
&state,
);

assert_ne!(
state.recv_tunnels[&[2; 8]].memory.as_ref() as *const _,
memory,
"packet memory was preserved"
);
}
}
7 changes: 7 additions & 0 deletions packages/centipede_router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::SystemTime,
};

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -52,6 +53,9 @@ struct ConfiguredRouter {
/// The state of a receiving tunnel.
#[derive(Clone)]
struct RecvTunnel {
/// Timestamp at which the tunnel was initialized. Used to reset the memory.
initialized_at: SystemTime,

/// Cipher with which to decrypt messages.
cipher: ChaCha20Poly1305,

Expand All @@ -62,6 +66,9 @@ struct RecvTunnel {
/// The state of a sending tunnel.
#[derive(Clone)]
struct SendTunnel {
/// Timestamp at which the tunnel was initialized. Used to reset the memory.
initialized_at: SystemTime,

/// Address pairs on which to send messages.
links: Vec<Link>,

Expand Down
4 changes: 3 additions & 1 deletion packages/centipede_router/tests/udp_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io,
net::{SocketAddr, UdpSocket},
thread,
time::Duration,
time::{Duration, SystemTime},
};

use centipede_proto::{
Expand Down Expand Up @@ -61,6 +61,7 @@ fn half_duplex_single_message() {
map.insert(
[1; 8],
config::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: dummy_cipher(),
links: ctx.possible_links_to([1; 8]),
},
Expand Down Expand Up @@ -99,6 +100,7 @@ fn half_duplex_single_message() {
map.insert(
[0; 8],
config::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: dummy_cipher(),
},
);
Expand Down
4 changes: 3 additions & 1 deletion packages/centipede_router/tests/udp_perf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![feature(test)]

extern crate test;
use std::{mem, net::UdpSocket, thread};
use std::{mem, net::UdpSocket, thread, time::SystemTime};

use centipede_proto::PacketMessage;
use centipede_router::{config, Link, Router};
Expand Down Expand Up @@ -45,6 +45,7 @@ fn half_duplex_iter(packet_size: usize, num_packets: usize) {
recv_tunnels: [(
[0; 8],
config::RecvTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
cipher: dummy_cipher(),
},
)]
Expand Down Expand Up @@ -78,6 +79,7 @@ fn half_duplex_iter(packet_size: usize, num_packets: usize) {
send_tunnels: [(
[1; 8],
config::SendTunnel {
initialized_at: SystemTime::UNIX_EPOCH,
links: [Link {
local: recv_addr,
remote: recv_addr,
Expand Down

0 comments on commit 4dbe326

Please sign in to comment.