diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index 93472974d..614b7662b 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -184,9 +184,8 @@ impl NapiConversations { let tsfn: ThreadsafeFunction = callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; let client = self.inner_client.clone(); - let stream_closer = RustXmtpClient::stream_conversations_with_callback( - client.clone(), - move |convo| { + let stream_closer = + RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| { tsfn.call( Ok(NapiGroup::new( client.clone(), @@ -195,8 +194,7 @@ impl NapiConversations { )), ThreadsafeFunctionCallMode::Blocking, ); - }, - ); + }); Ok(NapiStreamCloser::new(stream_closer)) } diff --git a/bindings_node/src/streams.rs b/bindings_node/src/streams.rs index b47ff73f1..5517b686e 100644 --- a/bindings_node/src/streams.rs +++ b/bindings_node/src/streams.rs @@ -1,69 +1,70 @@ +use napi::bindgen_prelude::Error; use std::sync::Arc; use tokio::{sync::Mutex, task::AbortHandle}; use xmtp_mls::{client::ClientError, subscriptions::StreamHandle}; -use napi::bindgen_prelude::Error; use napi_derive::napi; #[napi] pub struct NapiStreamCloser { - #[allow(clippy::type_complexity)] - handle: Arc>>>>, - // for convenience, does not require locking mutex. - abort_handle: Arc, + #[allow(clippy::type_complexity)] + handle: Arc>>>>, + // for convenience, does not require locking mutex. + abort_handle: Arc, } impl NapiStreamCloser { - pub fn new(handle: StreamHandle>) -> Self { - Self { - abort_handle: Arc::new(handle.handle.abort_handle()), - handle: Arc::new(Mutex::new(Some(handle))), - } + pub fn new(handle: StreamHandle>) -> Self { + Self { + abort_handle: Arc::new(handle.handle.abort_handle()), + handle: Arc::new(Mutex::new(Some(handle))), } + } } impl From>> for NapiStreamCloser { - fn from(handle: StreamHandle>) -> Self { - NapiStreamCloser::new(handle) - } + fn from(handle: StreamHandle>) -> Self { + NapiStreamCloser::new(handle) + } } #[napi] impl NapiStreamCloser { - /// Signal the stream to end - /// Does not wait for the stream to end. - #[napi] - pub fn end(&self) { - self.abort_handle.abort(); - } + /// Signal the stream to end + /// Does not wait for the stream to end. + #[napi] + pub fn end(&self) { + self.abort_handle.abort(); + } - /// End the stream and `await` for it to shutdown - /// Returns the `Result` of the task. - #[napi] - pub async fn end_and_wait(&self) -> Result<(), Error> { - if self.abort_handle.is_finished() { - return Ok(()); - } - - let mut handle = self.handle.lock().await; - let handle = handle.take(); - if let Some(h) = handle { - h.handle.abort(); - let join_result = h.handle.await; - if matches!(join_result, Err(ref e) if !e.is_cancelled()) { - return Err(Error::from_reason( - format!("subscription event loop join error {}", join_result.unwrap_err()) - )); - } - } else { - log::warn!("subscription already closed"); - } - Ok(()) + /// End the stream and `await` for it to shutdown + /// Returns the `Result` of the task. + #[napi] + pub async fn end_and_wait(&self) -> Result<(), Error> { + if self.abort_handle.is_finished() { + return Ok(()); } - - /// Checks if this stream is closed - #[napi] - pub fn is_closed(&self) -> bool { - self.abort_handle.is_finished() + + let mut handle = self.handle.lock().await; + let handle = handle.take(); + if let Some(h) = handle { + h.handle.abort(); + let join_result = h.handle.await; + if matches!(join_result, Err(ref e) if !e.is_cancelled()) { + return Err(Error::from_reason(format!( + "subscription event loop join error {}", + join_result.unwrap_err() + ))); + } + } else { + log::warn!("subscription already closed"); } + Ok(()) + } + + /// Checks if this stream is closed + #[napi] + pub fn is_closed(&self) -> bool { + self.abort_handle.is_finished() + } }