Skip to content

refactor: Make Endpoint::node_addr sync and infallible, and add Endpoint::node_addr_initialized #3192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions iroh/examples/connect-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ async fn main() -> anyhow::Result<()> {
.bind()
.await?;

let node_addr = endpoint.node_addr().await?;
let me = node_addr.node_id;
let me = endpoint.node_id();
println!("node id: {me}");
endpoint.direct_addresses().initialized().await?;
endpoint.home_relay().initialized().await?;
let node_addr = endpoint.node_addr();
println!("node listening addresses:");
node_addr
.direct_addresses
Expand Down
3 changes: 2 additions & 1 deletion iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const ALPN: &[u8] = b"iroh-example/echo/0";
#[tokio::main]
async fn main() -> Result<()> {
let router = accept_side().await?;
let node_addr = router.endpoint().node_addr().await?;
router.endpoint().node_addr_initialized().await;
let node_addr = router.endpoint().node_addr();

connect_side(node_addr).await?;

Expand Down
4 changes: 3 additions & 1 deletion iroh/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ async fn main() -> anyhow::Result<()> {
println!("node id: {me}");
println!("node listening addresses:");

let node_addr = endpoint.node_addr().await?;
endpoint.direct_addresses().initialized().await?;
endpoint.home_relay().initialized().await?;
let node_addr = endpoint.node_addr();
let local_addrs = node_addr
.direct_addresses
.into_iter()
Expand Down
6 changes: 4 additions & 2 deletions iroh/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ async fn main() -> anyhow::Result<()> {

let me = endpoint.node_id();
println!("node id: {me}");
println!("node listening addresses:");

let node_addr = endpoint.node_addr().await?;
endpoint.direct_addresses().initialized().await?;
endpoint.home_relay().initialized().await?;
println!("node listening addresses:");
let node_addr = endpoint.node_addr();
let local_addrs = node_addr
.direct_addresses
.into_iter()
Expand Down
10 changes: 5 additions & 5 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for our address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr_initialized().await;
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
Expand All @@ -672,7 +672,7 @@ mod tests {
};
let ep1_addr = NodeAddr::new(ep1.node_id());
// wait for out address to be updated and thus published at least once
ep1.node_addr().await.context("waiting for NodeAddr")?;
ep1.node_addr_initialized().await;
let _conn = ep2
.connect(ep1_addr, TEST_ALPN)
.await
Expand Down Expand Up @@ -704,7 +704,7 @@ mod tests {
new_endpoint(secret, disco).await
};
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr_initialized().await;
let _conn = ep2.connect(ep1.node_id(), TEST_ALPN).await?;
Ok(())
}
Expand All @@ -726,7 +726,7 @@ mod tests {
new_endpoint(secret, disco).await
};
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr();

// 10x faster test via a 3s idle timeout instead of the 30s default
let mut config = TransportConfig::default();
Expand Down Expand Up @@ -759,7 +759,7 @@ mod tests {
new_endpoint(secret, disco).await
};
// wait for out address to be updated and thus published at least once
ep1.node_addr().await?;
ep1.node_addr();
let ep1_wrong_addr = NodeAddr {
node_id: ep1.node_id(),
relay_url: None,
Expand Down
104 changes: 83 additions & 21 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,66 @@ impl Endpoint {
/// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses
/// as they would be returned by [`Endpoint::home_relay`] and
/// [`Endpoint::direct_addresses`].
pub async fn node_addr(&self) -> Result<NodeAddr> {
let addrs = self.direct_addresses().initialized().await?;
let relay = self.home_relay().get()?;
Ok(NodeAddr::from_parts(
self.node_id(),
relay,
addrs.into_iter().map(|x| x.addr),
))
///
/// Note that right after binding, the returned node address will likely be empty, because
/// both discovering our direct addresses and selecting our home relay takes some time.
///
/// If you want to wait until a non-empty node address is available, you can await
/// [`Self::node_addr_initialized`] beforehand.
///
/// ```no_run
/// # use iroh::{Endpoint, NodeAddr};
/// # async fn wrapper() -> testresult::TestResult {
/// let ep = Endpoint::builder().bind().await?;
/// ep.node_addr_initialized().await;
/// let node_addr = ep.node_addr();
/// assert!(!node_addr.is_empty());
/// # Ok(())
/// # }
/// ```
///
/// If you need an address with direct addresses, you can wait for those to be initialized instead:
///
/// ```no_run
/// # use iroh::{Endpoint, NodeAddr};
/// # async fn wrapper() -> testresult::TestResult {
/// let ep = Endpoint::builder().bind().await?;
/// ep.direct_addresses().initialized().await?;
/// let node_addr = ep.node_addr();
/// assert!(!node_addr.direct_addresses.is_empty());
/// # Ok(())
/// # }
/// ```
///
/// Similarly, if you only care about the home relay, use `Endpoint::home_relay` and wait
/// for it to be initialized.
pub fn node_addr(&self) -> NodeAddr {
let addrs = self
.direct_addresses()
.get()
.expect("watchable gone, which is impossible")
.unwrap_or_default();
let relay = self
.home_relay()
.get()
.expect("watchable gone, which is impossible");
NodeAddr::from_parts(self.node_id(), relay, addrs.into_iter().map(|x| x.addr))
}

/// Wait for either the node's direct addresses or the node's home relay to be initialized.
///
/// After awaiting this, [`Self::node_addr`] is guaranteed to return a non-empty node address.
///
/// Under the hood, this races [`Watcher::initialized`] for both [`Self::direct_addresses`] and
/// [`Self::home_relay`].
pub async fn node_addr_initialized(&self) {
// Wait for either the home relay or the direct addresses to be ready.
n0_future::future::race(
async { self.direct_addresses().initialized().await.map(|_| ()) },
async { self.home_relay().initialized().await.map(|_| ()) },
)
.await
.expect("watchable gone, which is impossible");
}

/// Returns a [`Watcher`] for the [`RelayUrl`] of the Relay server used as home relay.
Expand Down Expand Up @@ -1907,7 +1959,7 @@ mod tests {
.bind()
.await
.unwrap();
let my_addr = ep.node_addr().await.unwrap();
let my_addr = ep.node_addr();
let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
assert!(res.is_err());
let err = res.err().unwrap();
Expand Down Expand Up @@ -2184,8 +2236,10 @@ mod tests {
.bind()
.await
.unwrap();
let ep1_nodeaddr = ep1.node_addr().await.unwrap();
let ep2_nodeaddr = ep2.node_addr().await.unwrap();
ep1.node_addr_initialized().await;
ep2.node_addr_initialized().await;
let ep1_nodeaddr = ep1.node_addr();
let ep2_nodeaddr = ep2.node_addr();
ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap();
ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap();
let ep1_nodeid = ep1.node_id();
Expand Down Expand Up @@ -2308,7 +2362,9 @@ mod tests {
let ep1_nodeid = ep1.node_id();
let ep2_nodeid = ep2.node_id();

let ep1_nodeaddr = ep1.node_addr().await.unwrap();
// wait for the direct addresses to be initialized.
ep1.direct_addresses().initialized().await.unwrap();
let ep1_nodeaddr = ep1.node_addr();
tracing::info!(
"node id 1 {ep1_nodeid}, relay URL {:?}",
ep1_nodeaddr.relay_url()
Expand Down Expand Up @@ -2460,9 +2516,10 @@ mod tests {
)
.await?;

connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?;
server.node_addr_initialized().await;
connect_client_0rtt_expect_err(&client, server.node_addr()).await?;
// The second 0rtt attempt should work
connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?;
connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?;

client.close().await;
server.close().await;
Expand All @@ -2486,7 +2543,8 @@ mod tests {
)
.await?;

connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?;
server.node_addr_initialized().await;
connect_client_0rtt_expect_err(&client, server.node_addr()).await?;

// connecting with another endpoint should not interfere with our
// TLS session ticket cache for the first endpoint:
Expand All @@ -2495,10 +2553,11 @@ mod tests {
info_span!("another"),
)
.await?;
connect_client_0rtt_expect_err(&client, another.node_addr().await?).await?;
another.node_addr_initialized().await;
connect_client_0rtt_expect_err(&client, another.node_addr()).await?;
another.close().await;

connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?;
connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?;

client.close().await;
server.close().await;
Expand All @@ -2517,8 +2576,9 @@ mod tests {
let server_key = SecretKey::generate(rand::thread_rng());
let server = spawn_0rtt_server(server_key.clone(), info_span!("server-initial")).await?;

connect_client_0rtt_expect_err(&client, server.node_addr().await?).await?;
connect_client_0rtt_expect_ok(&client, server.node_addr().await?, true).await?;
server.node_addr_initialized().await;
connect_client_0rtt_expect_err(&client, server.node_addr()).await?;
connect_client_0rtt_expect_ok(&client, server.node_addr(), true).await?;

server.close().await;

Expand All @@ -2527,7 +2587,8 @@ mod tests {
// we expect the client to *believe* it can 0-RTT connect to the server (hence expect_ok),
// but the server will reject the early data because it discarded necessary state
// to decrypt it when restarting.
connect_client_0rtt_expect_ok(&client, server.node_addr().await?, false).await?;
server.node_addr_initialized().await;
connect_client_0rtt_expect_ok(&client, server.node_addr(), false).await?;

client.close().await;

Expand All @@ -2542,7 +2603,8 @@ mod tests {
.alpns(vec![TEST_ALPN.to_vec()])
.bind()
.await?;
let server_addr = server.node_addr().await?;
server.node_addr_initialized().await;
let server_addr = server.node_addr();
let server_task = tokio::spawn(async move {
let incoming = server.accept().await.unwrap();
let conn = incoming.await?;
Expand Down
5 changes: 1 addition & 4 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3503,10 +3503,7 @@ mod tests {
}));

println!("first conn!");
let conn = m1
.endpoint
.connect(m2.endpoint.node_addr().await?, ALPN)
.await?;
let conn = m1.endpoint.connect(m2.endpoint.node_addr(), ALPN).await?;
println!("Closing first conn");
conn.close(0u32.into(), b"bye lolz");
conn.closed().await;
Expand Down
Loading