diff --git a/Cargo.lock b/Cargo.lock index 5a0798e5ad..269903312e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,18 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -1151,6 +1163,27 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -2233,6 +2266,7 @@ version = "0.33.0" dependencies = [ "aead", "anyhow", + "async-channel", "atomic-waker", "axum", "backon", diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 89d7275738..65b76ddb2b 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -23,6 +23,7 @@ workspace = true [dependencies] aead = { version = "0.5.2", features = ["bytes"] } anyhow = { version = "1" } +async-channel = "2.3.1" atomic-waker = "1.1.2" concurrent-queue = "2.5" backon = { version = "1.4" } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index c23e5cf08b..f620121fd7 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -228,7 +228,9 @@ pub(crate) struct MagicSock { disco_secrets: DiscoSecrets, /// UDP disco (ping) queue - udp_disco_sender: mpsc::Sender<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_sender: async_channel::Sender<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_receiver: async_channel::Receiver<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_blocked: std::sync::Mutex>, /// Optional discovery service discovery: Option>, @@ -474,6 +476,79 @@ impl MagicSock { "connection closed", )); } + + // First empty out any disco messages, these have priority. Best-effort though, do + // not propagate any errors. + let mut pop_udp_disco_receiver = true; + while let Ok((dst, dst_key, msg)) = self.udp_disco_blocked.lock().unwrap().pop() { + let span = info_span!("udp-disco", %dst, node = %dst_key.fmt_short(), ?msg); + let _guard = span.enter(); + let packet = self.encode_disco_message(dst_key, &msg); + inc!(MagicsockMetrics, send_disco_udp); + let transmit = quinn_udp::Transmit { + destination: dst, + ecn: None, + contents: &packet, + segment_size: None, + src_ip: None, + }; + match self.try_send_udp(dst, &transmit) { + Ok(_) => { + inc!(MagicsockMetrics, send_disco_udp); + trace!("sent message"); + } + Err(err) if err.kind() != io::ErrorKind::WouldBlock => { + warn!(?err, "failed to send"); + pop_udp_disco_receiver = false; + break; + } + _ => { + trace!("would block"); + self.udp_disco_blocked + .lock() + .unwrap() + .push((dst, dst_key, msg)) + .ok(); + pop_udp_disco_receiver = false; + break; + } + } + } + if pop_udp_disco_receiver { + while let Ok((dst, dst_key, msg)) = self.udp_disco_receiver.try_recv() { + let span = info_span!("udp-disco", %dst, node = %dst_key.fmt_short(), ?msg); + let _guard = span.enter(); + let packet = self.encode_disco_message(dst_key, &msg); + inc!(MagicsockMetrics, send_disco_udp); + let transmit = quinn_udp::Transmit { + destination: dst, + ecn: None, + contents: &packet, + segment_size: None, + src_ip: None, + }; + match self.try_send_udp(dst, &transmit) { + Ok(_) => { + inc!(MagicsockMetrics, send_disco_udp); + trace!("sent message"); + } + Err(err) if err.kind() != io::ErrorKind::WouldBlock => { + warn!(?err, "failed to send"); + break; + } + _ => { + trace!("would block"); + self.udp_disco_blocked + .lock() + .unwrap() + .push((dst, dst_key, msg)) + .ok(); + break; + } + } + } + } + match MappedAddr::from(transmit.destination) { MappedAddr::None(dest) => { error!(%dest, "Cannot convert to a mapped address, voiding transmit."); @@ -1704,7 +1779,7 @@ impl Handle { let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256); let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel(); let relay_datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); - let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); + let (udp_disco_sender, udp_disco_receiver) = async_channel::bounded(256); // load the node data let node_map = node_map.unwrap_or_default(); @@ -1736,6 +1811,8 @@ impl Handle { node_map, ip_mapped_addrs, udp_disco_sender, + udp_disco_receiver: udp_disco_receiver.clone(), + udp_disco_blocked: std::sync::Mutex::new(ConcurrentQueue::unbounded()), discovery, discovery_user_data: RwLock::new(discovery_user_data), direct_addrs: Default::default(), @@ -1783,7 +1860,7 @@ impl Handle { let _ = actor_tasks.spawn({ let msock = msock.clone(); async move { - while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await { + while let Ok((dst, dst_key, msg)) = udp_disco_receiver.recv().await { if let Err(err) = msock.send_disco_message_udp(dst, dst_key, &msg).await { warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)"); }