Skip to content

Commit

Permalink
Formatting Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zombieobject committed Apr 10, 2024
1 parent a902b29 commit aeec51a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 114 deletions.
180 changes: 91 additions & 89 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl FfiGroup {
self.inner_client.as_ref(),
self.group_id.clone(),
self.created_at_ns,
self.added_by_address.clone(),
self.added_by_address.clone(),
);
let message = group.process_streamed_group_message(envelope_bytes).await?;
let ffi_message = message.into();
Expand Down Expand Up @@ -955,84 +955,84 @@ mod tests {
);
}

// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
// async fn test_conversation_streaming() {
// let amal = new_test_client().await;
// let bola = new_test_client().await;

// let stream_callback = RustStreamCallback::new();

// let stream = bola
// .conversations()
// .stream(Box::new(stream_callback.clone()))
// .await
// .unwrap();

// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// amal.conversations()
// .create_group(vec![bola.account_address()], None)
// .await
// .unwrap();

// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// assert_eq!(stream_callback.message_count(), 1);
// // Create another group and add bola
// amal.conversations()
// .create_group(vec![bola.account_address()], None)
// .await
// .unwrap();

// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// assert_eq!(stream_callback.message_count(), 2);

// stream.end();
// tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
// assert!(stream.is_closed());
// }

// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
// async fn test_stream_all_messages() {
// let alix = new_test_client().await;
// let bo = new_test_client().await;
// let caro = new_test_client().await;

// let alix_group = alix
// .conversations()
// .create_group(vec![caro.account_address()], None)
// .await
// .unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// let stream_callback = RustStreamCallback::new();
// let stream = caro
// .conversations()
// .stream_all_messages(Box::new(stream_callback.clone()))
// .await
// .unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// alix_group.send("first".as_bytes().to_vec()).await.unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// let bo_group = bo
// .conversations()
// .create_group(vec![caro.account_address()], None)
// .await
// .unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
// bo_group.send("second".as_bytes().to_vec()).await.unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// alix_group.send("third".as_bytes().to_vec()).await.unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// bo_group.send("fourth".as_bytes().to_vec()).await.unwrap();
// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// assert_eq!(stream_callback.message_count(), 4);
// stream.end();
// tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
// assert!(stream.is_closed());
// }
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_conversation_streaming() {
let amal = new_test_client().await;
let bola = new_test_client().await;

let stream_callback = RustStreamCallback::new();

let stream = bola
.conversations()
.stream(Box::new(stream_callback.clone()))
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

amal.conversations()
.create_group(vec![bola.account_address()], None)
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

assert_eq!(stream_callback.message_count(), 1);
// Create another group and add bola
amal.conversations()
.create_group(vec![bola.account_address()], None)
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(stream_callback.message_count(), 2);

stream.end();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
assert!(stream.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_stream_all_messages() {
let alix = new_test_client().await;
let bo = new_test_client().await;
let caro = new_test_client().await;

let alix_group = alix
.conversations()
.create_group(vec![caro.account_address()], None)
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let stream_callback = RustStreamCallback::new();
let stream = caro
.conversations()
.stream_all_messages(Box::new(stream_callback.clone()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

alix_group.send("first".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let bo_group = bo
.conversations()
.create_group(vec![caro.account_address()], None)
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
bo_group.send("second".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
alix_group.send("third".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
bo_group.send("fourth".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

assert_eq!(stream_callback.message_count(), 4);
stream.end();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
assert!(stream.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_message_streaming() {
Expand Down Expand Up @@ -1147,22 +1147,24 @@ mod tests {
// Bola gets the group id. This will be needed to fetch the group from
// the database.
let bola_groups = bola_conversations
.list(
crate::FfiListConversationsOptions {
created_after_ns: None,
created_before_ns: None,
limit: None
}
)
.await
.unwrap();
.list(crate::FfiListConversationsOptions {
created_after_ns: None,
created_before_ns: None,
limit: None,
})
.await
.unwrap();

let bola_group = bola_groups.first().unwrap();

// Check Bola's group for the added_by_address of the inviter
let added_by_address = bola_group.added_by_address.clone().unwrap();

// // Verify the welcome host_credential is equal to Amal's
assert_eq!(amal.account_address(), added_by_address, "The Inviter and added_by_address do not match!");
assert_eq!(
amal.account_address(),
added_by_address,
"The Inviter and added_by_address do not match!"
);
}
}
69 changes: 44 additions & 25 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ pub mod validated_commit;

use intents::SendMessageIntentData;
use openmls::{
credentials::BasicCredential, error::LibraryError, extensions::{Extension, Extensions, Metadata}, group::{MlsGroupCreateConfig, MlsGroupJoinConfig}, prelude::{
BasicCredentialError, CredentialWithKey, CryptoConfig, Error as TlsCodecError, GroupId,
credentials::BasicCredential,
error::LibraryError,
extensions::{Extension, Extensions, Metadata},
group::{MlsGroupCreateConfig, MlsGroupJoinConfig},
prelude::{
BasicCredentialError, CredentialWithKey, CryptoConfig, Error as TlsCodecError, GroupId,
MlsGroup as OpenMlsGroup, StagedWelcome, Welcome as MlsWelcome, WireFormatPolicy,
}
},
};
use openmls_traits::OpenMlsProvider;
use prost::Message;
Expand Down Expand Up @@ -161,7 +165,12 @@ where
ApiClient: XmtpMlsClient,
{
// Creates a new group instance. Does not validate that the group exists in the DB
pub fn new(client: &'c Client<ApiClient>, group_id: Vec<u8>, created_at_ns: i64, added_by_address: Option<String>) -> Self {
pub fn new(
client: &'c Client<ApiClient>,
group_id: Vec<u8>,
created_at_ns: i64,
added_by_address: Option<String>,
) -> Self {
Self {
client,
group_id,
Expand Down Expand Up @@ -209,9 +218,20 @@ where
mls_group.save(provider.key_store())?;

let group_id = mls_group.group_id().to_vec();
let stored_group = StoredGroup::new(group_id.clone(), now_ns(), membership_state, added_by_address.clone());
let stored_group = StoredGroup::new(
group_id.clone(),
now_ns(),
membership_state,
added_by_address.clone(),
);

stored_group.store(provider.conn())?;
Ok(Self::new(client, group_id, stored_group.created_at_ns, added_by_address))
Ok(Self::new(
client,
group_id,
stored_group.created_at_ns,
added_by_address,
))
}

// Create a group from a decrypted and decoded welcome message
Expand All @@ -229,7 +249,12 @@ where
mls_group.save(provider.key_store())?;

let group_id = mls_group.group_id().to_vec();
let to_store = StoredGroup::new(group_id, now_ns(), GroupMembershipState::Pending, added_by_address.clone());
let to_store = StoredGroup::new(
group_id,
now_ns(),
GroupMembershipState::Pending,
added_by_address.clone(),
);
let stored_group = provider.conn().insert_or_ignore_group(to_store)?;

Ok(Self::new(
Expand All @@ -252,22 +277,15 @@ where
let welcome = deserialize_welcome(&welcome_bytes)?;

let join_config = build_group_join_config();
let staged_welcome = StagedWelcome::new_from_welcome(
provider,
&join_config,
welcome.clone(),
None
)?;
let staged_welcome =
StagedWelcome::new_from_welcome(provider, &join_config, welcome.clone(), None)?;

let added_by_node = staged_welcome
.welcome_sender()?;
let added_by_node = staged_welcome.welcome_sender()?;

let added_by_credential = BasicCredential::try_from(added_by_node.credential())?;
let pub_key_bytes = added_by_node.signature_key().as_slice();
let account_address = Identity::get_validated_account_address(
added_by_credential.identity(),
pub_key_bytes
)?;
let account_address =
Identity::get_validated_account_address(added_by_credential.identity(), pub_key_bytes)?;

Self::create_from_welcome(client, provider, welcome, Some(account_address))
}
Expand Down Expand Up @@ -1005,10 +1023,7 @@ mod tests {

// Bola syncs groups - this will decrypt the Welcome, identify who added Bola
// and then store that value on the group and insert into the database
let bola_groups = bola
.sync_welcomes()
.await
.unwrap();
let bola_groups = bola.sync_welcomes().await.unwrap();

// Bola gets the group id. This will be needed to fetch the group from
// the database.
Expand All @@ -1021,7 +1036,11 @@ mod tests {
// Check Bola's group for the added_by_address of the inviter
let added_by_address = bola_fetched_group.added_by_address.clone().unwrap();

// // Verify the welcome host_credential is equal to Amal's
assert_eq!(amal.account_address(), added_by_address, "The Inviter and added_by_address do not match!");
// Verify the welcome host_credential is equal to Amal's
assert_eq!(
amal.account_address(),
added_by_address,
"The Inviter and added_by_address do not match!"
);
}
}

0 comments on commit aeec51a

Please sign in to comment.