Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(relay): external addr not expiring when relayed listen addr expire #5577

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ libp2p-ping = { version = "0.45.0", path = "protocols/ping" }
libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" }
libp2p-pnet = { version = "0.25.0", path = "transports/pnet" }
libp2p-quic = { version = "0.11.1", path = "transports/quic" }
libp2p-relay = { version = "0.18.0", path = "protocols/relay" }
libp2p-relay = { version = "0.18.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.8", path = "misc/server" }
Expand Down
6 changes: 3 additions & 3 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ async fn connect() {

let dst_relayed_addr = relay_tcp_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(dst_peer_id));
.with(Protocol::P2pCircuit);
dst.listen_on(dst_relayed_addr.clone()).unwrap();

wait_for_reservation(
Expand All @@ -70,7 +69,8 @@ async fn connect() {
.await;
async_std::task::spawn(dst.loop_on_next());

src.dial_and_wait(dst_relayed_addr.clone()).await;
src.dial_and_wait(dst_relayed_addr.with(Protocol::P2p(dst_peer_id)))
.await;

let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id));

Expand Down
6 changes: 6 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.18.1

- Fix relayed `ExternalAddr` not expiring when stopping to listen through relay.
Removing `/p2p/<local_peer_id>` part from the listen and external addresses reported by the relay `Behaviour`.
See [PR 5577](https://github.com/libp2p/rust-libp2p/pull/5577).

## 0.18.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-relay"
edition = "2021"
rust-version = { workspace = true }
description = "Communications relaying for libp2p"
version = "0.18.0"
version = "0.18.1"
authors = ["Parity Technologies <[email protected]>", "Max Inden <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
84 changes: 21 additions & 63 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ use futures::future::{BoxFuture, FutureExt};
use futures::io::{AsyncRead, AsyncWrite};
use futures::ready;
use futures::stream::StreamExt;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::transport::PortUse;
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, ExpiredListenAddr,
NetworkBehaviour, NewListenAddr, NotifyHandler, Stream, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice};
Expand Down Expand Up @@ -71,12 +71,6 @@ pub enum Event {
},
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ReservationStatus {
Pending,
Confirmed,
}

/// [`NetworkBehaviour`] implementation of the relay client
/// functionality of the circuit relay v2 protocol.
pub struct Behaviour {
Expand All @@ -87,11 +81,6 @@ pub struct Behaviour {
/// connection.
directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,

/// Stores the address of a pending or confirmed reservation.
///
/// This is indexed by the [`ConnectionId`] to a relay server and the address is the `/p2p-circuit` address we reserved on it.
reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

/// Queue of actions to return when polled.
queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Void>>>,

Expand All @@ -105,7 +94,6 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
local_peer_id,
from_transport,
directly_connected_peers: Default::default(),
reservation_addresses: Default::default(),
queued_actions: Default::default(),
pending_handler_commands: Default::default(),
};
Expand Down Expand Up @@ -140,12 +128,6 @@ impl Behaviour {
unreachable!("`on_connection_closed` for unconnected peer.")
}
};
if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr));
}
}
}
}
Expand Down Expand Up @@ -220,8 +202,19 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => {
if addr.is_relayed() {
self.queued_actions
.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
}
}
FromSwarm::ExpiredListenAddr(ExpiredListenAddr { addr, .. }) => {
if addr.is_relayed() {
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
}
}
FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
self.reservation_addresses.remove(&connection_id);
self.pending_handler_commands.remove(&connection_id);
}
_ => {}
Expand All @@ -231,7 +224,7 @@ impl NetworkBehaviour for Behaviour {
fn on_connection_handler_event(
&mut self,
event_source: PeerId,
connection: ConnectionId,
_: ConnectionId,
handler_event: THandlerOutEvent<Self>,
) {
let handler_event = match handler_event {
Expand All @@ -243,17 +236,6 @@ impl NetworkBehaviour for Behaviour {

let event = match handler_event {
handler::Event::ReservationReqAccepted { renewal, limit } => {
let (addr, status) = self
.reservation_addresses
.get_mut(&connection)
.expect("Relay connection exist");

if !renewal && *status == ReservationStatus::Pending {
*status = ReservationStatus::Confirmed;
self.queued_actions
.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
}

Event::ReservationReqAccepted {
relay_peer_id: event_source,
renewal,
Expand Down Expand Up @@ -294,42 +276,18 @@ impl NetworkBehaviour for Behaviour {
.get(&relay_peer_id)
.and_then(|cs| cs.first())
{
Some(connection_id) => {
self.reservation_addresses.insert(
*connection_id,
(
relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(self.local_peer_id)),
ReservationStatus::Pending,
),
);

ToSwarm::NotifyHandler {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::Reserve { to_listener }),
}
}
Some(connection_id) => ToSwarm::NotifyHandler {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::Reserve { to_listener }),
},
None => {
let opts = DialOpts::peer_id(relay_peer_id)
.addresses(vec![relay_addr.clone()])
.extend_addresses_through_behaviour()
.build();
let relayed_connection_id = opts.connection_id();

self.reservation_addresses.insert(
relayed_connection_id,
(
relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(self.local_peer_id)),
ReservationStatus::Pending,
),
);

self.pending_handler_commands
.insert(relayed_connection_id, handler::In::Reserve { to_listener });
ToSwarm::Dial { opts }
Expand Down
48 changes: 41 additions & 7 deletions protocols/relay/src/priv_client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::stream::{Stream, StreamExt};
use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{DialOpts, ListenerId, TransportError, TransportEvent};
use libp2p_identity::PeerId;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use thiserror::Error;
Expand Down Expand Up @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport {
from_behaviour,
is_closed: false,
waker: None,
listened_addrs: Default::default(),
};
self.listeners.push(listener);
Ok(())
Expand Down Expand Up @@ -311,6 +312,8 @@ pub(crate) struct Listener {
/// the sender side of the `from_behaviour` channel is dropped.
is_closed: bool,
waker: Option<Waker>,
/// Addresses listened through relay
listened_addrs: HashSet<Multiaddr>,
}

impl Listener {
Expand Down Expand Up @@ -368,14 +371,45 @@ impl Stream for Listener {
self.queued_events.is_empty(),
"Assert empty due to previous `pop_front` attempt."
);
// Returned as [`ListenerEvent::NewAddress`] in next iteration of loop.
self.queued_events = addrs

let reserved_addresses = addrs
.into_iter()
.map(|listen_addr| TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr,
.map(|mut addr| {
// Every transport (tcp / quic / etc) gives addresses without the last
// p2p part, so pop it if present.
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}
addr
})
.collect();
.collect::<HashSet<_>>();

let expired_addresses = self
.listened_addrs
.difference(&reserved_addresses)
.map(Clone::clone)
.collect::<Vec<_>>();
let new_addresses = reserved_addresses
.difference(&self.listened_addrs)
.map(Clone::clone)
.collect::<Vec<_>>();
let listener_id = self.listener_id;

for listen_addr in expired_addresses {
self.listened_addrs.remove(&listen_addr);
self.queued_events
.push_back(TransportEvent::AddressExpired {
listener_id,
listen_addr,
});
}
for listen_addr in new_addresses {
self.listened_addrs.insert(listen_addr.clone());
self.queued_events.push_back(TransportEvent::NewAddress {
listener_id,
listen_addr,
});
}
}
ToListenerMsg::IncomingRelayedConnection {
stream,
Expand Down
Loading
Loading