Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jul 1, 2024
1 parent 2bae690 commit f596e4b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 51 deletions.
8 changes: 3 additions & 5 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ impl NapiConversations {
let tsfn: ThreadsafeFunction<NapiGroup, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let client = self.inner_client.clone();
let stream_closer = RustXmtpClient::stream_conversations_with_callback(
client.clone(),
move |convo| {
let stream_closer =
RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| {
tsfn.call(
Ok(NapiGroup::new(
client.clone(),
Expand All @@ -195,8 +194,7 @@ impl NapiConversations {
)),
ThreadsafeFunctionCallMode::Blocking,
);
},
);
});

Ok(NapiStreamCloser::new(stream_closer))
}
Expand Down
93 changes: 47 additions & 46 deletions bindings_node/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,70 @@
use napi::bindgen_prelude::Error;
use std::sync::Arc;
use tokio::{sync::Mutex, task::AbortHandle};
use xmtp_mls::{client::ClientError, subscriptions::StreamHandle};
use napi::bindgen_prelude::Error;

use napi_derive::napi;

#[napi]
pub struct NapiStreamCloser {
#[allow(clippy::type_complexity)]
handle: Arc<Mutex<Option<StreamHandle<Result<(), ClientError>>>>>,
// for convenience, does not require locking mutex.
abort_handle: Arc<AbortHandle>,
#[allow(clippy::type_complexity)]
handle: Arc<Mutex<Option<StreamHandle<Result<(), ClientError>>>>>,
// for convenience, does not require locking mutex.
abort_handle: Arc<AbortHandle>,
}

impl NapiStreamCloser {
pub fn new(handle: StreamHandle<Result<(), ClientError>>) -> Self {
Self {
abort_handle: Arc::new(handle.handle.abort_handle()),
handle: Arc::new(Mutex::new(Some(handle))),
}
pub fn new(handle: StreamHandle<Result<(), ClientError>>) -> Self {
Self {
abort_handle: Arc::new(handle.handle.abort_handle()),
handle: Arc::new(Mutex::new(Some(handle))),
}
}
}

impl From<StreamHandle<Result<(), ClientError>>> for NapiStreamCloser {
fn from(handle: StreamHandle<Result<(), ClientError>>) -> Self {
NapiStreamCloser::new(handle)
}
fn from(handle: StreamHandle<Result<(), ClientError>>) -> Self {
NapiStreamCloser::new(handle)
}
}

#[napi]
impl NapiStreamCloser {
/// Signal the stream to end
/// Does not wait for the stream to end.
#[napi]
pub fn end(&self) {
self.abort_handle.abort();
}
/// Signal the stream to end
/// Does not wait for the stream to end.
#[napi]
pub fn end(&self) {
self.abort_handle.abort();
}

/// End the stream and `await` for it to shutdown
/// Returns the `Result` of the task.
#[napi]
pub async fn end_and_wait(&self) -> Result<(), Error> {
if self.abort_handle.is_finished() {
return Ok(());
}

let mut handle = self.handle.lock().await;
let handle = handle.take();
if let Some(h) = handle {
h.handle.abort();
let join_result = h.handle.await;
if matches!(join_result, Err(ref e) if !e.is_cancelled()) {
return Err(Error::from_reason(
format!("subscription event loop join error {}", join_result.unwrap_err())
));
}
} else {
log::warn!("subscription already closed");
}
Ok(())
/// End the stream and `await` for it to shutdown
/// Returns the `Result` of the task.
#[napi]
pub async fn end_and_wait(&self) -> Result<(), Error> {
if self.abort_handle.is_finished() {
return Ok(());
}

/// Checks if this stream is closed
#[napi]
pub fn is_closed(&self) -> bool {
self.abort_handle.is_finished()

let mut handle = self.handle.lock().await;
let handle = handle.take();
if let Some(h) = handle {
h.handle.abort();
let join_result = h.handle.await;
if matches!(join_result, Err(ref e) if !e.is_cancelled()) {
return Err(Error::from_reason(format!(
"subscription event loop join error {}",
join_result.unwrap_err()
)));
}
} else {
log::warn!("subscription already closed");
}
Ok(())
}

/// Checks if this stream is closed
#[napi]
pub fn is_closed(&self) -> bool {
self.abort_handle.is_finished()
}
}

0 comments on commit f596e4b

Please sign in to comment.