From 989916082898a9efd0c34e4ea0b59daa5bf68cbc Mon Sep 17 00:00:00 2001 From: Max Niederman Date: Fri, 29 Dec 2023 23:25:41 -0800 Subject: [PATCH] handle router reconfiguration in centipede_router --- Cargo.lock | 7 + packages/centipede_proto/src/packet.rs | 84 ++++++-- packages/centipede_router/Cargo.toml | 1 + packages/centipede_router/src/controller.rs | 196 ++++++++++++++++++ packages/centipede_router/src/lib.rs | 212 ++++---------------- packages/centipede_router/src/worker.rs | 182 ++++++++++++----- tmp.toml | 21 -- 7 files changed, 450 insertions(+), 253 deletions(-) create mode 100644 packages/centipede_router/src/controller.rs delete mode 100644 tmp.toml diff --git a/Cargo.lock b/Cargo.lock index 88ca7bc..7934c88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "base64ct" version = "1.6.0" @@ -56,6 +62,7 @@ dependencies = [ name = "centipede_router" version = "0.1.0" dependencies = [ + "arc-swap", "centipede_proto", "chacha20poly1305", ] diff --git a/packages/centipede_proto/src/packet.rs b/packages/centipede_proto/src/packet.rs index 864fb8e..c71e611 100644 --- a/packages/centipede_proto/src/packet.rs +++ b/packages/centipede_proto/src/packet.rs @@ -84,6 +84,15 @@ where } } + /// Interpret the message buffer as a byte slice. + pub fn as_ref(&self) -> Message<&'_ [u8], A, T> { + Message { + buffer: &self.buffer, + _auth: PhantomData, + _text: PhantomData, + } + } + /// Deconstruct the message into its underlying buffer. pub fn to_buffer(self) -> B { self.buffer @@ -205,7 +214,10 @@ where } /// Encrypt the message, fill in its tag, and return its buffer. - pub fn encrypt_in_place(mut self, cipher: &ChaCha20Poly1305) -> B + pub fn encrypt_in_place( + mut self, + cipher: &ChaCha20Poly1305, + ) -> Message where B: DerefMut, { @@ -217,11 +229,20 @@ where header[TAG_RANGE].copy_from_slice(&tag); - self.buffer + Message { + buffer: self.buffer, + _auth: PhantomData, + _text: PhantomData, + } } } impl Message, auth::Valid, text::Plaintext> { + /// Measure the buffer size needed to hold a message with the given packet size. + pub const fn measure(packet_size: usize) -> usize { + PACKET_RANGE.start + packet_size + } + /// Create a new message with an empty packet in a scratch buffer using the given metadata. pub fn new_in(sequence_number: u64, sender: [u8; 8], mut buffer: Vec) -> Self { buffer.clear(); @@ -232,19 +253,33 @@ impl Message, auth::Valid, text::Plaintext> { unsafe { Self::from_buffer_unchecked(buffer) } } - /// Create a new message with an empty packet backed by a [`Vec`] using the given metadata. - pub fn new(sequence_number: u64, sender: [u8; 8]) -> Self { + /// Create a new message backed by a [`Vec`] with capacity for the given packet size using the given metadata. + pub fn new_with_capacity(sequence_number: u64, sender: [u8; 8], packet_size: usize) -> Self { Self::new_in( sequence_number, sender, - Vec::with_capacity(PACKET_RANGE.start), + Vec::with_capacity(PACKET_RANGE.start + packet_size), ) } - /// Overwrite the message's packet data from a reader. - pub fn overwrite_packet(&mut self, mut reader: R) -> io::Result { + /// Create a new message backed by a [`Vec`] using the given metadata. + pub fn new(sequence_number: u64, sender: [u8; 8]) -> Self { + Self::new_with_capacity(sequence_number, sender, 0) + } + + /// Overwrite the message's packet data from an iterator. + pub fn overwrite_packet(&mut self, iter: impl IntoIterator) { + let iter = iter.into_iter(); + + self.reserve_packet(iter.size_hint().0); + self.buffer.truncate(PACKET_RANGE.start); - io::copy(&mut reader, &mut self.buffer) + self.buffer.extend(iter); + } + + /// Reserve space for a packet of the given size. + pub fn reserve_packet(&mut self, size: usize) { + self.buffer.reserve(PACKET_RANGE.start + size); } /// Allocate space for a packet of the given size and return a mutable reference to it. @@ -280,6 +315,23 @@ const NONCE_RANGE: Range = 0..12; const TAG_RANGE: Range = 16..32; const PACKET_RANGE: RangeFrom = 32..; +/// A mutable reference to a [`Vec`] that can be used as a packet message buffer. +pub struct ByteVecMut<'v>(pub &'v mut Vec); + +impl<'v> Deref for ByteVecMut<'v> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<'v> DerefMut for ByteVecMut<'v> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + /// An error representing a failure to parse a packet message. #[derive(Debug, Error)] pub enum ParseError { @@ -318,7 +370,7 @@ mod tests { assert_eq!(message.claimed_sender(), [1; 8]); assert_eq!(message.claimed_packet_plaintext(), &[]); - message.overwrite_packet(PACKET).unwrap(); + message.overwrite_packet(PACKET.iter().copied()); assert_eq!(message.claimed_packet_plaintext(), b"hello world"); @@ -333,12 +385,11 @@ mod tests { fn encrypt_and_decrypt_in_place() { let mut message = Message::new(1729, [42; 8]); - message.overwrite_packet(PACKET).unwrap(); + message.overwrite_packet(PACKET.iter().copied()); let cipher = ChaCha20Poly1305::new((&[42; 32]).into()); - let ciphertext_raw = message.encrypt_in_place(&cipher); - let ciphertext_message = Message::from_buffer(ciphertext_raw).unwrap(); + let ciphertext_message = message.encrypt_in_place(&cipher); assert_eq!(ciphertext_message.claimed_sequence_number(), 1729); assert_eq!(ciphertext_message.claimed_sender(), [42; 8]); @@ -353,10 +404,13 @@ mod tests { #[test] fn discriminate_packet() { - let message = Message::new(42, [1; 8]); + let plaintext = Message::new(42, [1; 8]); let cipher = ChaCha20Poly1305::new((&[42; 32]).into()); - let buffer = message.encrypt_in_place(&cipher); + let ciphertext = plaintext.encrypt_in_place(&cipher); - assert_eq!(discriminate(buffer).unwrap(), MessageDiscriminant::Packet); + assert_eq!( + discriminate(ciphertext.as_buffer().as_slice()).unwrap(), + MessageDiscriminant::Packet + ); } } diff --git a/packages/centipede_router/Cargo.toml b/packages/centipede_router/Cargo.toml index 40aac36..31fd9aa 100644 --- a/packages/centipede_router/Cargo.toml +++ b/packages/centipede_router/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arc-swap = "1.6.0" centipede_proto = { version = "0.1.0", path = "../centipede_proto" } chacha20poly1305 = "0.10.1" diff --git a/packages/centipede_router/src/controller.rs b/packages/centipede_router/src/controller.rs new file mode 100644 index 0000000..3ebb1f8 --- /dev/null +++ b/packages/centipede_router/src/controller.rs @@ -0,0 +1,196 @@ +use std::sync::{atomic::AtomicU64, Arc}; + +use chacha20poly1305::ChaCha20Poly1305; + +use crate::{ + packet_memory::PacketMemory, ConfiguredRouter, Link, PeerId, RecvTunnel, Router, SendTunnel, +}; + +pub struct Controller<'r> { + router: &'r Router, +} + +impl<'r> Controller<'r> { + /// Create a new controller, given a router. + /// + /// It is a logic error to create a controller for a router when there is already a controller for that router. + pub(crate) fn new(router: &'r Router) -> Self { + Self { router } + } + + /// Reconfigure the router by applying a function to the current configured state. + fn reconfigure(&mut self, f: impl FnOnce(&ConfiguredRouter) -> ConfiguredRouter) { + let prev = self.router.state.load(); + let next = f(prev.as_ref()); + + self.router.state.store(Arc::new(next)); + } + + /// Reconfigure the router by cloning the current configured state and mutating it. + fn reconfigure_mutate(&mut self, f: impl FnOnce(&mut ConfiguredRouter)) { + self.reconfigure(|prev| { + let mut next = prev.clone(); + f(&mut next); + next + }) + } + + /// Insert or update a receive tunnel. + pub fn upsert_receive_tunnel(&mut self, sender_id: PeerId, cipher: ChaCha20Poly1305) { + self.reconfigure_mutate(move |state| { + if let Some(tunnel) = state.recv_tunnels.get_mut(&sender_id) { + tunnel.cipher = cipher; + } else { + state.recv_tunnels.insert( + sender_id, + RecvTunnel { + cipher, + memory: Arc::new(PacketMemory::default()), + }, + ); + } + increment_generation(state); + }); + } + + /// Delete a receive tunnel. + pub fn delete_receive_tunnel(&mut self, sender_id: PeerId) { + self.reconfigure_mutate(move |state| { + state.recv_tunnels.remove(&sender_id); + increment_generation(state); + }); + } + + /// Insert or update a send tunnel. + pub fn upsert_send_tunnel( + &mut self, + receiver_id: PeerId, + cipher: ChaCha20Poly1305, + links: Vec, + ) { + self.reconfigure_mutate(move |state| { + if let Some(tunnel) = state.send_tunnels.get_mut(&receiver_id) { + tunnel.cipher = cipher; + tunnel.links = links; + } else { + state.send_tunnels.insert( + receiver_id, + SendTunnel { + links, + cipher, + next_sequence_number: Arc::new(AtomicU64::new(0)), + }, + ); + } + increment_generation(state); + }); + } + + /// Delete a send tunnel. + pub fn delete_send_tunnel(&mut self, receiver_id: PeerId) { + self.reconfigure_mutate(move |state| { + state.send_tunnels.remove(&receiver_id); + increment_generation(state); + }); + } +} + +fn increment_generation(state: &mut ConfiguredRouter) { + state.generation = state.generation.wrapping_add(1); +} + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, sync::atomic::Ordering}; + + use chacha20poly1305::KeyInit; + + use crate::{packet_memory::PacketRecollection, Router}; + + use super::*; + + #[test] + fn construct() { + Router::new([0; 8], vec![]); + } + + fn state<'c>(controller: &Controller) -> Arc { + controller.router.state.load_full() + } + + #[test] + fn crud_receive_tunnel() { + let mut router = Router::new([0; 8], vec![]); + let (mut controller, _) = router.handles(0); + + controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); + controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); + + assert!(state(&controller).recv_tunnels.contains_key(&[1; 8])); + + controller.delete_receive_tunnel([1; 8]); + + assert!(!state(&controller).recv_tunnels.contains_key(&[1; 8])); + } + + #[test] + fn crud_send_tunnel() { + let mut router = Router::new([0; 8], vec![]); + let (mut controller, _) = router.handles(0); + + let link = Link { + local: SocketAddr::from(([0, 0, 0, 0], 0)), + remote: SocketAddr::from(([0, 0, 0, 1], 1)), + }; + + controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![link]); + + assert_eq!(state(&controller).send_tunnels[&[1; 8]].links, vec![link]); + + controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into()), vec![]); + + assert_eq!(state(&controller).send_tunnels[&[1; 8]].links, vec![]); + + controller.delete_send_tunnel([1; 8]); + + assert!(!state(&controller).send_tunnels.contains_key(&[1; 8])); + } + + #[test] + fn receive_updates_preserve_state() { + let mut router = Router::new([0; 8], vec![]); + let (mut controller, _) = router.handles(0); + + controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); + + state(&controller).recv_tunnels[&[1; 8]].memory.observe(0); + + controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); + + assert_eq!( + state(&controller).recv_tunnels[&[1; 8]].memory.observe(0), + PacketRecollection::Seen + ) + } + + #[test] + fn send_updates_preserve_state() { + let mut router = Router::new([0; 8], vec![]); + let (mut controller, _) = router.handles(0); + + controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![]); + + state(&controller).send_tunnels[&[1; 8]] + .next_sequence_number + .store(1, Ordering::SeqCst); + + controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into()), vec![]); + + assert_eq!( + state(&controller).send_tunnels[&[1; 8]] + .next_sequence_number + .load(Ordering::SeqCst), + 1 + ); + } +} diff --git a/packages/centipede_router/src/lib.rs b/packages/centipede_router/src/lib.rs index 4e8a6d0..c2db9d4 100644 --- a/packages/centipede_router/src/lib.rs +++ b/packages/centipede_router/src/lib.rs @@ -1,19 +1,33 @@ pub mod worker; +pub mod controller; mod packet_memory; use std::{ collections::HashMap, + iter, net::SocketAddr, sync::{atomic::AtomicU64, Arc}, }; +use arc_swap::ArcSwap; use chacha20poly1305::ChaCha20Poly1305; +use controller::Controller; use packet_memory::PacketMemory; +use worker::Worker; /// The shared state of a Centipede tunnel router. -#[derive(Clone)] pub struct Router { + /// The configured state of the router. + state: ArcSwap, +} + +/// The shared state of a configured Centipede tunnel router. +#[derive(Clone)] +struct ConfiguredRouter { + /// The generation of this configuration. + generation: u64, + /// Our local peer identifier. local_id: PeerId, @@ -40,195 +54,49 @@ struct RecvTunnel { /// The state of a sending tunnel. #[derive(Clone)] struct SendTunnel { - /// Local addresses over which to send messages, - /// along with an optional endpoint to send as the opposite endpoint. - local_addrs: Vec, + /// Address pairs on which to send messages. + links: Vec, /// Cipher with which to encrypt messages, by sending endpoint. cipher: ChaCha20Poly1305, - /// Addresses of the remote endpoints. - remote_addrs: Vec, - /// The next sequence number. next_sequence_number: Arc, } +/// The two endpoint addresses of a tunnel link. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Link { + /// The local address. + local: SocketAddr, + + /// The remote address. + remote: SocketAddr, +} + pub type PeerId = [u8; 8]; impl Router { /// Create a new router. pub fn new(peer_id: PeerId, recv_addrs: Vec) -> Self { Self { - local_id: peer_id, - recv_addrs, - recv_tunnels: HashMap::new(), - send_tunnels: HashMap::new(), + state: ArcSwap::from_pointee(ConfiguredRouter { + generation: 0, + local_id: peer_id, + recv_addrs, + recv_tunnels: HashMap::new(), + send_tunnels: HashMap::new(), + }), } } - /// Insert or update a receive tunnel. - pub fn upsert_receive_tunnel(&mut self, sender_id: PeerId, cipher: ChaCha20Poly1305) { - let mut cipher = Some(cipher); - self.recv_tunnels - .entry(sender_id) - .and_modify(|tunnel| { - tunnel.cipher = cipher.take().unwrap(); - }) - .or_insert_with(|| RecvTunnel { - cipher: cipher.take().unwrap(), - memory: Arc::new(PacketMemory::default()), - }); - } - - /// Delete a receive tunnel. - pub fn delete_receive_tunnel(&mut self, sender_id: PeerId) { - self.recv_tunnels.remove(&sender_id); - } - - /// Insert or update a send tunnel. - pub fn upsert_send_tunnel( - &mut self, - receiver_id: PeerId, - cipher: ChaCha20Poly1305, - local_addrs: Vec, - remote_addrs: Vec, - ) { - let mut cipher = Some(cipher); - let mut local_addrs = Some(local_addrs); - let mut remote_addrs = Some(remote_addrs); - - self.send_tunnels - .entry(receiver_id) - .and_modify(|tunnel| { - tunnel.cipher = cipher.take().unwrap(); - tunnel.local_addrs = local_addrs.take().unwrap(); - tunnel.remote_addrs = remote_addrs.take().unwrap(); - }) - .or_insert_with(|| SendTunnel { - local_addrs: local_addrs.take().unwrap(), - cipher: cipher.take().unwrap(), - remote_addrs: remote_addrs.take().unwrap(), - next_sequence_number: Arc::new(AtomicU64::new(0)), - }); - } - - /// Delete a send tunnel. - pub fn delete_send_tunnel(&mut self, receiver_id: PeerId) { - self.send_tunnels.remove(&receiver_id); - } -} - -#[cfg(test)] -mod control_tests { - use std::sync::atomic::Ordering; - - use chacha20poly1305::KeyInit; - - use crate::packet_memory::PacketRecollection; - - use super::*; - - #[test] - fn construct() { - Router::new([0; 8], vec![]); - } - - #[test] - fn crud_receive_tunnel() { - let mut router = Router::new([0; 8], vec![]); - - router.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); - router.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); - - assert!(router.recv_tunnels.contains_key(&[1; 8])); - - router.delete_receive_tunnel([1; 8]); - - assert!(!router.recv_tunnels.contains_key(&[1; 8])); - } - - #[test] - fn crud_send_tunnel() { - let mut router = Router::new([0; 8], vec![]); - - router.upsert_send_tunnel( - [1; 8], - ChaCha20Poly1305::new((&[0; 32]).into()), - vec![SocketAddr::from(([0, 0, 0, 0], 0))], - vec![], - ); - - assert_eq!( - router.send_tunnels[&[1; 8]].local_addrs, - vec![SocketAddr::from(([0, 0, 0, 0], 0))] - ); - assert_eq!(router.send_tunnels[&[1; 8]].remote_addrs, vec![]); - - router.upsert_send_tunnel( - [1; 8], - ChaCha20Poly1305::new((&[1; 32]).into()), - vec![SocketAddr::from(([0, 0, 0, 0], 0))], - vec![SocketAddr::from(([0, 0, 0, 0], 1))], - ); - - assert_eq!( - router.send_tunnels[&[1; 8]].local_addrs, - vec![SocketAddr::from(([0, 0, 0, 0], 0))] - ); - assert_eq!( - router.send_tunnels[&[1; 8]].remote_addrs, - vec![SocketAddr::from(([0, 0, 0, 0], 1))] - ); - - router.delete_send_tunnel([1; 8]); - - assert!(!router.send_tunnels.contains_key(&[1; 8])); - } - - #[test] - fn receive_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); + /// Get one controller and N worker handles to the router. + pub fn handles(&mut self, n: usize) -> (Controller<'_>, Vec>) { + let this = &*self; - router.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); - - router.recv_tunnels[&[1; 8]].memory.observe(0); - - router.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); - - assert_eq!( - router.recv_tunnels[&[1; 8]].memory.observe(0), - PacketRecollection::Seen - ) - } + let controller = Controller::new(this); + let workers = iter::repeat_with(|| Worker::new(this)).take(n).collect(); - #[test] - fn send_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); - - router.upsert_send_tunnel( - [1; 8], - ChaCha20Poly1305::new((&[0; 32]).into()), - vec![SocketAddr::from(([0, 0, 0, 0], 0))], - vec![], - ); - - router.send_tunnels[&[1; 8]] - .next_sequence_number - .store(1, Ordering::SeqCst); - - router.upsert_send_tunnel( - [1; 8], - ChaCha20Poly1305::new((&[1; 32]).into()), - vec![SocketAddr::from(([0, 0, 0, 0], 0))], - vec![], - ); - - assert_eq!( - router.send_tunnels[&[1; 8]] - .next_sequence_number - .load(Ordering::SeqCst), - 1 - ) + (controller, workers) } } diff --git a/packages/centipede_router/src/worker.rs b/packages/centipede_router/src/worker.rs index 654fd99..76b51af 100644 --- a/packages/centipede_router/src/worker.rs +++ b/packages/centipede_router/src/worker.rs @@ -2,33 +2,55 @@ use centipede_proto::{ marker::{auth, text}, PacketMessage, }; -use chacha20poly1305::ChaCha20Poly1305; -use crate::{packet_memory::PacketRecollection, PeerId, Router}; +use crate::{ + packet_memory::PacketRecollection, ConfiguredRouter, Link, PeerId, Router, SendTunnel, +}; use std::{ - net::SocketAddr, + collections::hash_map, + iter, mem, ops::{Deref, DerefMut}, + pin::Pin, + slice, + sync::{atomic::Ordering, Arc}, }; -/// Handle an incoming packet message from the Centipede network. -pub fn handle_incoming( - router: &Router, - message: PacketMessage, -) -> Option> -where - B: DerefMut, -{ - let tunnel = router.recv_tunnels.get(&message.claimed_sender())?; - let decrypted = message.decrypt_in_place(&tunnel.cipher).ok()?; - - match tunnel.memory.observe(decrypted.sequence_number()) { - PacketRecollection::New => Some(ReceivePacket { decrypted }), - PacketRecollection::Seen => None, - PacketRecollection::Confusing => None, - } +pub struct Worker<'r> { + router: &'r Router, } +impl<'r> Worker<'r> { + /// Create a new worker. + pub(crate) fn new(router: &'r Router) -> Self { + Self { router } + } + + /// Handle an incoming packet message from the Centipede network. + pub fn handle_incoming( + &mut self, + message: PacketMessage, + ) -> Option> + where + B: DerefMut, + { + let state = self.router.state.load(); + + let tunnel = state.recv_tunnels.get(&message.claimed_sender())?; + let decrypted = message.decrypt_in_place(&tunnel.cipher).ok()?; + + match tunnel.memory.observe(decrypted.sequence_number()) { + PacketRecollection::New => Some(ReceivePacket { decrypted }), + PacketRecollection::Seen => None, + PacketRecollection::Confusing => None, + } + } + + /// Handle an outgoing packet from the system's networking stack. + pub fn handle_outgoing<'p>(&mut self, packet: &'p [u8]) -> HandleOutgoing<'p> { + HandleOutgoing::start(self.router, packet) + } +} /// The obligation to /// receive a packet from the Centipede network /// and hand it off to the system's networking stack. @@ -45,39 +67,109 @@ impl> ReceivePacket { } } -/// Handle an outgoing packet from the system's networking stack. -pub fn handle_outgoing<'r>( - router: &'r Router, - _packet: &'r [u8], -) -> impl Iterator> + 'r { - // TODO: route based on destination address - - router.send_tunnels.values().flat_map(move |tunnel| { - tunnel.remote_addrs.iter().map(move |peer_addr| SendPacket { - sender_id: &router.local_id, - cipher: &tunnel.cipher, - sequence_number: tunnel - .next_sequence_number - .fetch_add(1, std::sync::atomic::Ordering::SeqCst), - peer_addr, - }) - }) +/// A coroutine yielding packets to send. +pub struct HandleOutgoing<'p> { + /// Plaintext of the outgoing packet being handled. + packet_plaintext: &'p [u8], + + /// Iterator over the send tunnels. + /// Peekable so we can access the current tunnel without consuming it. + /// + /// Note that the 'static lifetime here is a lie, and the iterator + /// actually borrows from the router state. + tunnels: iter::Peekable>, + + /// Iterator over the links of the current tunnel. + /// + /// Note that the 'static lifetime here is a lie, and the iterator + /// actually borrows from the router state. + remaining_links: slice::Iter<'static, Link>, + + /// Arc pointer owning the router state, + /// preventing it from being dropped while this coroutine is running. + /// + /// Note that this must be after the iterators, so that they are dropped first. + router_state: Pin>>, +} + +impl<'p> HandleOutgoing<'p> { + /// Create a new coroutine yielding packets to send. + fn start(router: &Router, packet: &'p [u8]) -> Self { + let router_state = router.state.load(); + + let mut tunnels = unsafe { + // SAFETY: The iterator refers to the configured router, + // which lives as long as `Self`, and is never + // moved because it is behind a `Pin`. + + mem::transmute::< + iter::Peekable>, + iter::Peekable>, + >(router_state.send_tunnels.values().peekable()) + }; + + let remaining_links = tunnels + .peek() + .map(|tunnel| tunnel.links.iter()) + .unwrap_or_default(); + + Self { + packet_plaintext: packet, + router_state: Pin::new(router_state), + tunnels, + remaining_links, + } + } + + /// Resume the coroutine, yielding the next packet to send. + pub fn resume(&mut self, scratch: Vec) -> Option { + match self.remaining_links.next() { + Some(&link) => { + let tunnel = *self.tunnels.peek()?; + + let sequence_number = tunnel.next_sequence_number.fetch_add(1, Ordering::Relaxed); + + let mut message = + PacketMessage::new_in(sequence_number, self.router_state.local_id, scratch); + message.overwrite_packet(self.packet_plaintext.iter().copied()); + let message = message.encrypt_in_place(&tunnel.cipher); + + Some(SendPacket { link, message }) + } + None => { + self.tunnels.next()?; + self.remaining_links = self.tunnels.peek().unwrap().links.iter(); + None + } + } + } } /// The obligation to /// send a packet from the system's networking stack /// to another peer on the Centipede network. #[must_use] -pub struct SendPacket<'r> { - /// The ID of the sending peer. - pub sender_id: &'r PeerId, +pub struct SendPacket { + /// The link to send the packet over. + link: Link, + + /// The encrypted packet message to send. + message: PacketMessage, auth::Valid, text::Ciphertext>, +} - /// The cipher with which to encrypt the packet. - pub cipher: &'r ChaCha20Poly1305, +impl SendPacket { + /// Get the link to send the packet over. + pub fn link(&self) -> Link { + self.link + } - /// The sequence number of the packet. - pub sequence_number: u64, + /// Get the packet message to send. + pub fn message(&self) -> PacketMessage<&'_ [u8], auth::Valid, text::Ciphertext> { + self.message.as_ref() + } - /// The internet address of the peer. - pub peer_addr: &'r SocketAddr, + /// Fulfill the obligation to send the packet, getting back the scratch space. + pub fn fulfill(self) -> Vec { + self.message.to_buffer() + } } diff --git a/tmp.toml b/tmp.toml deleted file mode 100644 index ee248bb..0000000 --- a/tmp.toml +++ /dev/null @@ -1,21 +0,0 @@ -base64 = "0.21.5" -bincode = "1.3.3" -chacha20poly1305 = { version = "0.10.1", features = ["std"] } -cidr = { version = "0.2.2", features = ["serde"] } -ed25519-dalek = { version = "2.0.0", features = ["rand_core", "serde"] } -flurry = "0.4.0" -hypertube = "0.2.0" -log = "0.4.20" -mio = { version = "0.8.8", features = ["os-poll", "os-ext"] } -num_cpus = "1.16.0" -pretty_env_logger = "0.5.0" -rand = "0.8.5" -replace_with = "0.1.7" -serde = { version = "1.0.188", features = ["derive"] } -serde_with = { version = "3.4.0", features = ["base64"] } -socket2 = { version = "0.5.3", features = ["all"] } -stakker = { version = "0.2.10", features = ["logger"] } -stakker_mio = "0.2.5" -thiserror = "1.0.47" -toml = "0.7.6" -x25519-dalek = { version = "2.0.0", features = ["serde"] } \ No newline at end of file