From e6e8cab1b054d80952fa4df932a8658c1067b3e7 Mon Sep 17 00:00:00 2001 From: Cameron Voell <1103838+cameronvoell@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:53:27 -0700 Subject: [PATCH] Fix send update interval ns calculation, move to configuration (#1008) * fix send update interval ns calculation, move to configuration * lint fix * add delay so send msg syncs installations * add constant to tests so they follow the libxmtp configuration * fmt fix * directly update installations instead of sleeping delay interval in bindings test --------- Co-authored-by: cameronvoell --- bindings_ffi/src/mls.rs | 29 ++++++++++++++++++++++++++++- xmtp_mls/src/configuration.rs | 8 ++++++-- xmtp_mls/src/groups/mod.rs | 24 ++++++++++++++++++++---- xmtp_mls/src/groups/sync.rs | 16 ++++++++-------- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 4571d59af..35d332d59 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1584,7 +1584,11 @@ mod tests { use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_cryptography::{signature::RecoverableSignature, utils::rng}; use xmtp_id::associations::generate_inbox_id; - use xmtp_mls::{storage::EncryptionKey, InboxOwner}; + use xmtp_mls::{ + groups::{GroupError, MlsGroup}, + storage::EncryptionKey, + InboxOwner, + }; #[derive(Clone)] pub struct LocalWalletInboxOwner { @@ -1728,6 +1732,20 @@ mod tests { new_test_client_with_wallet(wallet).await } + impl FfiGroup { + #[cfg(test)] + async fn update_installations(&self) -> Result<(), GroupError> { + let group = MlsGroup::new( + self.inner_client.context().clone(), + self.group_id.clone(), + self.created_at_ns, + ); + + group.update_installations(&self.inner_client).await?; + Ok(()) + } + } + #[tokio::test] async fn get_inbox_id() { let client = new_test_client().await; @@ -2464,6 +2482,8 @@ mod tests { // Recreate client2 (new installation) let client2 = new_test_client_with_wallet(wallet2).await; + client1_group.update_installations().await.unwrap(); + // Send a message that will break the group client1_group .send("This message will break the group".as_bytes().to_vec()) @@ -2517,6 +2537,8 @@ mod tests { let alix_group = alix.group(group.id()).unwrap(); let bo_group = bo.group(group.id()).unwrap(); let caro_group = caro.group(group.id()).unwrap(); + + alix_group.update_installations().await.unwrap(); log::info!("Alix sending first message"); // Alix sends a message in the group alix_group @@ -2525,6 +2547,7 @@ mod tests { .unwrap(); log::info!("Caro sending second message"); + caro_group.update_installations().await.unwrap(); // Caro sends a message in the group caro_group .send("Second message".as_bytes().to_vec()) @@ -2542,6 +2565,8 @@ mod tests { .await; bo_stream_messages.wait_for_ready().await; + alix_group.update_installations().await.unwrap(); + log::info!("Alix sending third message after Bo's second installation added"); // Alix sends a message to the group alix_group @@ -2555,6 +2580,7 @@ mod tests { log::info!("Bo sending fourth message"); // Bo sends a message to the group + bo2_group.update_installations().await.unwrap(); bo2_group .send("Fourth message".as_bytes().to_vec()) .await @@ -2562,6 +2588,7 @@ mod tests { log::info!("Caro sending fifth message"); // Caro sends a message in the group + caro_group.update_installations().await.unwrap(); caro_group .send("Fifth message".as_bytes().to_vec()) .await diff --git a/xmtp_mls/src/configuration.rs b/xmtp_mls/src/configuration.rs index 540505c7f..039299de2 100644 --- a/xmtp_mls/src/configuration.rs +++ b/xmtp_mls/src/configuration.rs @@ -12,9 +12,13 @@ pub const MAX_GROUP_SYNC_RETRIES: usize = 3; pub const MAX_INTENT_PUBLISH_ATTEMPTS: usize = 3; -const NANOSECONDS_IN_HOUR: i64 = 3_600_000_000_000; +const NS_IN_SEC: i64 = 1_000_000_000; -pub const UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = NANOSECONDS_IN_HOUR / 2; // 30 min +const NS_IN_HOUR: i64 = NS_IN_SEC * 60 * 60; + +pub const SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = NS_IN_HOUR / 2; // 30 min + +pub const SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS: i64 = 5 * NS_IN_SEC; pub const MAX_GROUP_SIZE: u16 = 400; diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index e9f4ca1de..17fa8c51f 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -74,6 +74,7 @@ use crate::{ configuration::{ CIPHERSUITE, GROUP_MEMBERSHIP_EXTENSION_ID, GROUP_PERMISSIONS_EXTENSION_ID, MAX_GROUP_SIZE, MAX_PAST_EPOCHS, MUTABLE_METADATA_EXTENSION_ID, + SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS, }, hpke::{decrypt_welcome, HpkeError}, identity::{parse_credential, Identity, IdentityError}, @@ -454,10 +455,10 @@ impl MlsGroup { where ApiClient: XmtpApi, { - let update_interval = Some(5_000_000); // 5 seconds in nanoseconds + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); let conn = self.context.store.conn()?; let provider = XmtpOpenMlsProvider::from(conn); - self.maybe_update_installations(&provider, update_interval, client) + self.maybe_update_installations(&provider, update_interval_ns, client) .await?; let message_id = self.prepare_message(message, provider.conn_ref(), |now| { @@ -480,14 +481,29 @@ impl MlsGroup { { let conn = self.context.store.conn()?; let provider = XmtpOpenMlsProvider::from(conn); - let update_interval = Some(5_000_000); - self.maybe_update_installations(&provider, update_interval, client) + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); + self.maybe_update_installations(&provider, update_interval_ns, client) .await?; self.sync_until_last_intent_resolved(&provider, client) .await?; Ok(()) } + /// Update group installations + pub async fn update_installations( + &self, + client: &Client, + ) -> Result<(), GroupError> + where + ApiClient: XmtpApi, + { + let conn = self.context.store.conn()?; + let provider = XmtpOpenMlsProvider::from(conn); + self.maybe_update_installations(&provider, Some(0), client) + .await?; + Ok(()) + } + /// Send a message, optimistically returning the ID of the message before the result of a message publish. pub fn send_message_optimistic(&self, message: &[u8]) -> Result, GroupError> { let conn = self.context.store.conn()?; diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 6f81c4eaa..3842a853c 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -18,7 +18,7 @@ use crate::{ codecs::{group_updated::GroupUpdatedCodec, ContentCodec}, configuration::{ GRPC_DATA_LIMIT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS, - UPDATE_INSTALLATIONS_INTERVAL_NS, + SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS, }, groups::{ intents::UpdateMetadataIntentData, message_history::MessageHistoryContent, @@ -974,24 +974,24 @@ impl MlsGroup { pub async fn maybe_update_installations( &self, provider: &XmtpOpenMlsProvider, - update_interval: Option, + update_interval_ns: Option, client: &Client, ) -> Result<(), GroupError> where ApiClient: XmtpApi, { // determine how long of an interval in time to use before updating list - let interval = match update_interval { + let interval_ns = match update_interval_ns { Some(val) => val, - None => UPDATE_INSTALLATIONS_INTERVAL_NS, + None => SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS, }; - let now = crate::utils::time::now_ns(); - let last = provider + let now_ns = crate::utils::time::now_ns(); + let last_ns = provider .conn_ref() .get_installations_time_checked(self.group_id.clone())?; - let elapsed = now - last; - if elapsed > interval { + let elapsed_ns = now_ns - last_ns; + if elapsed_ns > interval_ns { self.add_missing_installations(provider, client).await?; provider .conn_ref()