From e044fcd7a4182d3bde7ba258108967fc0c0e1864 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 11:35:55 +0100 Subject: [PATCH 1/6] fix: Endpoint::node_addr completes with direct addrs or relay ready --- iroh/src/endpoint.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 3654ad3fcb..9024d15474 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -836,8 +836,17 @@ impl Endpoint { /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. + /// + /// This function is async because it waits for either the node's direct addresses + /// or the node's home relay are initialized. pub async fn node_addr(&self) -> Result { - let addrs = self.direct_addresses().initialized().await?; + // Wait for either the home relay or the direct addresses to be ready. + n0_future::future::race( + async { self.direct_addresses().initialized().await.map(|_| ()) }, + async { self.home_relay().initialized().await.map(|_| ()) }, + ) + .await?; + let addrs = self.direct_addresses().get()?.unwrap_or_default(); let relay = self.home_relay().get()?; Ok(NodeAddr::from_parts( self.node_id(), From 5fa3a1ff119de62d87aa531ca68f634a5d86895d Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 12:46:00 +0100 Subject: [PATCH 2/6] fix: make conn_type_stream test pass again --- iroh/src/endpoint.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 9024d15474..90cb3b0ee3 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -2317,6 +2317,8 @@ mod tests { let ep1_nodeid = ep1.node_id(); let ep2_nodeid = ep2.node_id(); + // wait for the direct addresses to be initialized. + ep1.direct_addresses().initialized().await.unwrap(); let ep1_nodeaddr = ep1.node_addr().await.unwrap(); tracing::info!( "node id 1 {ep1_nodeid}, relay URL {:?}", From e13f1861b8d2190ce6009db07849c9451a322829 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 14:26:35 +0100 Subject: [PATCH 3/6] refactor: make Endpoint::node_addr sync, add Endpoint::node_addr_initialized --- iroh/examples/connect-unreliable.rs | 2 +- iroh/examples/echo.rs | 2 +- iroh/examples/listen-unreliable.rs | 2 +- iroh/examples/listen.rs | 2 +- iroh/src/discovery.rs | 10 ++-- iroh/src/endpoint.rs | 93 +++++++++++++++++++++-------- iroh/src/magicsock.rs | 2 +- 7 files changed, 79 insertions(+), 34 deletions(-) diff --git a/iroh/examples/connect-unreliable.rs b/iroh/examples/connect-unreliable.rs index ea375ba0dd..8e1502dcfa 100644 --- a/iroh/examples/connect-unreliable.rs +++ b/iroh/examples/connect-unreliable.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { .bind() .await?; - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr(); let me = node_addr.node_id; println!("node id: {me}"); println!("node listening addresses:"); diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index 5da09096f0..4b3ffe9c95 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -23,7 +23,7 @@ const ALPN: &[u8] = b"iroh-example/echo/0"; #[tokio::main] async fn main() -> Result<()> { let router = accept_side().await?; - let node_addr = router.endpoint().node_addr().await?; + let node_addr = router.endpoint().node_addr(); connect_side(node_addr).await?; diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index 745b20e5b0..76431f3910 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr(); let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index 94950de210..d33e3fe79e 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr(); let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 9bfe858185..638797bc0d 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -646,7 +646,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for our address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr_initialized().await; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -672,7 +672,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr().await.context("waiting for NodeAddr")?; + ep1.node_addr_initialized().await; let _conn = ep2 .connect(ep1_addr, TEST_ALPN) .await @@ -704,7 +704,7 @@ mod tests { new_endpoint(secret, disco).await }; // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr_initialized().await; let _conn = ep2.connect(ep1.node_id(), TEST_ALPN).await?; Ok(()) } @@ -726,7 +726,7 @@ mod tests { new_endpoint(secret, disco).await }; // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr(); // 10x faster test via a 3s idle timeout instead of the 30s default let mut config = TransportConfig::default(); @@ -759,7 +759,7 @@ mod tests { new_endpoint(secret, disco).await }; // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr(); let ep1_wrong_addr = NodeAddr { node_id: ep1.node_id(), relay_url: None, diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 90cb3b0ee3..efd942d4de 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -837,22 +837,59 @@ impl Endpoint { /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. /// - /// This function is async because it waits for either the node's direct addresses - /// or the node's home relay are initialized. - pub async fn node_addr(&self) -> Result { + /// Note that right after binding, the returned node address will likely be empty, because + /// both discovering our direct addresses and selecting our home relay takes some time. + /// + /// If you want to wait until a non-empty node address is available, you can await + /// [`Self::node_addr_initialized`] beforehand. + /// ```no_run + /// # use iroh::{Endpoint, NodeAddr}; + /// # async fn wrapper() -> testresult::TestResult { + /// let ep = Endpoint::builder().bind().await?; + /// ep.node_addr_initialized().await; + /// let node_addr = ep.node_addr(); + /// assert!(!node_addr.is_empty()); + /// # Ok(()) + /// + /// If you need an address with direct addresses, you can wait for those to be initialized instead: + /// ```no_run + /// # use iroh::{Endpoint, NodeAddr}; + /// # async fn wrapper() -> testresult::TestResult { + /// let ep = Endpoint::builder().bind().await?; + /// ep.direct_addresses().initialized().await?; + /// let node_addr = ep.node_addr(); + /// assert!(!node_addr.direct_addresses.is_empty()); + /// # Ok(()) + /// # } + /// Similarily, if you only care about the home relay, use `Endpoint::home_relay` and wait + /// for it to be initialized. + pub fn node_addr(&self) -> NodeAddr { + let addrs = self + .direct_addresses() + .get() + .expect("watchable gone, which is impossible") + .unwrap_or_default(); + let relay = self + .home_relay() + .get() + .expect("watchable gone, which is impossible"); + NodeAddr::from_parts(self.node_id(), relay, addrs.into_iter().map(|x| x.addr)) + } + + /// Wait for either the node's direct addresses or the node's home relay to be initialized. + /// + /// After awaiting this, [`Self::node_addr`] is guaranteed to return a non-empty node address. + /// + /// Under the hood, this races [`Watcher::initialized`] for both [`Self::direct_addresses`] and + /// [`Self::home_relay`]. + pub async fn node_addr_initialized(&self) { // Wait for either the home relay or the direct addresses to be ready. n0_future::future::race( async { self.direct_addresses().initialized().await.map(|_| ()) }, async { self.home_relay().initialized().await.map(|_| ()) }, ) - .await?; - let addrs = self.direct_addresses().get()?.unwrap_or_default(); - let relay = self.home_relay().get()?; - Ok(NodeAddr::from_parts( - self.node_id(), - relay, - addrs.into_iter().map(|x| x.addr), - )) + .await + .expect("watchable gone, which is impossible"); } /// Returns a [`Watcher`] for the [`RelayUrl`] of the Relay server used as home relay. @@ -1916,7 +1953,7 @@ mod tests { .bind() .await .unwrap(); - let my_addr = ep.node_addr().await.unwrap(); + let my_addr = ep.node_addr(); let res = ep.connect(my_addr.clone(), TEST_ALPN).await; assert!(res.is_err()); let err = res.err().unwrap(); @@ -2193,8 +2230,10 @@ mod tests { .bind() .await .unwrap(); - let ep1_nodeaddr = ep1.node_addr().await.unwrap(); - let ep2_nodeaddr = ep2.node_addr().await.unwrap(); + ep1.node_addr_initialized().await; + ep2.node_addr_initialized().await; + let ep1_nodeaddr = ep1.node_addr(); + let ep2_nodeaddr = ep2.node_addr(); ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap(); ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap(); let ep1_nodeid = ep1.node_id(); @@ -2319,7 +2358,7 @@ mod tests { // wait for the direct addresses to be initialized. ep1.direct_addresses().initialized().await.unwrap(); - let ep1_nodeaddr = ep1.node_addr().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr(); tracing::info!( "node id 1 {ep1_nodeid}, relay URL {:?}", ep1_nodeaddr.relay_url() @@ -2471,9 +2510,10 @@ mod tests { ) .await?; - connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?; + server.node_addr_initialized().await; + connect_client_0rtt_expect_err(&client, server.node_addr()).await?; // The second 0rtt attempt should work - connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?; + connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?; client.close().await; server.close().await; @@ -2497,7 +2537,8 @@ mod tests { ) .await?; - connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?; + server.node_addr_initialized().await; + connect_client_0rtt_expect_err(&client, server.node_addr()).await?; // connecting with another endpoint should not interfere with our // TLS session ticket cache for the first endpoint: @@ -2506,10 +2547,11 @@ mod tests { info_span!("another"), ) .await?; - connect_client_0rtt_expect_err(&client, another.node_addr().await?).await?; + another.node_addr_initialized().await; + connect_client_0rtt_expect_err(&client, another.node_addr()).await?; another.close().await; - connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?; + connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?; client.close().await; server.close().await; @@ -2528,8 +2570,9 @@ mod tests { let server_key = SecretKey::generate(rand::thread_rng()); let server = spawn_0rtt_server(server_key.clone(), info_span!("server-initial")).await?; - connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?; - connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?; + server.node_addr_initialized().await; + connect_client_0rtt_expect_err(&client, server.node_addr()).await?; + connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?; server.close().await; @@ -2538,7 +2581,8 @@ mod tests { // we expect the client to *believe* it can 0-RTT connect to the server (hence expect_ok), // but the server will reject the early data because it discarded necessary state // to decrypt it when restarting. - connect_client_0rtt_expect_ok(&client, server.node_addr().await?, false).await?; + server.node_addr_initialized().await; + connect_client_0rtt_expect_ok(&client, server.node_addr(), false).await?; client.close().await; @@ -2553,7 +2597,8 @@ mod tests { .alpns(vec![TEST_ALPN.to_vec()]) .bind() .await?; - let server_addr = server.node_addr().await?; + server.node_addr_initialized().await; + let server_addr = server.node_addr(); let server_task = tokio::spawn(async move { let incoming = server.accept().await.unwrap(); let conn = incoming.await?; diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index ee82ee9814..29cf36dfe0 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -3505,7 +3505,7 @@ mod tests { println!("first conn!"); let conn = m1 .endpoint - .connect(m2.endpoint.node_addr().await?, ALPN) + .connect(m2.endpoint.node_addr(), ALPN) .await?; println!("Closing first conn"); conn.close(0u32.into(), b"bye lolz"); From a5da5720a5092d3f039f187d5811f422714643f5 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 14:38:31 +0100 Subject: [PATCH 4/6] chore: fmt, fixup --- iroh/src/endpoint.rs | 7 ++++++- iroh/src/magicsock.rs | 5 +---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index efd942d4de..f94a664a9c 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -842,6 +842,7 @@ impl Endpoint { /// /// If you want to wait until a non-empty node address is available, you can await /// [`Self::node_addr_initialized`] beforehand. + /// /// ```no_run /// # use iroh::{Endpoint, NodeAddr}; /// # async fn wrapper() -> testresult::TestResult { @@ -850,8 +851,10 @@ impl Endpoint { /// let node_addr = ep.node_addr(); /// assert!(!node_addr.is_empty()); /// # Ok(()) + /// ``` /// /// If you need an address with direct addresses, you can wait for those to be initialized instead: + /// /// ```no_run /// # use iroh::{Endpoint, NodeAddr}; /// # async fn wrapper() -> testresult::TestResult { @@ -861,7 +864,9 @@ impl Endpoint { /// assert!(!node_addr.direct_addresses.is_empty()); /// # Ok(()) /// # } - /// Similarily, if you only care about the home relay, use `Endpoint::home_relay` and wait + /// ``` + /// + /// Similarly, if you only care about the home relay, use `Endpoint::home_relay` and wait /// for it to be initialized. pub fn node_addr(&self) -> NodeAddr { let addrs = self diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 29cf36dfe0..852424ed59 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -3503,10 +3503,7 @@ mod tests { })); println!("first conn!"); - let conn = m1 - .endpoint - .connect(m2.endpoint.node_addr(), ALPN) - .await?; + let conn = m1.endpoint.connect(m2.endpoint.node_addr(), ALPN).await?; println!("Closing first conn"); conn.close(0u32.into(), b"bye lolz"); conn.closed().await; From fd0fcee8a8892410da2c657eeee16c5fb2ff4ebe Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 14:46:56 +0100 Subject: [PATCH 5/6] fix: examples --- iroh/examples/connect-unreliable.rs | 6 ++++-- iroh/examples/echo.rs | 1 + iroh/examples/listen-unreliable.rs | 2 ++ iroh/examples/listen.rs | 4 +++- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/iroh/examples/connect-unreliable.rs b/iroh/examples/connect-unreliable.rs index 8e1502dcfa..f4d1278351 100644 --- a/iroh/examples/connect-unreliable.rs +++ b/iroh/examples/connect-unreliable.rs @@ -50,9 +50,11 @@ async fn main() -> anyhow::Result<()> { .bind() .await?; - let node_addr = endpoint.node_addr(); - let me = node_addr.node_id; + let me = endpoint.node_id(); println!("node id: {me}"); + endpoint.direct_addresses().initialized().await?; + endpoint.home_relay().initialized().await?; + let node_addr = endpoint.node_addr(); println!("node listening addresses:"); node_addr .direct_addresses diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index 4b3ffe9c95..160950a130 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -23,6 +23,7 @@ const ALPN: &[u8] = b"iroh-example/echo/0"; #[tokio::main] async fn main() -> Result<()> { let router = accept_side().await?; + router.endpoint().node_addr_initialized().await; let node_addr = router.endpoint().node_addr(); connect_side(node_addr).await?; diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index 76431f3910..64c3296ab9 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -35,6 +35,8 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); + endpoint.direct_addresses().initialized().await?; + endpoint.home_relay().initialized().await?; let node_addr = endpoint.node_addr(); let local_addrs = node_addr .direct_addresses diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index d33e3fe79e..98375d77f5 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -35,8 +35,10 @@ async fn main() -> anyhow::Result<()> { let me = endpoint.node_id(); println!("node id: {me}"); - println!("node listening addresses:"); + endpoint.direct_addresses().initialized().await?; + endpoint.home_relay().initialized().await?; + println!("node listening addresses:"); let node_addr = endpoint.node_addr(); let local_addrs = node_addr .direct_addresses From 9ae65d47d7c8c67fb5f5931233f734f91edba69b Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 19 Feb 2025 14:52:21 +0100 Subject: [PATCH 6/6] fixup --- iroh/src/endpoint.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index f94a664a9c..209ca2034f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -851,6 +851,7 @@ impl Endpoint { /// let node_addr = ep.node_addr(); /// assert!(!node_addr.is_empty()); /// # Ok(()) + /// # } /// ``` /// /// If you need an address with direct addresses, you can wait for those to be initialized instead: