Skip to content

Commit

Permalink
feat(iroh): remove Arc requirements from ProtocolHandler (#3010)
Browse files Browse the repository at this point in the history
## Breaking Changes

- `iroh::protocol::ProtocolHandler::accept` now takes `&self` instead of
`Arc<Self>`
- `iroh::protocol::ProtocolHandler::shutdown` now takes `&self` instead
of `Arc<Self>`
- `iroh::protocol::RouterBuilder::accept` now takes `T: ProtocolHandler`
instead of `Arc<dyn ProtocolHandler>`
- `iroh::protocol::ProtocolMap` is now private
  • Loading branch information
dignifiedquire authored Dec 5, 2024
1 parent 79bf3c3 commit 8dfbc35
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 27 deletions.
9 changes: 2 additions & 7 deletions iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
//!
//! cargo run --example echo --features=examples
use std::sync::Arc;

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
Expand Down Expand Up @@ -60,10 +58,7 @@ async fn accept_side() -> Result<Router> {
let endpoint = Endpoint::builder().discovery_n0().bind().await?;

// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
let router = Router::builder(endpoint)
.accept(ALPN, Arc::new(Echo))
.spawn()
.await?;
let router = Router::builder(endpoint).accept(ALPN, Echo).spawn().await?;

Ok(router)
}
Expand All @@ -76,7 +71,7 @@ impl ProtocolHandler for Echo {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
Expand Down
11 changes: 6 additions & 5 deletions iroh/examples/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ impl ProtocolHandler for BlobSearch {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
let this = self.clone();
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
Expand All @@ -145,7 +146,7 @@ impl ProtocolHandler for BlobSearch {

// Now, we can perform the actual query on our local database.
let query = String::from_utf8(query_bytes)?;
let num_matches = self.query_local(&query).await;
let num_matches = this.query_local(&query).await;

// We want to return a list of hashes. We do the simplest thing possible, and just send
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
Expand All @@ -167,11 +168,11 @@ impl ProtocolHandler for BlobSearch {

impl BlobSearch {
/// Create a new protocol handler.
pub fn new(endpoint: Endpoint) -> Arc<Self> {
Arc::new(Self {
pub fn new(endpoint: Endpoint) -> Self {
Self {
endpoint,
blobs: Default::default(),
})
}
}

/// Query a remote node, download all matching blobs and print the results.
Expand Down
49 changes: 34 additions & 15 deletions iroh/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
//! ## Example
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use anyhow::Result;
//! # use futures_lite::future::Boxed as BoxedFuture;
//! # use iroh::{endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeAddr};
//! #
//! # async fn test_compile() -> Result<()> {
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
//!
//! const ALPN: &[u8] = b"/my/alpn";
//! let router = Router::builder(endpoint)
//! .accept(&ALPN, Arc::new(Echo))
//! .accept(b"/my/alpn", Echo)
//! .spawn()
//! .await?;
//! # Ok(())
Expand All @@ -24,7 +22,7 @@
//! struct Echo;
//!
//! impl ProtocolHandler for Echo {
//! fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
//! fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
//! Box::pin(async move {
//! let connection = connecting.await?;
//! let (mut send, mut recv) = connection.accept_bi().await?;
Expand Down Expand Up @@ -111,39 +109,59 @@ pub trait ProtocolHandler: Send + Sync + std::fmt::Debug + 'static {
/// Handle an incoming connection.
///
/// This runs on a freshly spawned tokio task so this can be long-running.
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>>;

/// Called when the node shuts down.
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
fn shutdown(&self) -> BoxedFuture<()> {
Box::pin(async move {})
}
}

impl<T: ProtocolHandler> ProtocolHandler for Arc<T> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
self.as_ref().accept(conn)
}

fn shutdown(&self) -> BoxedFuture<()> {
self.as_ref().shutdown()
}
}

impl<T: ProtocolHandler> ProtocolHandler for Box<T> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
self.as_ref().accept(conn)
}

fn shutdown(&self) -> BoxedFuture<()> {
self.as_ref().shutdown()
}
}

/// A typed map of protocol handlers, mapping them from ALPNs.
#[derive(Debug, Clone, Default)]
pub struct ProtocolMap(BTreeMap<Vec<u8>, Arc<dyn ProtocolHandler>>);
#[derive(Debug, Default)]
pub(crate) struct ProtocolMap(BTreeMap<Vec<u8>, Box<dyn ProtocolHandler>>);

impl ProtocolMap {
/// Returns the registered protocol handler for an ALPN as a [`Arc<dyn ProtocolHandler>`].
pub fn get(&self, alpn: &[u8]) -> Option<Arc<dyn ProtocolHandler>> {
self.0.get(alpn).cloned()
pub(crate) fn get(&self, alpn: &[u8]) -> Option<&dyn ProtocolHandler> {
self.0.get(alpn).map(|p| &**p)
}

/// Inserts a protocol handler.
pub fn insert(&mut self, alpn: Vec<u8>, handler: Arc<dyn ProtocolHandler>) {
pub(crate) fn insert(&mut self, alpn: Vec<u8>, handler: Box<dyn ProtocolHandler>) {
self.0.insert(alpn, handler);
}

/// Returns an iterator of all registered ALPN protocol identifiers.
pub fn alpns(&self) -> impl Iterator<Item = &Vec<u8>> {
pub(crate) fn alpns(&self) -> impl Iterator<Item = &Vec<u8>> {
self.0.keys()
}

/// Shuts down all protocol handlers.
///
/// Calls and awaits [`ProtocolHandler::shutdown`] for all registered handlers concurrently.
pub async fn shutdown(&self) {
let handlers = self.0.values().cloned().map(ProtocolHandler::shutdown);
pub(crate) async fn shutdown(&self) {
let handlers = self.0.values().map(|p| p.shutdown());
join_all(handlers).await;
}
}
Expand Down Expand Up @@ -201,7 +219,8 @@ impl RouterBuilder {

/// Configures the router to accept the [`ProtocolHandler`] when receiving a connection
/// with this `alpn`.
pub fn accept(mut self, alpn: impl AsRef<[u8]>, handler: Arc<dyn ProtocolHandler>) -> Self {
pub fn accept<T: ProtocolHandler>(mut self, alpn: impl AsRef<[u8]>, handler: T) -> Self {
let handler = Box::new(handler);
self.protocols.insert(alpn.as_ref().to_vec(), handler);
self
}
Expand Down

0 comments on commit 8dfbc35

Please sign in to comment.