Skip to content

Commit

Permalink
Save group message content types (#1435)
Browse files Browse the repository at this point in the history
Part of #1403

Adds fields to `group_messages` table corresponding to `ContentTypeId` proto and saves them when sending and receiving messages.

`ContentTypeId` for reference: 

https://github.com/xmtp/proto/blob/404a0f41a6dc00f5de5fcfc24856c8b4e417fe59/proto/mls/message_contents/content.proto#L10-L16

```proto
// ContentTypeId is used to identify the type of content stored in a Message.
message ContentTypeId {
  string authority_id = 1;  // authority governing this content type
  string type_id = 2;       // type identifier
  uint32 version_major = 3; // major version of the type
  uint32 version_minor = 4; // minor version of the type
}
```
  • Loading branch information
cameronvoell authored Dec 20, 2024
1 parent 1c5c59f commit db84199
Show file tree
Hide file tree
Showing 16 changed files with 347 additions and 30 deletions.
6 changes: 6 additions & 0 deletions xmtp_content_types/src/attachment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct AttachmentCodec {}

//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/Attachment.ts
impl AttachmentCodec {
pub const TYPE_ID: &'static str = "attachment";
}
2 changes: 1 addition & 1 deletion xmtp_content_types/src/group_updated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct GroupUpdatedCodec {}

impl GroupUpdatedCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "group_updated";
pub const TYPE_ID: &'static str = "group_updated";
}

impl ContentCodec<GroupUpdated> for GroupUpdatedCodec {
Expand Down
12 changes: 6 additions & 6 deletions xmtp_content_types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
pub mod attachment;
pub mod group_updated;
pub mod membership_change;
pub mod reaction;
pub mod read_receipt;
pub mod remote_attachment;
pub mod reply;
pub mod text;
pub mod transaction_reference;

use thiserror::Error;
use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent};

pub enum ContentType {
GroupMembershipChange,
GroupUpdated,
Text,
}

#[derive(Debug, Error)]
pub enum CodecError {
#[error("encode error {0}")]
Expand Down
2 changes: 1 addition & 1 deletion xmtp_content_types/src/membership_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct GroupMembershipChangeCodec {}

impl GroupMembershipChangeCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "group_membership_change";
pub const TYPE_ID: &'static str = "group_membership_change";
}

impl ContentCodec<GroupMembershipChanges> for GroupMembershipChangeCodec {
Expand Down
6 changes: 6 additions & 0 deletions xmtp_content_types/src/reaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReactionCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reaction/src/Reaction.ts
impl ReactionCodec {
pub const TYPE_ID: &'static str = "reaction";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/read_receipt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReadReceiptCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-read-receipt/src/ReadReceipt.ts
impl ReadReceiptCodec {
pub const TYPE_ID: &'static str = "readReceipt";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/remote_attachment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct RemoteAttachmentCodec {}

//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/RemoteAttachment.ts
impl RemoteAttachmentCodec {
pub const TYPE_ID: &'static str = "remoteStaticAttachment";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/reply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReplyCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reply/src/Reply.ts
impl ReplyCodec {
pub const TYPE_ID: &'static str = "reply";
}
2 changes: 1 addition & 1 deletion xmtp_content_types/src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct TextCodec {}

impl TextCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "text";
pub const TYPE_ID: &'static str = "text";
const ENCODING_KEY: &'static str = "encoding";
const ENCODING_UTF8: &'static str = "UTF-8";
}
Expand Down
6 changes: 6 additions & 0 deletions xmtp_content_types/src/transaction_reference.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct TransactionReferenceCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-transaction-reference/src/TransactionReference.ts
impl TransactionReferenceCodec {
pub const TYPE_ID: &'static str = "transactionReference";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE group_messages
DROP COLUMN authority_id;

ALTER TABLE group_messages
DROP COLUMN version_major;

ALTER TABLE group_messages
DROP COLUMN version_minor;

ALTER TABLE group_messages
DROP COLUMN content_type;
11 changes: 11 additions & 0 deletions xmtp_mls/migrations/2024-12-18-175338_messages_content_type/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE group_messages
ADD COLUMN content_type INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN version_minor INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN version_major INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN authority_id TEXT NOT NULL DEFAULT '';
44 changes: 36 additions & 8 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::{
GRPC_DATA_LIMIT, HMAC_SALT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS,
SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS,
},
groups::device_sync::DeviceSyncContent,
groups::{
device_sync::preference_sync::UserPreferenceUpdate, intents::UpdateMetadataIntentData,
device_sync::{preference_sync::UserPreferenceUpdate, DeviceSyncContent},
intents::UpdateMetadataIntentData,
validated_commit::ValidatedCommit,
},
hpke::{encrypt_welcome, HpkeError},
Expand All @@ -25,15 +25,14 @@ use crate::{
storage::{
db_connection::DbConnection,
group_intent::{IntentKind, IntentState, StoredGroupIntent, ID},
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
group_message::{ContentType, DeliveryStatus, GroupMessageKind, StoredGroupMessage},
refresh_state::EntityKind,
serialization::{db_deserialize, db_serialize},
sql_key_store,
user_preferences::StoredUserPreferences,
StorageError,
},
subscriptions::LocalEvents,
subscriptions::SyncMessage,
subscriptions::{LocalEvents, SyncMessage},
utils::{hash::sha256, id::calculate_message_id, time::hmac_epoch},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Delete, Fetch, StoreOrIgnore,
Expand All @@ -44,7 +43,7 @@ use hmac::{Hmac, Mac};
use openmls::{
credentials::BasicCredential,
extensions::Extensions,
framing::{ContentType, ProtocolMessage},
framing::{ContentType as MlsContentType, ProtocolMessage},
group::{GroupEpoch, StagedCommit},
key_packages::KeyPackage,
prelude::{
Expand Down Expand Up @@ -546,6 +545,7 @@ where
})) => {
let message_id =
calculate_message_id(&self.group_id, &content, &idempotency_key);
let queryable_content_fields = Self::extract_queryable_content_fields(&content);
StoredGroupMessage {
id: message_id,
group_id: self.group_id.clone(),
Expand All @@ -555,6 +555,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: queryable_content_fields.content_type,
version_major: queryable_content_fields.version_major,
version_minor: queryable_content_fields.version_minor,
authority_id: queryable_content_fields.authority_id,
}
.store_or_ignore(provider.conn_ref())?
}
Expand Down Expand Up @@ -583,6 +587,10 @@ where
sender_installation_id,
sender_inbox_id: sender_inbox_id.clone(),
delivery_status: DeliveryStatus::Published,
content_type: ContentType::Unknown,
version_major: 0,
version_minor: 0,
authority_id: "unknown".to_string(),
}
.store_or_ignore(provider.conn_ref())?;

Expand Down Expand Up @@ -612,6 +620,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: ContentType::Unknown,
version_major: 0,
version_minor: 0,
authority_id: "unknown".to_string(),
}
.store_or_ignore(provider.conn_ref())?;

Expand Down Expand Up @@ -712,7 +724,7 @@ where
discriminant(&other),
)),
}?;
if !allow_epoch_increment && message.content_type() == ContentType::Commit {
if !allow_epoch_increment && message.content_type() == MlsContentType::Commit {
return Err(GroupMessageProcessingError::EpochIncrementNotAllowed);
}

Expand Down Expand Up @@ -933,7 +945,19 @@ where
encoded_payload_bytes.as_slice(),
&timestamp_ns.to_string(),
);

let content_type = match encoded_payload.r#type {
Some(ct) => ct,
None => {
tracing::warn!("Missing content type in encoded payload, using default values");
// Default content type values
xmtp_proto::xmtp::mls::message_contents::ContentTypeId {
authority_id: "unknown".to_string(),
type_id: "unknown".to_string(),
version_major: 0,
version_minor: 0,
}
}
};
let msg = StoredGroupMessage {
id: message_id,
group_id: group_id.to_vec(),
Expand All @@ -943,6 +967,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: content_type.type_id.into(),
version_major: content_type.version_major as i32,
version_minor: content_type.version_minor as i32,
authority_id: content_type.authority_id.to_string(),
};

msg.store_or_ignore(conn)?;
Expand Down
50 changes: 48 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use self::{
intents::IntentError,
validated_commit::CommitValidationError,
};
use crate::storage::StorageError;
use crate::storage::{group_message::ContentType, StorageError};
use xmtp_common::time::now_ns;
use xmtp_proto::xmtp::mls::{
api::v1::{
Expand All @@ -67,7 +67,7 @@ use xmtp_proto::xmtp::mls::{
},
message_contents::{
plaintext_envelope::{Content, V1},
PlaintextEnvelope,
EncodedContent, PlaintextEnvelope,
},
};

Expand Down Expand Up @@ -309,6 +309,38 @@ pub enum UpdateAdminListType {
RemoveSuper,
}

/// Fields extracted from content of a message that should be stored in the DB
pub struct QueryableContentFields {
pub content_type: ContentType,
pub version_major: i32,
pub version_minor: i32,
pub authority_id: String,
}

impl Default for QueryableContentFields {
fn default() -> Self {
Self {
content_type: ContentType::Unknown, // Or whatever the appropriate default is
version_major: 0,
version_minor: 0,
authority_id: String::new(),
}
}
}

impl From<EncodedContent> for QueryableContentFields {
fn from(content: EncodedContent) -> Self {
let content_type_id = content.r#type.unwrap_or_default();

QueryableContentFields {
content_type: content_type_id.type_id.into(),
version_major: content_type_id.version_major as i32,
version_minor: content_type_id.version_minor as i32,
authority_id: content_type_id.authority_id.to_string(),
}
}
}

/// Represents a group, which can contain anywhere from 1 to MAX_GROUP_SIZE inboxes.
///
/// This is a wrapper around OpenMLS's `MlsGroup` that handles our application-level configuration
Expand Down Expand Up @@ -706,6 +738,15 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
Ok(message_id)
}

/// Helper function to extract queryable content fields from a message
fn extract_queryable_content_fields(message: &[u8]) -> QueryableContentFields {
// Return early with default if decoding fails or type is missing
EncodedContent::decode(message)
.inspect_err(|e| tracing::debug!("Failed to decode message as EncodedContent: {}", e))
.map(QueryableContentFields::from)
.unwrap_or_default()
}

/// Prepare a [`IntentKind::SendMessage`] intent, and [`StoredGroupMessage`] on this users XMTP [`Client`].
///
/// # Arguments
Expand Down Expand Up @@ -734,6 +775,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {

// store this unpublished message locally before sending
let message_id = calculate_message_id(&self.group_id, message, &now.to_string());
let queryable_content_fields = Self::extract_queryable_content_fields(message);
let group_message = StoredGroupMessage {
id: message_id.clone(),
group_id: self.group_id.clone(),
Expand All @@ -743,6 +785,10 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
sender_installation_id: self.context().installation_public_key().into(),
sender_inbox_id: self.context().inbox_id().to_string(),
delivery_status: DeliveryStatus::Unpublished,
content_type: queryable_content_fields.content_type,
version_major: queryable_content_fields.version_major,
version_minor: queryable_content_fields.version_minor,
authority_id: queryable_content_fields.authority_id,
};
group_message.store(provider.conn_ref())?;

Expand Down
Loading

0 comments on commit db84199

Please sign in to comment.