From 40fc428f5b58cd41418e69fd6b369adf78d36803 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 22 Nov 2024 06:13:40 -0600 Subject: [PATCH] refactor: provide topic in pubsub event (#337) --- CHANGELOG.md | 3 + Cargo.lock | 2 +- Cargo.toml | 2 +- examples/pubsub.rs | 153 ++++++++++++++++++++++++++++++++++++++------- src/lib.rs | 68 ++++++++++++-------- tests/pubsub.rs | 72 ++++++++++++++++++++- 6 files changed, 249 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bded6120c..c73260748 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# 0.13.0 +- refactor: provide topic in pubsub event. If a topic filter is supplied, the topic will be excluded from the event. [PR 337](https://github.com/dariusc93/rust-ipfs/pull/337) + # 0.12.2 - feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320) diff --git a/Cargo.lock b/Cargo.lock index 520514104..aa78f7afb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4379,7 +4379,7 @@ dependencies = [ [[package]] name = "rust-ipfs" -version = "0.12.2" +version = "0.13.0" dependencies = [ "anyhow", "async-stream", diff --git a/Cargo.toml b/Cargo.toml index e35fe67ab..f4f101db1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ name = "rust-ipfs" readme = "README.md" repository = "https://github.com/dariusc93/rust-ipfs" description = "IPFS node implementation" -version = "0.12.2" +version = "0.13.0" [features] default = [] diff --git a/examples/pubsub.rs b/examples/pubsub.rs index f6b8428b7..e8d236059 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,11 +1,13 @@ use clap::Parser; -use futures::{pin_mut, FutureExt}; +use futures::FutureExt; use ipld_core::ipld; use libp2p::futures::StreamExt; use libp2p::Multiaddr; use rust_ipfs::p2p::MultiaddrExt; -use rust_ipfs::{Ipfs, Keypair, PubsubEvent, UninitializedIpfs}; +use rust_ipfs::{ConnectionEvents, Ipfs, Keypair, PubsubEvent, UninitializedIpfs}; +use parking_lot::Mutex; +use pollable_map::stream::StreamMap; use rustyline_async::Readline; use std::time::Duration; use std::{io::Write, sync::Arc}; @@ -41,6 +43,8 @@ async fn main() -> anyhow::Result<()> { let topic = opt.topic.unwrap_or_else(|| String::from("ipfs-chat")); + let main_topic = Arc::new(Mutex::new(topic.clone())); + let keypair = Keypair::generate_ed25519(); let peer_id = keypair.public().to_peer_id(); @@ -95,6 +99,16 @@ async fn main() -> anyhow::Result<()> { let mut st = ipfs.connection_events().await?; + let mut main_events = StreamMap::new(); + + let mut listener_st = StreamMap::new(); + + let mut main_event_st = ipfs.pubsub_events(None).await?; + + let stream = ipfs.pubsub_subscribe(topic.clone()).await?; + + listener_st.insert(topic.clone(), stream); + for addr in opt.connect { let Some(peer_id) = addr.peer_id() else { writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?; @@ -109,41 +123,138 @@ async fn main() -> anyhow::Result<()> { writeln!(stdout, "Connected to {}", peer_id)?; } - let mut event_stream = ipfs.pubsub_events(&topic).await?; - - let stream = ipfs.pubsub_subscribe(&topic).await?; - - pin_mut!(stream); - - tokio::spawn(topic_discovery(ipfs.clone(), topic.clone())); + let owned_topic = topic.to_string(); + tokio::spawn(topic_discovery(ipfs.clone(), owned_topic)); tokio::task::yield_now().await; loop { tokio::select! { - data = stream.next() => { - if let Some(msg) = data { - writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?; + Some((topic, msg)) = listener_st.next() => { + writeln!(stdout, "> {topic}: {}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?; + } + Some(conn_ev) = st.next() => { + match conn_ev { + ConnectionEvents::IncomingConnection{ peer_id, .. } => { + writeln!(stdout, "> {peer_id} connected")?; + } + ConnectionEvents::OutgoingConnection{ peer_id, .. } => { + writeln!(stdout, "> {peer_id} connected")?; + } + ConnectionEvents::ClosedConnection{ peer_id, .. } => { + writeln!(stdout, "> {peer_id} disconnected")?; + } } } - conn_ev = st.next() => { - if let Some(ev) = conn_ev { - writeln!(stdout, "connection event: {ev:?}")?; + Some(event) = main_event_st.next() => { + match event { + PubsubEvent::Subscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?, + PubsubEvent::Unsubscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?, + _ => unreachable!(), } } - Some(event) = event_stream.next() => { + Some((topic, event)) = main_events.next() => { match event { - PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?, - PubsubEvent::Unsubscribe { peer_id } => writeln!(stdout, "{} unsubscribed", peer_id)?, + PubsubEvent::Subscribe { peer_id, topic: None } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?, + PubsubEvent::Unsubscribe { peer_id, topic: None } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?, + _ => unreachable!() } } line = rl.readline().fuse() => match line { Ok(rustyline_async::ReadlineEvent::Line(line)) => { - if let Err(e) = ipfs.pubsub_publish(topic.clone(), line.as_bytes().to_vec()).await { - writeln!(stdout, "Error publishing message: {e}")?; + let line = line.trim(); + if !line.starts_with('/') { + if !line.is_empty() { + let topic_to_publish = &*main_topic.lock(); + if let Err(e) = ipfs.pubsub_publish(topic_to_publish.clone(), line.as_bytes().to_vec()).await { + writeln!(stdout, "> error publishing message: {e}")?; + continue; + } + writeln!(stdout, "{peer_id}: {line}")?; + } continue; } - writeln!(stdout, "{peer_id}: {line}")?; + + let mut command = line.split(' '); + + match command.next() { + Some("/subscribe") => { + let topic = match command.next() { + Some(topic) => topic.to_string(), + None => { + writeln!(stdout, "> topic must be provided")?; + continue; + } + }; + let event_st = ipfs.pubsub_events(topic.clone()).await?; + let Ok(st) = ipfs.pubsub_subscribe(topic.clone()).await else { + writeln!(stdout, "> already subscribed to topic")?; + continue; + }; + + listener_st.insert(topic.clone(), st); + main_events.insert(topic.clone(), event_st); + writeln!(stdout, "> subscribed to {}", topic)?; + *main_topic.lock() = topic; + continue; + } + Some("/unsubscribe") => { + let topic = match command.next() { + Some(topic) => topic.to_string(), + None => main_topic.lock().clone() + }; + + listener_st.remove(&topic); + main_events.remove(&topic); + + if !ipfs.pubsub_unsubscribe(&topic).await.unwrap_or_default() { + writeln!(stdout, "> unable to unsubscribe from {}", topic)?; + continue; + } + + writeln!(stdout, "> unsubscribe from {}", topic)?; + if let Some(some_topic) = main_events.keys().next() { + *main_topic.lock() = some_topic.clone(); + writeln!(stdout, "> setting current topic to {}", some_topic)?; + } + continue; + } + Some("/list-topics") => { + let topics = ipfs.pubsub_subscribed().await.unwrap_or_default(); + if topics.is_empty() { + writeln!(stdout, "> not subscribed to any topics")?; + continue; + } + + let current_topic = main_topic.lock().clone(); + + writeln!(stdout, "> list of topics")?; + for topic in topics { + writeln!(stdout, "\t{topic} {}", if current_topic == topic { "- current" } else { "" } )?; + } + } + Some("/set-current-topic") => { + let topic = match command.next() { + Some(topic) if !topic.is_empty() => topic.to_string(), + None | _ => { + writeln!(stdout, "> topic must be provided")?; + continue; + } + }; + + let topics = ipfs.pubsub_subscribed().await.unwrap_or_default(); + if topics.is_empty() || !topics.contains(&topic) { + writeln!(stdout, "> not subscribed to topic \"{topic}\"")?; + continue; + } + + *main_topic.lock() = topic.clone(); + + writeln!(stdout, "> topic set to {topic}")?; + } + _ => continue + } + } Ok(rustyline_async::ReadlineEvent::Eof) => { cancel.notify_one(); diff --git a/src/lib.rs b/src/lib.rs index b64d7eb36..334f30a38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -449,13 +449,19 @@ impl From for Option { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum PubsubEvent { /// Subscription event to a given topic - Subscribe { peer_id: PeerId }, + Subscribe { + peer_id: PeerId, + topic: Option, + }, /// Unsubscribing event to a given topic - Unsubscribe { peer_id: PeerId }, + Unsubscribe { + peer_id: PeerId, + topic: Option, + }, } #[derive(Debug, Clone)] @@ -467,15 +473,6 @@ pub(crate) enum InnerPubsubEvent { Unsubscribe { topic: String, peer_id: PeerId }, } -impl From for PubsubEvent { - fn from(event: InnerPubsubEvent) -> Self { - match event { - InnerPubsubEvent::Subscribe { peer_id, .. } => PubsubEvent::Subscribe { peer_id }, - InnerPubsubEvent::Unsubscribe { peer_id, .. } => PubsubEvent::Unsubscribe { peer_id }, - } - } -} - type TSwarmEvent = as Stream>::Item; type TSwarmEventFn = Arc, &TSwarmEvent) + Sync + Send>; type TTransportFn = Box< @@ -1527,13 +1524,12 @@ impl Ipfs { .await } - /// Stream that returns [`PubsubEvent`] for a given topic + /// Stream that returns [`PubsubEvent`] for a given topic. if a topic is not supplied, it will provide all events emitted for any topic. pub async fn pubsub_events( &self, - topic: impl Into, + topic: impl Into>, ) -> Result, Error> { async move { - let topic = topic.into(); let (tx, rx) = oneshot_channel(); self.to_task @@ -1541,24 +1537,42 @@ impl Ipfs { .send(IpfsEvent::PubsubEventStream(tx)) .await?; - let mut receiver = rx - .await?; + let receiver = rx.await?; - let defined_topic = topic.to_string(); + let defined_topic = topic.into(); - let stream = async_stream::stream! { - while let Some(event) = receiver.next().await { - match &event { - InnerPubsubEvent::Subscribe { topic, .. } | InnerPubsubEvent::Unsubscribe { topic, .. } if topic.eq(&defined_topic) => yield event.into(), - _ => {} - } + let stream = receiver.filter_map(move |event| { + let defined_topic = defined_topic.clone(); + async move { + let ev = match event { + InnerPubsubEvent::Subscribe { topic, peer_id } => { + let topic = match defined_topic { + Some(defined_topic) if defined_topic.eq(&topic) => None, + Some(defined_topic) if defined_topic.ne(&topic) => return None, + Some(_) => return None, + None => Some(topic), + }; + PubsubEvent::Subscribe { peer_id, topic } + } + InnerPubsubEvent::Unsubscribe { topic, peer_id } => { + let topic = match defined_topic { + Some(defined_topic) if defined_topic.eq(&topic) => None, + Some(defined_topic) if defined_topic.ne(&topic) => return None, + Some(_) => return None, + None => Some(topic), + }; + PubsubEvent::Unsubscribe { peer_id, topic } + } + }; + + Some(ev) } - }; + }); Ok(stream.boxed()) } - .instrument(self.span.clone()) - .await + .instrument(self.span.clone()) + .await } /// Publishes to the topic which may have been subscribed to earlier diff --git a/tests/pubsub.rs b/tests/pubsub.rs index 7baaae69e..fda762d67 100644 --- a/tests/pubsub.rs +++ b/tests/pubsub.rs @@ -1,7 +1,7 @@ use futures::future::pending; use futures::stream::StreamExt; use futures_timeout::TimeoutExt; -use rust_ipfs::Node; +use rust_ipfs::{Node, PubsubEvent}; use std::time::Duration; mod common; @@ -179,6 +179,76 @@ async fn publish_between_two_nodes_single_topic() { assert!(disappeared, "timed out before a saw b's unsubscription"); } +#[tokio::test] +async fn pubsub_event_without_filter() { + use futures::stream::StreamExt; + + let nodes = spawn_nodes::<2>(Topology::Line).await; + let node_a = &nodes[0]; + let node_a_peer_id = node_a.id; + let node_b = &nodes[1]; + let node_b_peer_id = node_b.id; + + let mut ev_a = node_a.pubsub_events(None).await.unwrap(); + let mut ev_b = node_b.pubsub_events(None).await.unwrap(); + + let _st_a = node_a.pubsub_subscribe("test0").await.unwrap(); + let _st_b = node_b.pubsub_subscribe("test1").await.unwrap(); + + let next_ev_a = ev_a.next().await.unwrap(); + let next_ev_b = ev_b.next().await.unwrap(); + + assert_eq!( + next_ev_a, + PubsubEvent::Subscribe { + peer_id: node_b_peer_id, + topic: Some("test1".to_string()) + } + ); + assert_eq!( + next_ev_b, + PubsubEvent::Subscribe { + peer_id: node_a_peer_id, + topic: Some("test0".to_string()) + } + ); +} + +#[tokio::test] +async fn pubsub_event_with_filter() { + use futures::stream::StreamExt; + + let nodes = spawn_nodes::<2>(Topology::Line).await; + let node_a = &nodes[0]; + let node_a_peer_id = node_a.id; + let node_b = &nodes[1]; + let node_b_peer_id = node_b.id; + + let mut ev_a = node_a.pubsub_events("test0".to_string()).await.unwrap(); + let mut ev_b = node_b.pubsub_events("test0".to_string()).await.unwrap(); + + let _st_a = node_a.pubsub_subscribe("test0").await.unwrap(); + let _st_b = node_b.pubsub_subscribe("test0").await.unwrap(); + + let next_ev_a = ev_a.next().await.unwrap(); + let next_ev_b = ev_b.next().await.unwrap(); + + assert_eq!( + next_ev_a, + PubsubEvent::Subscribe { + peer_id: node_b_peer_id, + topic: None, + } + ); + assert_eq!( + next_ev_b, + PubsubEvent::Subscribe { + peer_id: node_a_peer_id, + topic: None, + } + ); +} + #[tokio::test] async fn publish_between_two_nodes_different_topics() { use futures::stream::StreamExt;