From 3949b97ac9219222e7d6754fe61d4338aae98bc4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Sep 2024 19:09:34 +0300 Subject: [PATCH 1/9] tcp: Make socket futures cancelable Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 76 +++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 856c9410..56034cf7 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -39,6 +39,7 @@ use crate::{ use futures::{ future::BoxFuture, stream::{FuturesUnordered, Stream, StreamExt}, + TryFutureExt, }; use multiaddr::Multiaddr; use socket2::{Domain, Socket, Type}; @@ -70,6 +71,25 @@ struct PendingInboundConnection { address: SocketAddr, } +#[derive(Debug)] +enum RawConnectionResult { + /// The first successful connection. + Connected { + connection_id: ConnectionId, + address: Multiaddr, + stream: TcpStream, + }, + + /// All connection attempts failed. + Failed { + connection_id: ConnectionId, + errors: Vec<(Multiaddr, DialError)>, + }, + + /// Future was canceled. + Canceled, +} + /// TCP transport. pub(crate) struct TcpTransport { /// Transport context. @@ -96,15 +116,7 @@ pub(crate) struct TcpTransport { >, /// Pending raw, unnegotiated connections. - pending_raw_connections: FuturesUnordered< - BoxFuture< - 'static, - Result< - (ConnectionId, Multiaddr, TcpStream), - (ConnectionId, Vec<(Multiaddr, DialError)>), - >, - >, - >, + pending_raw_connections: FuturesUnordered>, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, @@ -412,11 +424,17 @@ impl Transport for TcpTransport { }) .collect(); - self.pending_raw_connections.push(Box::pin(async move { + // Future that will resolve to the first successful connection. + let future = async move { let mut errors = Vec::with_capacity(num_addresses); while let Some(result) = futures.next().await { match result { - Ok((address, stream)) => return Ok((connection_id, address, stream)), + Ok((address, stream)) => + return RawConnectionResult::Connected { + connection_id, + address, + stream, + }, Err(error) => { tracing::debug!( target: LOG_TARGET, @@ -429,8 +447,15 @@ impl Transport for TcpTransport { } } - Err((connection_id, errors)) - })); + RawConnectionResult::Failed { + connection_id, + errors, + } + }; + + let (fut, handle) = futures::future::abortable(future); + let fut = fut.unwrap_or_else(|_| RawConnectionResult::Canceled); + self.pending_raw_connections.push(Box::pin(fut)); Ok(()) } @@ -523,16 +548,14 @@ impl Stream for TcpTransport { } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { - match result { - Ok((connection_id, address, stream)) => { - tracing::trace!( - target: LOG_TARGET, - ?connection_id, - ?address, - canceled = self.canceled.contains(&connection_id), - "connection opened", - ); + tracing::trace!(target: LOG_TARGET, ?result, "raw connection result"); + match result { + RawConnectionResult::Connected { + connection_id, + address, + stream, + } => if !self.canceled.remove(&connection_id) { self.opened_raw.insert(connection_id, (stream, address.clone())); @@ -540,15 +563,18 @@ impl Stream for TcpTransport { connection_id, address, })); - } - } - Err((connection_id, errors)) => + }, + RawConnectionResult::Failed { + connection_id, + errors, + } => if !self.canceled.remove(&connection_id) { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); }, + RawConnectionResult::Canceled => (), } } From 85142b5e9b14c31ca60488d1d4d522d681b7a11b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Sep 2024 19:16:19 +0300 Subject: [PATCH 2/9] tcp: Abort futures for canceling dialing operations Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 56034cf7..657250b2 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -38,7 +38,7 @@ use crate::{ use futures::{ future::BoxFuture, - stream::{FuturesUnordered, Stream, StreamExt}, + stream::{AbortHandle, FuturesUnordered, Stream, StreamExt}, TryFutureExt, }; use multiaddr::Multiaddr; @@ -87,7 +87,7 @@ enum RawConnectionResult { }, /// Future was canceled. - Canceled, + Canceled { connection_id: ConnectionId }, } /// TCP transport. @@ -124,6 +124,8 @@ pub(crate) struct TcpTransport { /// Canceled raw connections. canceled: HashSet, + cancel_futures: HashMap, + /// Connections which have been opened and negotiated but are being validated by the /// `TransportManager`. pending_open: HashMap, @@ -296,6 +298,7 @@ impl TransportBuilder for TcpTransport { pending_inbound_connections: HashMap::new(), pending_connections: FuturesUnordered::new(), pending_raw_connections: FuturesUnordered::new(), + cancel_futures: HashMap::new(), }, listen_addresses, )) @@ -454,8 +457,9 @@ impl Transport for TcpTransport { }; let (fut, handle) = futures::future::abortable(future); - let fut = fut.unwrap_or_else(|_| RawConnectionResult::Canceled); + let fut = fut.unwrap_or_else(move |_| RawConnectionResult::Canceled { connection_id }); self.pending_raw_connections.push(Box::pin(fut)); + self.cancel_futures.insert(connection_id, handle); Ok(()) } @@ -513,6 +517,7 @@ impl Transport for TcpTransport { fn cancel(&mut self, connection_id: ConnectionId) { self.canceled.insert(connection_id); + self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); } } @@ -574,7 +579,9 @@ impl Stream for TcpTransport { errors, })); }, - RawConnectionResult::Canceled => (), + RawConnectionResult::Canceled { connection_id } => { + self.canceled.remove(&connection_id); + } } } From 14dc4cc133e4c09f06b75119970583973a8353f0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Sep 2024 19:23:38 +0300 Subject: [PATCH 3/9] websocket: Backport abortable socket dial futures from TCP Signed-off-by: Alexandru Vasile --- src/transport/websocket/mod.rs | 93 ++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index f7999735..2c4d8d65 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -36,7 +36,11 @@ use crate::{ DialError, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{AbortHandle, FuturesUnordered}, + Stream, StreamExt, TryFutureExt, +}; use multiaddr::{Multiaddr, Protocol}; use socket2::{Domain, Socket, Type}; use std::net::SocketAddr; @@ -71,6 +75,25 @@ struct PendingInboundConnection { address: SocketAddr, } +#[derive(Debug)] +enum RawConnectionResult { + /// The first successful connection. + Connected { + connection_id: ConnectionId, + address: Multiaddr, + stream: WebSocketStream>, + }, + + /// All connection attempts failed. + Failed { + connection_id: ConnectionId, + errors: Vec<(Multiaddr, DialError)>, + }, + + /// Future was canceled. + Canceled { connection_id: ConnectionId }, +} + /// WebSocket transport. pub(crate) struct WebSocketTransport { /// Transport context. @@ -97,19 +120,7 @@ pub(crate) struct WebSocketTransport { >, /// Pending raw, unnegotiated connections. - pending_raw_connections: FuturesUnordered< - BoxFuture< - 'static, - Result< - ( - ConnectionId, - Multiaddr, - WebSocketStream>, - ), - (ConnectionId, Vec<(Multiaddr, DialError)>), - >, - >, - >, + pending_raw_connections: FuturesUnordered>, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap>, Multiaddr)>, @@ -117,6 +128,8 @@ pub(crate) struct WebSocketTransport { /// Canceled raw connections. canceled: HashSet, + cancel_futures: HashMap, + /// Negotiated connections waiting validation. pending_open: HashMap, } @@ -315,6 +328,7 @@ impl TransportBuilder for WebSocketTransport { pending_inbound_connections: HashMap::new(), pending_connections: FuturesUnordered::new(), pending_raw_connections: FuturesUnordered::new(), + cancel_futures: HashMap::new(), }, listen_addresses, )) @@ -458,12 +472,17 @@ impl Transport for WebSocketTransport { }) .collect(); - self.pending_raw_connections.push(Box::pin(async move { + // Future that will resolve to the first successful connection. + let future = async move { let mut errors = Vec::with_capacity(num_addresses); - while let Some(result) = futures.next().await { match result { - Ok((address, stream)) => return Ok((connection_id, address, stream)), + Ok((address, stream)) => + return RawConnectionResult::Connected { + connection_id, + address, + stream, + }, Err(error) => { tracing::debug!( target: LOG_TARGET, @@ -476,8 +495,16 @@ impl Transport for WebSocketTransport { } } - Err((connection_id, errors)) - })); + RawConnectionResult::Failed { + connection_id, + errors, + } + }; + + let (fut, handle) = futures::future::abortable(future); + let fut = fut.unwrap_or_else(move |_| RawConnectionResult::Canceled { connection_id }); + self.pending_raw_connections.push(Box::pin(fut)); + self.cancel_futures.insert(connection_id, handle); Ok(()) } @@ -536,6 +563,7 @@ impl Transport for WebSocketTransport { fn cancel(&mut self, connection_id: ConnectionId) { self.canceled.insert(connection_id); + self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); } } @@ -565,16 +593,14 @@ impl Stream for WebSocketTransport { } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { - match result { - Ok((connection_id, address, stream)) => { - tracing::trace!( - target: LOG_TARGET, - ?connection_id, - ?address, - canceled = self.canceled.contains(&connection_id), - "connection opened", - ); + tracing::trace!(target: LOG_TARGET, ?result, "raw connection result"); + match result { + RawConnectionResult::Connected { + connection_id, + address, + stream, + } => if !self.canceled.remove(&connection_id) { self.opened_raw.insert(connection_id, (stream, address.clone())); @@ -582,15 +608,20 @@ impl Stream for WebSocketTransport { connection_id, address, })); - } - } - Err((connection_id, errors)) => + }, + RawConnectionResult::Failed { + connection_id, + errors, + } => if !self.canceled.remove(&connection_id) { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); }, + RawConnectionResult::Canceled { connection_id } => { + self.canceled.remove(&connection_id); + } } } From 2260c9558b884d74714c21ddd895f059acfa1bac Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 26 Sep 2024 18:54:07 +0300 Subject: [PATCH 4/9] quic: Backport abortable socket dial futures from TCP Signed-off-by: Alexandru Vasile --- src/transport/quic/mod.rs | 88 +++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 26 deletions(-) diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index ad03674d..d69e1603 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -34,7 +34,11 @@ use crate::{ PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{AbortHandle, FuturesUnordered}, + Stream, StreamExt, TryFutureExt, +}; use multiaddr::{Multiaddr, Protocol}; use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout}; @@ -66,6 +70,25 @@ struct NegotiatedConnection { connection: Connection, } +#[derive(Debug)] +enum RawConnectionResult { + /// The first successful connection. + Connected { + connection_id: ConnectionId, + address: Multiaddr, + stream: NegotiatedConnection, + }, + + /// All connection attempts failed. + Failed { + connection_id: ConnectionId, + errors: Vec<(Multiaddr, DialError)>, + }, + + /// Future was canceled. + Canceled { connection_id: ConnectionId }, +} + /// QUIC transport object. pub(crate) struct QuicTransport { /// Transport handle. @@ -92,21 +115,15 @@ pub(crate) struct QuicTransport { pending_open: HashMap, /// Pending raw, unnegotiated connections. - pending_raw_connections: FuturesUnordered< - BoxFuture< - 'static, - Result< - (ConnectionId, Multiaddr, NegotiatedConnection), - (ConnectionId, Vec<(Multiaddr, DialError)>), - >, - >, - >, + pending_raw_connections: FuturesUnordered>, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, /// Canceled raw connections. canceled: HashSet, + + cancel_futures: HashMap, } impl QuicTransport { @@ -225,6 +242,7 @@ impl TransportBuilder for QuicTransport { pending_inbound_connections: HashMap::new(), pending_raw_connections: FuturesUnordered::new(), pending_connections: FuturesUnordered::new(), + cancel_futures: HashMap::new(), }, listen_addresses, )) @@ -407,12 +425,18 @@ impl Transport for QuicTransport { }) .collect(); - self.pending_raw_connections.push(Box::pin(async move { + // Future that will resolve to the first successful connection. + let future = async move { let mut errors = Vec::with_capacity(num_addresses); while let Some(result) = futures.next().await { match result { - Ok((address, connection)) => return Ok((connection_id, address, connection)), + Ok((address, stream)) => + return RawConnectionResult::Connected { + connection_id, + address, + stream, + }, Err(error) => { tracing::debug!( target: LOG_TARGET, @@ -425,8 +449,16 @@ impl Transport for QuicTransport { } } - Err((connection_id, errors)) - })); + RawConnectionResult::Failed { + connection_id, + errors, + } + }; + + let (fut, handle) = futures::future::abortable(future); + let fut = fut.unwrap_or_else(move |_| RawConnectionResult::Canceled { connection_id }); + self.pending_raw_connections.push(Box::pin(fut)); + self.cancel_futures.insert(connection_id, handle); Ok(()) } @@ -446,6 +478,7 @@ impl Transport for QuicTransport { /// Cancel opening connections. fn cancel(&mut self, connection_id: ConnectionId) { self.canceled.insert(connection_id); + self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); } } @@ -470,16 +503,14 @@ impl Stream for QuicTransport { } while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) { - match result { - Ok((connection_id, address, stream)) => { - tracing::trace!( - target: LOG_TARGET, - ?connection_id, - ?address, - canceled = self.canceled.contains(&connection_id), - "connection opened", - ); + tracing::trace!(target: LOG_TARGET, ?result, "raw connection result"); + match result { + RawConnectionResult::Connected { + connection_id, + address, + stream, + } => if !self.canceled.remove(&connection_id) { self.opened_raw.insert(connection_id, (stream, address.clone())); @@ -487,15 +518,20 @@ impl Stream for QuicTransport { connection_id, address, })); - } - } - Err((connection_id, errors)) => + }, + RawConnectionResult::Failed { + connection_id, + errors, + } => if !self.canceled.remove(&connection_id) { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); }, + RawConnectionResult::Canceled { connection_id } => { + self.canceled.remove(&connection_id); + } } } From 3c8c556cf67cd7d54a950a0a96d01489752bf746 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Oct 2024 15:57:29 +0300 Subject: [PATCH 5/9] tcp: Fix cancel leak Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 59 ++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 657250b2..2a257df2 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -46,7 +46,7 @@ use socket2::{Domain, Socket, Type}; use tokio::net::TcpStream; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, net::SocketAddr, pin::Pin, task::{Context, Poll}, @@ -121,9 +121,9 @@ pub(crate) struct TcpTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, /// Connections which have been opened and negotiated but are being validated by the @@ -291,7 +291,6 @@ impl TransportBuilder for TcpTransport { config, context, dial_addresses, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -516,8 +515,11 @@ impl Transport for TcpTransport { } fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -560,27 +562,56 @@ impl Stream for TcpTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a handle", + ); + } } } } From 9f9ae068eb9560eb05edff566e2f45f3f20674ea Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Oct 2024 16:01:14 +0300 Subject: [PATCH 6/9] websocket: Fix cancel leak Signed-off-by: Alexandru Vasile --- src/transport/websocket/mod.rs | 59 ++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 2c4d8d65..049002d7 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -50,7 +50,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use url::Url; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration, @@ -125,9 +125,9 @@ pub(crate) struct WebSocketTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap>, Multiaddr)>, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, /// Negotiated connections waiting validation. @@ -321,7 +321,6 @@ impl TransportBuilder for WebSocketTransport { config, context, dial_addresses, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -562,8 +561,11 @@ impl Transport for WebSocketTransport { } fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -600,27 +602,56 @@ impl Stream for WebSocketTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a handle", + ); + } } } } From fdf40024192631c6d51e8f717a767db1a4cbeb49 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:21:56 +0200 Subject: [PATCH 7/9] Update src/transport/tcp/mod.rs Co-authored-by: Dmitry Markin --- src/transport/tcp/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 2a257df2..6b3c3e8c 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -609,7 +609,7 @@ impl Stream for TcpTransport { tracing::warn!( target: LOG_TARGET, ?connection_id, - "raw cancelled connection without a handle", + "raw cancelled connection without a cancel handle", ); } } From c0f924bfce1cf96bd84cb05e9a6d298d374be4b3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:22:01 +0200 Subject: [PATCH 8/9] Update src/transport/websocket/mod.rs Co-authored-by: Dmitry Markin --- src/transport/websocket/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 049002d7..7340243c 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -649,7 +649,7 @@ impl Stream for WebSocketTransport { tracing::warn!( target: LOG_TARGET, ?connection_id, - "raw cancelled connection without a handle", + "raw cancelled connection without a cancel handle", ); } } From 97bc506030614da7f62ce502ca028c8a98648dc2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 30 Oct 2024 11:32:55 +0200 Subject: [PATCH 9/9] quic: Fix memory leak Signed-off-by: Alexandru Vasile --- src/transport/quic/mod.rs | 60 ++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index d69e1603..0cf5e255 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol}; use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout}; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -120,9 +120,9 @@ pub(crate) struct QuicTransport { /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, - /// Canceled raw connections. - canceled: HashSet, - + /// Cancel raw connections futures. + /// + /// This is cancelling `Self::pending_raw_connections`. cancel_futures: HashMap, } @@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport { context, config, listener, - canceled: HashSet::new(), opened_raw: HashMap::new(), pending_open: HashMap::new(), pending_dials: HashMap::new(), @@ -477,8 +476,11 @@ impl Transport for QuicTransport { /// Cancel opening connections. fn cancel(&mut self, connection_id: ConnectionId) { - self.canceled.insert(connection_id); - self.cancel_futures.remove(&connection_id).map(|handle| handle.abort()); + // Cancel the future if it exists. + // State clean-up happens inside the `poll_next`. + if let Some(handle) = self.cancel_futures.get(&connection_id) { + handle.abort(); + } } } @@ -510,27 +512,57 @@ impl Stream for QuicTransport { connection_id, address, stream, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?address, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { self.opened_raw.insert(connection_id, (stream, address.clone())); return Poll::Ready(Some(TransportEvent::ConnectionOpened { connection_id, address, })); - }, + } + } + RawConnectionResult::Failed { connection_id, errors, - } => - if !self.canceled.remove(&connection_id) { + } => { + let Some(handle) = self.cancel_futures.remove(&connection_id) else { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + ?errors, + "raw connection without a cancel handle", + ); + continue; + }; + + if !handle.is_aborted() { return Poll::Ready(Some(TransportEvent::OpenFailure { connection_id, errors, })); - }, + } + } + RawConnectionResult::Canceled { connection_id } => { - self.canceled.remove(&connection_id); + if self.cancel_futures.remove(&connection_id).is_none() { + tracing::warn!( + target: LOG_TARGET, + ?connection_id, + "raw cancelled connection without a cancel handle", + ); + } } } }