Skip to content

Commit

Permalink
Merge pull request #3127 from autonomys/fix-generic-request-with-addr…
Browse files Browse the repository at this point in the history
…esses

Make requests to peers with explicit addresses succeed
  • Loading branch information
nazar-pc authored Oct 15, 2024
2 parents f1428ed + 55e047d commit 7e7188b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 16 deletions.
6 changes: 3 additions & 3 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::test()]
#[tokio::test]
async fn test_address_timed_removal_from_known_peers_cache() {
// Cache initialization
let peer_id = PeerId::random();
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_address_timed_removal_from_known_peers_cache() {
assert_eq!(removed_addresses.len(), 2);
}

#[tokio::test()]
#[tokio::test]
async fn test_different_removal_timing_from_known_peers_cache() {
// Cache initialization
let peer_id = PeerId::random();
Expand Down Expand Up @@ -282,7 +282,7 @@ async fn test_address_p2p_prefix_addition() {
assert_eq!(append_p2p_suffix(peer_id, short_addr.clone()), long_addr);
}

#[tokio::test()]
#[tokio::test]
async fn test_known_peers_removal_address_after_specified_interval() {
let config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#[cfg(test)]
mod tests;

use crate::protocols::request_response::handlers::generic_request_handler::GenericRequest;
use crate::protocols::request_response::request_response_factory;
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
Expand Down
91 changes: 91 additions & 0 deletions crates/subspace-networking/src/node/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// TODO: Remove
#![allow(
clippy::needless_return,
reason = "https://github.com/rust-lang/rust-clippy/issues/13458"
)]

use crate::protocols::request_response::handlers::generic_request_handler::{
GenericRequest, GenericRequestHandler,
};
use crate::{construct, Config};
use futures::channel::oneshot;
use libp2p::multiaddr::Protocol;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::sync::Arc;

#[derive(Encode, Decode)]
struct ExampleRequest;

impl GenericRequest for ExampleRequest {
const PROTOCOL_NAME: &'static str = "/example";
const LOG_TARGET: &'static str = "example_request";
type Response = ExampleResponse;
}

#[derive(Encode, Decode, Debug)]
struct ExampleResponse;

#[tokio::test]
async fn request_with_addresses() {
tracing_subscriber::fmt::init();

let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _example_request| async { Some(ExampleResponse) },
)],
..Config::default()
};
let (node_1, mut node_runner_1) = construct(config_1).unwrap();

let (node_1_address_sender, node_1_address_receiver) = oneshot::channel();
let on_new_listener_handler = node_1.on_new_listener(Arc::new({
let node_1_address_sender = Mutex::new(Some(node_1_address_sender));

move |address| {
if matches!(address.iter().next(), Some(Protocol::Ip4(_))) {
if let Some(node_1_address_sender) = node_1_address_sender.lock().take() {
node_1_address_sender.send(address.clone()).unwrap();
}
}
}
}));

tokio::spawn(async move {
node_runner_1.run().await;
});

// Wait for first node to know its address
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let config_2 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
..Config::default()
};

let (node_2, mut node_runner_2) = construct(config_2).unwrap();

tokio::spawn(async move {
node_runner_2.run().await;
});

// Make request to previously unknown peer
node_2
.send_generic_request(node_1.id(), vec![node_1_addr], ExampleRequest)
.await
.unwrap();

// Subsequent requests should succeed without explicit addresses through already established
// connection
node_2
.send_generic_request(node_1.id(), Vec::new(), ExampleRequest)
.await
.unwrap();
}
62 changes: 53 additions & 9 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::constructor;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::LocalOnlyRecordStore;
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, IfDisconnected,
Event as RequestResponseEvent, IfDisconnected, OutboundFailure, RequestFailure,
};
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
Expand Down Expand Up @@ -81,6 +81,13 @@ enum BootstrapCommandState {
Finished,
}

#[derive(Debug)]
struct PendingGenericRequest {
protocol_name: &'static str,
request: Vec<u8>,
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
}

/// Runner for the Node.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner<LocalRecordProvider>
Expand Down Expand Up @@ -126,6 +133,7 @@ where
bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
/// Receives an event on peer address removal from the persistent storage.
removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
requests_pending_connections: HashMap<PeerId, Vec<PendingGenericRequest>>,
/// Optional storage for the [`HandlerId`] of the address removal task.
/// We keep to stop the task along with the rest of the networking.
_address_removal_task_handler_id: Option<HandlerId>,
Expand Down Expand Up @@ -220,6 +228,7 @@ where
bootstrap_addresses,
bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
removed_addresses_rx,
requests_pending_connections: HashMap::new(),
_address_removal_task_handler_id: address_removal_task_handler_id,
}
}
Expand Down Expand Up @@ -484,6 +493,18 @@ where
num_established,
..
} => {
if let Some(generic_requests) = self.requests_pending_connections.remove(&peer_id) {
let request_response = &mut self.swarm.behaviour_mut().request_response;
for request in generic_requests {
request_response.send_request(
&peer_id,
request.protocol_name,
request.request,
request.result_sender,
IfDisconnected::ImmediateError,
);
}
}
// Save known addresses that were successfully dialed.
if let ConnectedPoint::Dialer { address, .. } = &endpoint {
// filter non-global addresses when non-globals addresses are disabled
Expand Down Expand Up @@ -590,6 +611,19 @@ where
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
if let Some(generic_requests) =
self.requests_pending_connections.remove(peer_id)
{
for request in generic_requests {
// Do not care if receiver is gone
let _: Result<(), _> = request
.result_sender
.send(Err(RequestFailure::Network(OutboundFailure::DialFailure)));
}
}
}

if let Some(peer_id) = &peer_id {
let should_ban_temporarily =
self.should_temporary_ban_on_dial_error(peer_id, &error);
Expand Down Expand Up @@ -1435,12 +1469,21 @@ where
request,
result_sender,
} => {
// TODO: Ideally it'd be much simpler with https://github.com/libp2p/rust-libp2p/issues/5634
if !addresses.is_empty()
&& !self
.swarm
.connected_peers()
.any(|candidate| candidate == &peer_id)
{
self.requests_pending_connections
.entry(peer_id)
.or_default()
.push(PendingGenericRequest {
protocol_name,
request,
result_sender,
});
if let Err(error) = self.swarm.dial(
DialOpts::peer_id(peer_id)
.addresses(addresses)
Expand All @@ -1449,14 +1492,15 @@ where
) {
warn!(%error, "Failed to dial disconnected peer on generic request");
}
} else {
self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
protocol_name,
request,
result_sender,
IfDisconnected::TryConnect,
);
}
self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
protocol_name,
request,
result_sender,
IfDisconnected::TryConnect,
);
}
Command::GetProviders {
key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::time::sleep;

const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

#[tokio::test()]
#[tokio::test]
async fn test_connection_breaks_after_timeout_without_reservation() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() {
assert!(!peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_connection_reservation() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -111,7 +111,7 @@ async fn test_connection_reservation() {
assert!(peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_connection_reservation_symmetry() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -157,7 +157,7 @@ async fn test_connection_reservation_symmetry() {
assert!(!peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_reserved_peers_dial_event() {
let connection_timeout = Duration::from_millis(1300);
let long_delay = Duration::from_millis(2000);
Expand Down

0 comments on commit 7e7188b

Please sign in to comment.