diff --git a/examples/subscribe.rs b/examples/subscribe.rs index ee9908b2..14dad7b5 100644 --- a/examples/subscribe.rs +++ b/examples/subscribe.rs @@ -75,23 +75,23 @@ async fn main() -> Result<(), Box> { Update::Message(message) | Update::Signal(message) => { // Deserialize the message payload as you wish match serde_json::from_slice::(&message.data) { - Ok(message) => println!("defined message: {:?}", message), + Ok(message) => println!("(a) defined message: {:?}", message), Err(_) => { - println!("other message: {:?}", String::from_utf8(message.data)) + println!("(a) other message: {:?}", String::from_utf8(message.data)) } } } Update::Presence(presence) => { - println!("presence: {:?}", presence) + println!("(a) presence: {:?}", presence) } Update::AppContext(object) => { - println!("object: {:?}", object) + println!("(a) object: {:?}", object) } Update::MessageAction(action) => { - println!("message action: {:?}", action) + println!("(a) message action: {:?}", action) } Update::File(file) => { - println!("file: {:?}", file) + println!("(a) file: {:?}", file) } } })); @@ -101,23 +101,23 @@ async fn main() -> Result<(), Box> { Update::Message(message) | Update::Signal(message) => { // Deserialize the message payload as you wish match serde_json::from_slice::(&message.data) { - Ok(message) => println!("~~~~~> defined message: {:?}", message), + Ok(message) => println!("(b) defined message: {:?}", message), Err(_) => { - println!("other message: {:?}", String::from_utf8(message.data)) + println!("(b) other message: {:?}", String::from_utf8(message.data)) } } } Update::Presence(presence) => { - println!("~~~~~> presence: {:?}", presence) + println!("(b) presence: {:?}", presence) } Update::AppContext(object) => { - println!("~~~~~> object: {:?}", object) + println!("(b) object: {:?}", object) } Update::MessageAction(action) => { - println!("~~~~~> message action: {:?}", action) + println!("(b) message action: {:?}", action) } Update::File(file) => { - println!("~~~~~> file: {:?}", file) + println!("(b) file: {:?}", file) } } })); @@ -131,15 +131,23 @@ async fn main() -> Result<(), Box> { // You can also cancel the subscription at any time. // subscription.unsubscribe(); + println!("\nDisconnect from the real-time data stream"); client.disconnect(); tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + println!("\nReconnect to the real-time data stream"); client.reconnect(None); // Let event engine process unsubscribe request tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + // If Subscription or Subscription will go out of scope they will unsubscribe. + // drop(subscription); + // drop(subscription_clone); + + println!( + "\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." ); // Clean up before complete work with PubNub client instance. client.unsubscribe_all(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index c478afe8..88a8c17f 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -405,9 +405,6 @@ where manager.unregister_all() } } - - #[cfg(feature = "presence")] - self.announce_left_all(); } /// Subscription manager which maintains Subscription EE. @@ -443,12 +440,12 @@ where *slot = Some(SubscriptionManager::new( self.subscribe_event_engine(), #[cfg(feature = "presence")] - Arc::new(move |channels, groups| { + Arc::new(move |channels, groups, _all| { Self::subscribe_heartbeat_call(heartbeat_self.clone(), channels, groups); }), #[cfg(feature = "presence")] - Arc::new(move |channels, groups| { - Self::subscribe_leave_call(leave_self.clone(), channels, groups); + Arc::new(move |channels, groups, all| { + Self::subscribe_leave_call(leave_self.clone(), channels, groups, all); }), )); } @@ -572,11 +569,16 @@ where client: Self, channels: Option>, channel_groups: Option>, + all: bool, ) { - client.announce_left( - Self::presence_filtered_entries(channels), - Self::presence_filtered_entries(channel_groups), - ); + if !all { + client.announce_left( + Self::presence_filtered_entries(channels), + Self::presence_filtered_entries(channel_groups), + ); + } else { + client.announce_left_all() + } } fn emit_status(client: Self, status: &ConnectionStatus) { diff --git a/src/dx/subscribe/subscription.rs b/src/dx/subscribe/subscription.rs index b1a0e091..610d9e24 100644 --- a/src/dx/subscribe/subscription.rs +++ b/src/dx/subscribe/subscription.rs @@ -22,7 +22,7 @@ use crate::{ core::{ cmp::PartialEq, fmt::{Debug, Formatter, Result}, - ops::{Add, Deref, DerefMut}, + ops::{Add, Deref, DerefMut, Drop}, }, }, subscribe::{ @@ -89,7 +89,10 @@ use crate::{ /// # Ok(()) /// # } /// ``` -pub struct Subscription { +pub struct Subscription< + T: Transport + Send + Sync + 'static, + D: Deserializer + Send + Sync + 'static, +> { /// Subscription reference. pub(super) inner: Arc>, } @@ -234,8 +237,8 @@ where impl Deref for Subscription where - T: Send + Sync, - D: Send + Sync, + T: Transport + Send + Sync, + D: Deserializer + Send + Sync, { type Target = SubscriptionRef; @@ -246,8 +249,8 @@ where impl DerefMut for Subscription where - T: Send + Sync, - D: Send + Sync, + T: Transport + Send + Sync, + D: Deserializer + Send + Sync, { fn deref_mut(&mut self) -> &mut Self::Target { Arc::get_mut(&mut self.inner) @@ -257,8 +260,8 @@ where impl Clone for Subscription where - T: Send + Sync, - D: Send + Sync, + T: Transport + Send + Sync, + D: Deserializer + Send + Sync, { fn clone(&self) -> Self { Self { @@ -267,6 +270,26 @@ where } } +impl Drop for Subscription +where + T: Transport + Send + Sync + 'static, + D: Deserializer + Send + Sync + 'static, +{ + fn drop(&mut self) { + // Unregistering self to clean up subscriptions list if required. + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + if let Some(manager) = client.subscription_manager(false).write().as_mut() { + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.unregister(&handler); + } + } + } +} + impl Add for Subscription where T: Transport + Send + Sync + 'static, @@ -284,8 +307,8 @@ where impl PartialEq for Subscription where - T: Send + Sync, - D: Send + Sync, + T: Transport + Send + Sync, + D: Deserializer + Send + Sync, { fn eq(&self, other: &Self) -> bool { self.id.eq(&other.id) diff --git a/src/dx/subscribe/subscription_manager.rs b/src/dx/subscribe/subscription_manager.rs index 1e261f4b..93cc16aa 100644 --- a/src/dx/subscribe/subscription_manager.rs +++ b/src/dx/subscribe/subscription_manager.rs @@ -31,7 +31,7 @@ use crate::{ #[cfg(feature = "presence")] pub(in crate::dx::subscribe) type PresenceCall = - dyn Fn(Option>, Option>) + Send + Sync; + dyn Fn(Option>, Option>, bool) + Send + Sync; /// Active subscriptions' manager. /// @@ -296,12 +296,17 @@ where #[cfg(feature = "presence")] { - (!inputs.is_empty && removed.is_none()) - .then(|| self.heartbeat_call.as_ref()(channels.clone(), channel_groups.clone())); + (!inputs.is_empty && removed.is_none()).then(|| { + self.heartbeat_call.as_ref()(channels.clone(), channel_groups.clone(), false) + }); if let Some(removed) = removed { if !removed.is_empty { - self.leave_call.as_ref()(removed.channels(), removed.channel_groups()); + self.leave_call.as_ref()( + removed.channels(), + removed.channel_groups(), + inputs.is_empty, + ); } } } @@ -318,7 +323,7 @@ where #[cfg(feature = "presence")] if !inputs.is_empty { - self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups()); + self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups(), false); } self.event_engine @@ -424,12 +429,12 @@ mod should { let mut manager = SubscriptionManager::new( event_engine(), #[cfg(feature = "presence")] - Arc::new(|channels, _| { + Arc::new(|channels, _, _| { assert!(channels.is_some()); assert_eq!(channels.unwrap().len(), 1); }), #[cfg(feature = "presence")] - Arc::new(|_, _| {}), + Arc::new(|_, _, _| {}), ); let channel = client.channel("test"); let subscription = channel.subscription(None); @@ -447,9 +452,9 @@ mod should { let mut manager = SubscriptionManager::new( event_engine(), #[cfg(feature = "presence")] - Arc::new(|_, _| {}), + Arc::new(|_, _, _| {}), #[cfg(feature = "presence")] - Arc::new(|channels, _| { + Arc::new(|channels, _, _| { assert!(channels.is_some()); assert_eq!(channels.unwrap().len(), 1); }), @@ -471,9 +476,9 @@ mod should { let mut manager = SubscriptionManager::new( event_engine(), #[cfg(feature = "presence")] - Arc::new(|_, _| {}), + Arc::new(|_, _, _| {}), #[cfg(feature = "presence")] - Arc::new(|_, _| {}), + Arc::new(|_, _, _| {}), ); let cursor: SubscriptionCursor = "15800701771129796".to_string().into(); let channel = client.channel("test"); diff --git a/src/dx/subscribe/subscription_set.rs b/src/dx/subscribe/subscription_set.rs index 0aacb5e5..05a7699b 100644 --- a/src/dx/subscribe/subscription_set.rs +++ b/src/dx/subscribe/subscription_set.rs @@ -355,6 +355,26 @@ where } } +impl Drop for SubscriptionSet +where + T: Transport + Send + Sync, + D: Deserializer + Send + Sync, +{ + fn drop(&mut self) { + // Unregistering self to clean up subscriptions list if required. + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + if let Some(manager) = client.subscription_manager(false).write().as_mut() { + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.unregister(&handler); + } + } + } +} + impl Debug for SubscriptionSet where T: Transport + Send + Sync + 'static, diff --git a/src/dx/subscribe/traits/subscriber.rs b/src/dx/subscribe/traits/subscriber.rs index a4c7591a..a94abee1 100644 --- a/src/dx/subscribe/traits/subscriber.rs +++ b/src/dx/subscribe/traits/subscriber.rs @@ -4,12 +4,13 @@ //! which is used by types to provide ability to subscribe for real-time events. use crate::{ + core::{Deserializer, Transport}, lib::alloc::vec::Vec, subscribe::{Subscription, SubscriptionOptions}, }; /// Trait representing a subscriber. -pub trait Subscriber { +pub trait Subscriber { /// Creates a new subscription with the specified options. /// /// # Arguments