Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Jul 9, 2024
1 parent 2fc66d1 commit 9c916ae
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 69 deletions.
30 changes: 29 additions & 1 deletion bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ xmtp_proto = { path = "../xmtp_proto", features = ["proto_full", "grpc"] }
xmtp_user_preferences = { path = "../xmtp_user_preferences" }
xmtp_v2 = { path = "../xmtp_v2" }

tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# NOTE: A regression in openssl-sys exists where libatomic is dynamically linked
# for i686-linux-android targets. https://github.com/sfackler/rust-openssl/issues/2163
#
Expand Down
116 changes: 63 additions & 53 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ impl FfiConversations {
Ok(convo_list)
}

pub async fn stream(&self, callback: Box<dyn FfiConversationCallback>) -> FfiStreamCloser {
pub fn stream(&self, callback: Box<dyn FfiConversationCallback>) -> FfiStreamCloser {
let client = self.inner_client.clone();
let handle =
RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| {
Expand Down Expand Up @@ -645,7 +645,6 @@ impl FfiGroup {
self.created_at_ns,
);

log::debug!("Sending message");
let message_id = group
.send_message(content_bytes.as_slice(), &self.inner_client)
.await?;
Expand Down Expand Up @@ -1009,7 +1008,7 @@ impl FfiGroup {
Ok(())
}

pub async fn stream(&self, message_callback: Box<dyn FfiMessageCallback>) -> FfiStreamCloser {
pub fn stream(&self, message_callback: Box<dyn FfiMessageCallback>) -> FfiStreamCloser {
let inner_client = Arc::clone(&self.inner_client);
let handle = MlsGroup::stream_with_callback(
inner_client,
Expand Down Expand Up @@ -1176,19 +1175,20 @@ impl FfiStreamCloser {
let stream_handle = stream_handle.take();
if let Some(h) = stream_handle {
h.handle.abort();
let join_result = h.handle.await;
if matches!(join_result, Err(ref e) if !e.is_cancelled()) {
return Err(GenericError::Generic {
err: format!(
"subscription event loop join error {}",
join_result.unwrap_err()
),
});
match h.handle.await {
Err(e) if !e.is_cancelled() => Err(GenericError::Generic {
err: format!("subscription event loop join error {}", e),
}),
Err(e) if e.is_cancelled() => Ok(()),
Ok(t) => t.map_err(|e| GenericError::Generic { err: e.to_string() }),
Err(e) => Err(GenericError::Generic {
err: format!("error joining task {}", e),
}),
}
} else {
log::warn!("subscription already closed");
Ok(())
}
Ok(())
}

pub fn is_closed(&self) -> bool {
Expand Down Expand Up @@ -1286,6 +1286,7 @@ mod tests {
self,
distributions::{Alphanumeric, DistString},
};
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_cryptography::{signature::RecoverableSignature, utils::rng};
use xmtp_id::associations::generate_inbox_id;
use xmtp_mls::{storage::EncryptionKey, InboxOwner};
Expand Down Expand Up @@ -1330,24 +1331,30 @@ mod tests {
num_messages: Arc<AtomicU32>,
messages: Arc<Mutex<Vec<FfiMessage>>>,
conversations: Arc<Mutex<Vec<Arc<FfiGroup>>>>,
notify: Arc<tokio::sync::Notify>,
notify: Arc<Notify>,
}

impl RustStreamCallback {
pub fn message_count(&self) -> u32 {
self.num_messages.load(Ordering::SeqCst)
}

pub async fn wait_for_delivery(&self) {
self.notify.notified().await
pub async fn wait_for_delivery(&self) -> Result<(), Elapsed> {
tokio::time::timeout(std::time::Duration::from_secs(60), async {
self.notify.notified().await
})
.await?;
Ok(())
}
}

impl FfiMessageCallback for RustStreamCallback {
fn on_message(&self, message: FfiMessage) {
log::debug!("On message called");
let mut messages = self.messages.lock().unwrap();
log::info!("Received: {}", String::from_utf8_lossy(&message.content));
log::info!(
"ON MESSAGE Received\n-------- \n{}\n----------",
String::from_utf8_lossy(&message.content)
);
messages.push(message);
let _ = self.num_messages.fetch_add(1, Ordering::SeqCst);
self.notify.notify_one();
Expand Down Expand Up @@ -1726,7 +1733,7 @@ mod tests {
.update_group_name("Old Name".to_string())
.await
.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();

let bo_groups = bo
.conversations()
Expand All @@ -1739,14 +1746,14 @@ mod tests {
.update_group_name("Old Name2".to_string())
.await
.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();

// Uncomment the following lines to add more group name updates
bo_group
.update_group_name("Old Name3".to_string())
.await
.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();

assert_eq!(message_callbacks.message_count(), 3);

Expand All @@ -1756,6 +1763,7 @@ mod tests {
}

// test is also showing intermittent failures with database locked msg
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_stream_and_update_name_without_forking_group() {
let alix = new_test_client().await;
Expand Down Expand Up @@ -1785,11 +1793,9 @@ mod tests {
.update_group_name("hello".to_string())
.await
.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();
alix_group.send("hello1".as_bytes().to_vec()).await.unwrap();
message_callbacks.wait_for_delivery().await;

bo.conversations().sync().await.unwrap();
message_callbacks.wait_for_delivery().await.unwrap();

let bo_groups = bo
.conversations()
Expand All @@ -1806,9 +1812,9 @@ mod tests {
assert_eq!(bo_messages1.len(), first_msg_check);

bo_group.send("hello2".as_bytes().to_vec()).await.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();
bo_group.send("hello3".as_bytes().to_vec()).await.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();

alix_group.sync().await.unwrap();

Expand All @@ -1818,7 +1824,7 @@ mod tests {
assert_eq!(alix_messages.len(), second_msg_check);

alix_group.send("hello4".as_bytes().to_vec()).await.unwrap();
message_callbacks.wait_for_delivery().await;
message_callbacks.wait_for_delivery().await.unwrap();
bo_group.sync().await.unwrap();

let bo_messages2 = bo_group
Expand All @@ -1840,8 +1846,7 @@ mod tests {

let stream = bola
.conversations()
.stream(Box::new(stream_callback.clone()))
.await;
.stream(Box::new(stream_callback.clone()));

amal.conversations()
.create_group(
Expand All @@ -1851,7 +1856,7 @@ mod tests {
.await
.unwrap();

stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();

assert_eq!(stream_callback.message_count(), 1);
// Create another group and add bola
Expand All @@ -1862,7 +1867,7 @@ mod tests {
)
.await
.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();

assert_eq!(stream_callback.message_count(), 2);

Expand Down Expand Up @@ -1893,7 +1898,7 @@ mod tests {
stream.wait_for_ready().await;

alix_group.send("first".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();

let bo_group = bo
.conversations()
Expand All @@ -1906,23 +1911,23 @@ mod tests {
let _ = caro.inner_client.sync_welcomes().await.unwrap();

bo_group.send("second".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();
alix_group.send("third".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();
bo_group.send("fourth".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();

assert_eq!(stream_callback.message_count(), 4);
stream.end_and_wait().await.unwrap();
assert!(stream.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
#[tokio::test(flavor = "multi_thread")]
async fn test_message_streaming() {
let amal = new_test_client().await;
let bola = new_test_client().await;

let group = amal
let amal_group: Arc<FfiGroup> = amal
.conversations()
.create_group(
vec![bola.account_address.clone()],
Expand All @@ -1931,16 +1936,24 @@ mod tests {
.await
.unwrap();

bola.inner_client.sync_welcomes().await.unwrap();
let bola_group = bola.group(amal_group.group_id.clone()).unwrap();

let stream_callback = RustStreamCallback::default();
let stream_closer = group.stream(Box::new(stream_callback.clone())).await;
let stream_closer = bola_group.stream(Box::new(stream_callback.clone()));

group.send("hello".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
group.send("goodbye".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_closer.wait_for_ready().await;

assert_eq!(stream_callback.message_count(), 2);
amal_group.send("hello".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await.unwrap();

amal_group
.send("goodbye".as_bytes().to_vec())
.await
.unwrap();
stream_callback.wait_for_delivery().await.unwrap();

assert_eq!(stream_callback.message_count(), 2);
stream_closer.end_and_wait().await.unwrap();
}

Expand Down Expand Up @@ -1970,9 +1983,9 @@ mod tests {
stream_closer.wait_for_ready().await;

amal_group.send(b"hello1".to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();
amal_group.send(b"hello2".to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();

assert_eq!(stream_callback.message_count(), 2);
assert!(!stream_closer.is_closed());
Expand All @@ -1981,7 +1994,7 @@ mod tests {
.remove_members_by_inbox_id(vec![bola.inbox_id().clone()])
.await
.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();
assert_eq!(stream_callback.message_count(), 3); // Member removal transcript message
//
amal_group.send(b"hello3".to_vec()).await.unwrap();
Expand All @@ -2000,7 +2013,7 @@ mod tests {
assert_eq!(stream_callback.message_count(), 3); // Don't receive transcript messages while removed

amal_group.send("hello4".as_bytes().to_vec()).await.unwrap();
stream_callback.wait_for_delivery().await;
stream_callback.wait_for_delivery().await.unwrap();
assert_eq!(stream_callback.message_count(), 4); // Receiving messages again
assert!(!stream_closer.is_closed());

Expand Down Expand Up @@ -2061,10 +2074,7 @@ mod tests {
// Stream all group messages
let message_callback = RustStreamCallback::default();
let group_callback = RustStreamCallback::default();
let stream_groups = bo
.conversations()
.stream(Box::new(group_callback.clone()))
.await;
let stream_groups = bo.conversations().stream(Box::new(group_callback.clone()));

let stream_messages = bo
.conversations()
Expand All @@ -2080,10 +2090,10 @@ mod tests {
)
.await
.unwrap();
group_callback.wait_for_delivery().await;
group_callback.wait_for_delivery().await.unwrap();

alix_group.send("hello1".as_bytes().to_vec()).await.unwrap();
message_callback.wait_for_delivery().await;
message_callback.wait_for_delivery().await.unwrap();

assert_eq!(group_callback.message_count(), 1);
assert_eq!(message_callback.message_count(), 1);
Expand Down
Loading

0 comments on commit 9c916ae

Please sign in to comment.