Ensure message send succeeds even when out of sync (#917)
We have configured the `max_past_epochs` value to 3, which means that we keep message encryption keys for 3 epochs before deleting them. As a result, if we are 3 or more commits behind when we send a message, nobody else will be able to decrypt it: everyone processes messages sequentially, and by the time they reach our message they will already have deleted the encryption keys for its epoch.
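
To make the window concrete, here is a minimal standalone sketch of the check. The name `within_decryption_window` and the free-standing arithmetic are illustrative only; the commit's real version is `is_valid_epoch` in `xmtp_mls/src/groups/sync.rs` below:

```rust
/// Illustrative sketch: can members whose group is at `group_epoch` still
/// decrypt a message that was encrypted at `message_epoch`?
fn within_decryption_window(message_epoch: u64, group_epoch: u64, max_past_epochs: u64) -> bool {
    // Peers delete the keys for an epoch once the group has advanced
    // `max_past_epochs` epochs past it, so anything that far behind is lost.
    message_epoch + max_past_epochs > group_epoch
}

fn main() {
    // With max_past_epochs = 3 and a group at epoch 10:
    assert!(within_decryption_window(8, 10, 3)); // 2 epochs behind: still decryptable
    assert!(!within_decryption_window(7, 10, 3)); // 3 epochs behind: keys already deleted
}
```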

The fix is as follows:
- When pulling down messages from a group, if we see a message we previously published, we check that the message is fewer than 3 epochs behind the current group epoch. If the check passes, the message send intent is updated to COMMITTED; otherwise it is reset to TO_PUBLISH so that the message can be sent again.
- After sending a message, we pull down new messages to confirm that the send succeeded (and retry via intents otherwise).

This has the following implications:
1. It's not required to sync the group before sending a message
2. Confirming that a message sent successfully (i.e. waiting for `send_message()` to complete) is slower: there is an extra round trip to pull down the messages afterwards, plus more round trips if the message needs to be retried

My justification for the slower message send is that we've already set up optimistic message sends, with separate prepare and publish steps. In the event that multiple optimistic message sends happen back-to-back, you can call a single publish at the end. Perhaps we can recommend using optimistic message sends, with debounced publishes, in the docs somewhere.
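
For reference, the back-to-back pattern would look roughly like the sketch below. The trait and method names (`OptimisticSend`, `send_optimistic`, `publish`) are placeholders I am using to illustrate the idea of a debounced publish, not the SDK's real API:

```rust
// Sketch only: `OptimisticSend` stands in for the real group API; the actual
// names of the prepare/publish split may differ.
trait OptimisticSend {
    type Error;
    /// Stage a message locally without touching the network.
    fn send_optimistic(&self, message: Vec<u8>) -> Result<(), Self::Error>;
    /// Publish everything staged so far (one network round trip).
    async fn publish(&self) -> Result<(), Self::Error>;
}

async fn send_burst<G: OptimisticSend>(group: &G, messages: Vec<Vec<u8>>) -> Result<(), G::Error> {
    for msg in messages {
        group.send_optimistic(msg)?; // fast, local-only
    }
    // A single publish (and a single confirmation sync) covers the whole burst.
    group.publish().await
}
```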

- Rich
nplasterer authored Jul 25, 2024
1 parent 4a572e8 commit ffa0564
Showing 3 changed files with 208 additions and 7 deletions.
132 changes: 132 additions & 0 deletions bindings_ffi/src/mls.rs
@@ -2156,6 +2156,138 @@ mod tests {
assert!(stream_messages.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_message_when_out_of_sync() {
let alix = new_test_client().await;
let bo = new_test_client().await;
let caro = new_test_client().await;
let davon = new_test_client().await;
let eri = new_test_client().await;
let frankie = new_test_client().await;

let alix_group = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

bo.conversations().sync().await.unwrap();
let bo_group = bo.group(alix_group.id()).unwrap();

bo_group.send("bo1".as_bytes().to_vec()).await.unwrap();
alix_group.send("alix1".as_bytes().to_vec()).await.unwrap();

// Move the group forward by 3 epochs (as Alix's max_past_epochs is
// configured to 3) without Bo syncing
alix_group
.add_members(vec![
caro.account_address.clone(),
davon.account_address.clone(),
])
.await
.unwrap();
alix_group
.remove_members(vec![
caro.account_address.clone(),
davon.account_address.clone(),
])
.await
.unwrap();
alix_group
.add_members(vec![
eri.account_address.clone(),
frankie.account_address.clone(),
])
.await
.unwrap();

// Bo sends messages to Alix while 3 epochs behind
bo_group.send("bo3".as_bytes().to_vec()).await.unwrap();
alix_group.send("alix3".as_bytes().to_vec()).await.unwrap();
bo_group.send("bo4".as_bytes().to_vec()).await.unwrap();
bo_group.send("bo5".as_bytes().to_vec()).await.unwrap();

alix_group.sync().await.unwrap();
let alix_messages = alix_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();

bo_group.sync().await.unwrap();
let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
assert_eq!(bo_messages.len(), 9);
assert_eq!(alix_messages.len(), 10);

assert_eq!(
bo_messages[bo_messages.len() - 1].id,
alix_messages[alix_messages.len() - 1].id
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_add_members_when_out_of_sync() {
let alix = new_test_client().await;
let bo = new_test_client().await;
let caro = new_test_client().await;
let davon = new_test_client().await;
let eri = new_test_client().await;
let frankie = new_test_client().await;

let alix_group = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

bo.conversations().sync().await.unwrap();
let bo_group = bo.group(alix_group.id()).unwrap();

bo_group.send("bo1".as_bytes().to_vec()).await.unwrap();
alix_group.send("alix1".as_bytes().to_vec()).await.unwrap();

// Move the group forward by 3 epochs (as Alix's max_past_epochs is
// configured to 3) without Bo syncing
alix_group
.add_members(vec![
caro.account_address.clone(),
davon.account_address.clone(),
])
.await
.unwrap();
alix_group
.remove_members(vec![
caro.account_address.clone(),
davon.account_address.clone(),
])
.await
.unwrap();
alix_group
.add_members(vec![eri.account_address.clone()])
.await
.unwrap();

// Bo adds a member while 3 epochs behind
bo_group
.add_members(vec![frankie.account_address.clone()])
.await
.unwrap();

bo_group.sync().await.unwrap();
let bo_members = bo_group.list_members().unwrap();
assert_eq!(bo_members.len(), 4);

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
assert_eq!(alix_members.len(), 4);
}

// test is also showing intermittent failures with database locked msg
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
7 changes: 5 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Expand Up @@ -440,10 +440,12 @@ impl MlsGroup {
let message_id = self.prepare_message(message, &conn);

// Skipping a full sync here and instead just firing and forgetting
if let Err(err) = self.publish_intents(conn, client).await {
if let Err(err) = self.publish_intents(conn.clone(), client).await {
log::error!("Send: error publishing intents: {:?}", err);
}

self.sync_until_last_intent_resolved(conn, client).await?;

message_id
}

@@ -459,7 +461,8 @@
let update_interval = Some(5_000_000);
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;
self.publish_intents(conn, client).await?;
self.publish_intents(conn.clone(), client).await?;
self.sync_until_last_intent_resolved(conn, client).await?;
Ok(())
}

76 changes: 71 additions & 5 deletions xmtp_mls/src/groups/sync.rs
Expand Up @@ -46,6 +46,7 @@ use openmls::{
credentials::BasicCredential,
extensions::Extensions,
framing::{MlsMessageOut, ProtocolMessage},
group::GroupEpoch,
prelude::{
tls_codec::{Deserialize, Serialize},
LeafNodeIndex, MlsGroup as OpenMlsGroup, MlsMessageBodyIn, MlsMessageIn, PrivateMessageIn,
@@ -133,6 +134,28 @@ impl MlsGroup {
Ok(())
}

pub(super) async fn sync_until_last_intent_resolved<ApiClient>(
&self,
conn: DbConnection,
client: &Client<ApiClient>,
) -> Result<(), GroupError>
where
ApiClient: XmtpApi,
{
let intents = conn.find_group_intents(
self.group_id.clone(),
Some(vec![IntentState::ToPublish, IntentState::Published]),
None,
)?;

if intents.is_empty() {
return Ok(());
}

self.sync_until_intent_resolved(conn, intents[intents.len() - 1].id, client)
.await
}

/**
* Sync the group and wait for the intent to be deleted
* Group syncing may involve picking up messages unrelated to the intent, so simply checking for errors
@@ -188,6 +211,37 @@ impl MlsGroup {
Err(last_err.unwrap_or(GroupError::Generic("failed to wait for intent".to_string())))
}

fn is_valid_epoch(
inbox_id: InboxId,
intent_id: i32,
group_epoch: GroupEpoch,
message_epoch: GroupEpoch,
max_past_epochs: usize,
) -> bool {
if message_epoch.as_u64() + max_past_epochs as u64 <= group_epoch.as_u64() {
log::warn!(
"[{}] own message epoch {} is {} or more less than group epoch {} for intent {}. Retrying message",
inbox_id,
message_epoch,
max_past_epochs,
group_epoch,
intent_id
);
return false;
} else if message_epoch.as_u64() > group_epoch.as_u64() {
// Should not happen, logging proactively
log::error!(
"[{}] own message epoch {} is greater than group epoch {} for intent {}. Retrying message",
inbox_id,
message_epoch,
group_epoch,
intent_id
);
return false;
}
true
}
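
// Illustrative note (not part of this commit): because the first comparison
// uses `<=`, a message that is exactly `max_past_epochs` behind the group is
// already treated as undecryptable; the caller then resets its intent to
// ToPublish so the message is re-encrypted at the current epoch and re-sent.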

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(level = "trace", skip_all)]
async fn process_own_message<ApiClient: XmtpApi>(
@@ -203,11 +257,15 @@
if intent.state == IntentState::Committed {
return Ok(());
}
let message_epoch = message.epoch();
let group_epoch = openmls_group.epoch();
debug!(
"[{}] processing own message for intent {} / {:?}",
"[{}] processing own message for intent {} / {:?}, group epoch: {}, message_epoch: {}",
self.context.inbox_id(),
intent.id,
intent.kind
intent.kind,
group_epoch,
message_epoch
);

let conn = provider.conn();
@@ -223,8 +281,6 @@
let maybe_pending_commit = openmls_group.pending_commit();
// We don't get errors with merge_pending_commit when there are no commits to merge
if maybe_pending_commit.is_none() {
let message_epoch = message.epoch();
let group_epoch = openmls_group.epoch();
debug!(
"no pending commit to merge. Group epoch: {}. Message epoch: {}",
group_epoch, message_epoch
@@ -281,6 +337,16 @@
}
}
IntentKind::SendMessage => {
if !Self::is_valid_epoch(
self.context.inbox_id(),
intent.id,
group_epoch,
message_epoch,
3, // max_past_epochs, TODO: expose from OpenMLS MlsGroup
) {
conn.set_group_intent_to_publish(intent.id)?;
return Ok(());
}
if let Some(id) = intent.message_id()? {
conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?;
}
@@ -532,7 +598,7 @@ impl MlsGroup {
// Intent with the payload hash matches
Ok(Some(intent)) => {
log::info!(
"client [{}] is about to process own envelope [{}]",
"client [{}] is about to process own envelope [{}]",
client.inbox_id(),
envelope.id
);