Skip to content

Commit

Permalink
Clean up empty intents (#969)
Browse files Browse the repository at this point in the history
* write a test to reproduce the issue

* add more messages

* Fix test

* Improve test

* Make test reliable

* Fix failing test

---------

Co-authored-by: Naomi Plasterer <[email protected]>
  • Loading branch information
neekolas and nplasterer authored Aug 16, 2024
1 parent 8b47df3 commit 657d696
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 47 deletions.
109 changes: 109 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,115 @@ mod tests {
assert_eq!(client2_members.len(), 2);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_create_new_installations_does_not_fork_group() {
let bo_wallet_key = &mut rng();
let bo_wallet = xmtp_cryptography::utils::LocalWallet::new(bo_wallet_key);

// Create clients
let alix = new_test_client().await;
let bo = new_test_client_with_wallet(bo_wallet.clone()).await;
let caro = new_test_client().await;

// Alix begins a stream for all messages
let message_callbacks = RustStreamCallback::default();
let stream_messages = alix
.conversations()
.stream_all_messages(Box::new(message_callbacks.clone()))
.await;
stream_messages.wait_for_ready().await;

// Alix creates a group with Bo and Caro
let group = alix
.conversations()
.create_group(
vec![bo.account_address.clone(), caro.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Alix and Caro Sync groups
alix.conversations().sync().await.unwrap();
bo.conversations().sync().await.unwrap();
caro.conversations().sync().await.unwrap();

// Alix and Caro find the group
let alix_group = alix.group(group.id()).unwrap();
let bo_group = bo.group(group.id()).unwrap();
let caro_group = caro.group(group.id()).unwrap();

// Alix sends a message in the group
alix_group
.send("First message".as_bytes().to_vec())
.await
.unwrap();

// Caro sends a message in the group
caro_group
.send("Second message".as_bytes().to_vec())
.await
.unwrap();

// Bo logs back in with a new installation
let bo2 = new_test_client_with_wallet(bo_wallet).await;

// Bo begins a stream for all messages
let bo_message_callbacks = RustStreamCallback::default();
let bo_stream_messages = bo2
.conversations()
.stream_all_messages(Box::new(bo_message_callbacks.clone()))
.await;
bo_stream_messages.wait_for_ready().await;

// Alix sends a message to the group
alix_group
.send("Third message".as_bytes().to_vec())
.await
.unwrap();

// New installation of bo finds the group
bo2.conversations().sync().await.unwrap();
let bo2_group = bo2.group(group.id()).unwrap();

// Bo sends a message to the group
bo2_group
.send("Fourth message".as_bytes().to_vec())
.await
.unwrap();

// Caro sends a message in the group
caro_group
.send("Fifth message".as_bytes().to_vec())
.await
.unwrap();

alix_group.sync().await.unwrap();
bo_group.sync().await.unwrap();
bo2_group.sync().await.unwrap();
caro_group.sync().await.unwrap();

// Get the message count for all the clients
let caro_messages = caro_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let alix_messages = alix_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let bo2_messages = bo2_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();

assert_eq!(caro_messages.len(), 5);
assert_eq!(alix_messages.len(), 6);
assert_eq!(bo_messages.len(), 5);
// Bo 2 only sees three messages since it joined after the first 2 were sent
assert_eq!(bo2_messages.len(), 3);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_messages_when_epochs_behind() {
let alix = new_test_client().await;
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ mod tests {
bola_group
.add_members_by_inbox_id(&bola, vec![charlie.inbox_id()])
.await
.expect_err("expected error");
.expect("bola's add should succeed in a no-op");

amal_group
.receive(&amal.store().conn().unwrap(), &amal)
Expand Down Expand Up @@ -1494,8 +1494,8 @@ mod tests {
None,
)
.unwrap();
// Bola should have one uncommitted intent in `Error::Failed` state for the failed attempt at adding Charlie, who is already in the group
assert_eq!(bola_failed_intents.len(), 1);
// Bola's attempted add should be deleted, since it will have been a no-op on the second try
assert_eq!(bola_failed_intents.len(), 0);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down
91 changes: 48 additions & 43 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ impl MlsGroup {
state: IntentState::Error,
..
})) => {
log::warn!("not retrying intent ID {id}. since it is in state Error",);
log::warn!(
"not retrying intent ID {id}. since it is in state Error. {:?}",
last_err
);
return Err(last_err.unwrap_or(GroupError::Generic(
"Group intent could not be committed".to_string(),
)));
Expand Down Expand Up @@ -803,50 +806,52 @@ impl MlsGroup {
})
);

if let Err(err) = result {
log::error!("error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
log::error!("intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn()
.increment_intent_publish_attempt_count(intent.id)?;
}

return Err(err);
}
match result {
Err(err) => {
log::error!("error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
log::error!("intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn()
.increment_intent_publish_attempt_count(intent.id)?;
}

if let Some((payload, post_commit_data)) = result.expect("checked") {
let payload_slice = payload.as_slice();
return Err(err);
}
Ok(Some((payload, post_commit_data))) => {
let payload_slice = payload.as_slice();

client
.api_client
.send_group_messages(vec![payload_slice])
.await?;
log::info!(
"[{}] published intent [{}] of type [{}]",
client.inbox_id(),
intent.id,
intent.kind
);
provider.conn().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_data,
)?;
log::debug!(
"client [{}] set stored intent [{}] to state `published`",
client.inbox_id(),
intent.id
);
} else {
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
client
.api_client
.send_group_messages(vec![payload_slice])
.await?;
log::info!(
"[{}] published intent [{}] of type [{}]",
client.inbox_id(),
intent.id,
intent.kind
);
provider.conn().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_data,
)?;
log::debug!(
"client [{}] set stored intent [{}] to state `published`",
client.inbox_id(),
intent.id
);
}
Ok(None) => {
log::info!("Skipping intent because no publish data returned");
let deleter: &dyn Delete<StoredGroupIntent, Key = i32> = &provider.conn();
deleter.delete(intent.id)?;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ where
let (tx, rx) = oneshot::channel();

let handle = tokio::spawn(async move {
let mut stream = client.stream_conversations().await.unwrap();
let mut stream = client.stream_conversations().await?;
let _ = tx.send(());
while let Some(convo) = stream.next().await {
convo_callback(convo)
Expand Down

0 comments on commit 657d696

Please sign in to comment.