diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index f15883138cd..22bae3f4543 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -2731,6 +2731,7 @@ mod test { }, media::get_content_thumbnail::Method, membership::Invite3pidInit, + message::get_message_events::Direction, session::get_login_types::LoginType, uiaa::{AuthData, UiaaResponse}, }, @@ -3895,4 +3896,150 @@ mod test { assert_eq!(client.whoami().await.unwrap().user_id, user_id); } + + #[tokio::test] + async fn messages() { + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + let client = logged_in_client().await; + + let _m = mock("GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string())) + .with_status(200) + .with_body(test_json::MORE_SYNC.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let mocked_messages = mock( + "GET", + Matcher::Regex( + r"^/_matrix/client/r0/rooms/.*/messages.*from=t392-516_47314_0_7_1_1_1_11444_1.*" + .to_string(), + ), + ) + .with_status(200) + .with_body(test_json::SYNC_ROOM_MESSAGES_BATCH_1.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let mocked_context = + mock("GET", Matcher::Regex(r"^/_matrix/client/r0/rooms/.*/context/.*".to_string())) + .with_status(200) + .with_body(test_json::CONTEXT_MESSAGE.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let _ = client.sync_once(sync_settings).await.unwrap(); + + let room = client.get_joined_room(&room_id).unwrap(); + + // Try to get the timeline starting at an event not known to the store. + let events = room + .messages(&event_id!("$f3h4d129462ha:example.com"), None, 3, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let expected_events = [ + event_id!("$f3h4d129462ha:example.com"), + event_id!("$143273582443PhrSnbefore1:example.org"), + event_id!("$143273582443PhrSnbefore2:example.org"), + ]; + + assert_eq!(events.len(), expected_events.len()); + assert!(!events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events) + .any(|(a, b)| &a != b)); + + mocked_context.assert(); + + let expected_events = [ + event_id!("$152037280074GZeOm:localhost"), + event_id!("$1444812213350496Caaaf:example.com"), + event_id!("$1444812213350496Cbbbf:example.com"), + event_id!("$1444812213350496Ccccf:example.com"), + ]; + + let events = room + .messages( + &event_id!("$152037280074GZeOm:localhost"), + None, + expected_events.len(), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), expected_events.len()); + assert!(!events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events) + .any(|(a, b)| &a != b)); + + let events = room + .messages( + &event_id!("$1444812213350496Ccccf:example.com"), + None, + expected_events.len(), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), expected_events.len()); + assert!(!events + .iter() + .rev() + .map(|event| event.event_id()) + .zip(&expected_events) + .any(|(a, b)| &a != b)); + + let end_event = event_id!("$1444812213350496Cbbbf:example.com"); + let events = room + .messages( + &event_id!("$152037280074GZeOm:localhost"), + Some(&end_event), + expected_events.len(), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 3); + assert_eq!(events.last().unwrap().event_id(), end_event); + assert!(!events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events[0..4]) + .any(|(a, b)| &a != b)); + + let end_event = event_id!("$1444812213350496Cbbbf:example.com"); + let events = room + .messages( + &event_id!("$1444812213350496Ccccf:example.com"), + Some(&end_event), + expected_events.len(), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 2); + assert_eq!(events.last().unwrap().event_id(), end_event); + assert!(!events + .iter() + .rev() + .map(|event| event.event_id()) + .zip(&expected_events[2..]) + .any(|(a, b)| &a != b)); + + mocked_messages.assert(); + } } diff --git a/matrix_sdk/src/room/common.rs b/matrix_sdk/src/room/common.rs index cb425425433..f9e4b3c26bf 100644 --- a/matrix_sdk/src/room/common.rs +++ b/matrix_sdk/src/room/common.rs @@ -1,17 +1,20 @@ -use std::{ops::Deref, sync::Arc}; +use std::{cmp::min, convert::TryFrom, ops::Deref, sync::Arc}; -use matrix_sdk_base::deserialized_responses::MembersResponse; +use matrix_sdk_base::deserialized_responses::{MembersResponse, SyncRoomEvent}; use matrix_sdk_common::locks::Mutex; use ruma::{ api::client::r0::{ + context::get_context, media::{get_content, get_content_thumbnail}, membership::{get_member_events, join_room_by_id, leave_room}, - message::get_message_events, + message::{get_message_events, get_message_events::Direction}, }, - UserId, + events::AnyRoomEvent, + serde::Raw, + EventId, UserId, }; -use crate::{BaseRoom, Client, Result, RoomMember}; +use crate::{BaseRoom, Client, Result, RoomMember, UInt}; /// A struct containing methods that are common for Joined, Invited and Left /// Rooms @@ -110,27 +113,34 @@ impl Common { } } - /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and - /// returns a `get_message_events::Response` that contains a chunk of - /// room and state events (`AnyRoomEvent` and `AnyStateEvent`). + /// Gets a slice of the timeline of this room + /// + /// Returns a slice of the timeline between `start` and `end`, no longer + /// then `limit`. If the number of events is fewer then `limit` it means + /// that in the given direction no more events exist. + /// If the timeline doesn't contain an event with the given `start` `None` + /// is returned. /// /// # Arguments /// - /// * `request` - The easiest way to create this request is using the - /// `get_message_events::Request` itself. + /// * `start` - An `EventId` that indicates the start of the slice. + /// + /// * `end` - An `EventId` that indicates the end of the slice. + /// + /// * `limit` - The maximum number of events that should be returned. + /// + /// * `direction` - The direction of the search and returned events. /// /// # Examples /// ```no_run /// # use std::convert::TryFrom; /// use matrix_sdk::Client; - /// # use matrix_sdk::identifiers::room_id; - /// # use matrix_sdk::api::r0::filter::RoomEventFilter; - /// # use matrix_sdk::api::r0::message::get_message_events::Request as MessagesRequest; + /// # use matrix_sdk::identifiers::{event_id, room_id}; + /// # use matrix_sdk::api::r0::message::get_message_events::Direction; /// # use url::Url; /// /// # let homeserver = Url::parse("http://example.com").unwrap(); /// let room_id = room_id!("!roomid:example.com"); - /// let request = MessagesRequest::backward(&room_id, "t47429-4392820_219380_26003_2265"); /// /// let mut client = Client::new(homeserver).unwrap(); /// # let room = client @@ -138,15 +148,125 @@ impl Common { /// # .unwrap(); /// # use futures::executor::block_on; /// # block_on(async { - /// assert!(room.messages(request).await.is_ok()); + /// assert!(room.messages(&event_id!("$xxxxxx:example.org"), None, 10, Direction::Backward).await.is_ok()); /// # }); /// ``` pub async fn messages( &self, - request: impl Into>, - ) -> Result { - let request = request.into(); - self.client.send(request, None).await + start: &EventId, + end: Option<&EventId>, + limit: usize, + direction: Direction, + ) -> Result>> { + let room_id = self.inner.room_id(); + let events = if let Some(mut stored) = self + .client + .store() + .get_timeline(room_id, Some(start), end, Some(limit), direction.clone()) + .await? + { + // We found a gab or the end of the stored timeline. + if let Some(token) = stored.token { + let mut request = get_message_events::Request::new( + self.inner.room_id(), + &token, + direction.clone(), + ); + request.limit = + UInt::try_from((limit - stored.events.len()) as u64).unwrap_or(UInt::MAX); + + let response = self.client.send(request, None).await?; + + // FIXME: we may recevied an invalied server response that ruma considers valid + // See https://github.com/ruma/ruma/issues/644 + if response.end.is_none() && response.start.is_none() { + return Ok(Some(stored.events)); + } + + let response_events = self + .client + .base_client + .receive_messages(room_id, &direction, &response) + .await?; + + let mut response_events = if let Some(end) = end { + if let Some(position) = + response_events.iter().position(|event| &event.event_id() == end) + { + response_events.into_iter().take(position + 1).collect() + } else { + response_events + } + } else { + response_events + }; + + match direction { + Direction::Forward => { + response_events.append(&mut stored.events); + stored.events = response_events; + } + Direction::Backward => stored.events.append(&mut response_events), + } + } + stored.events + } else { + // Fallback to context API because we don't know the start event + let mut request = get_context::Request::new(room_id, start); + + // We need to take limit twice because the context api returns events before + // and after the given event + request.limit = UInt::try_from((limit * 2) as u64).unwrap_or(UInt::MAX); + + let mut context = self.client.send(request, None).await?; + + let event = if let Some(event) = context.event { + event + } else { + return Ok(None); + }; + + let mut response = get_message_events::Response::new(); + response.start = context.start; + response.end = context.end; + let before_length = context.events_before.len(); + let after_length = context.events_after.len(); + let mut events: Vec> = + context.events_after.into_iter().rev().collect(); + events.push(event); + events.append(&mut context.events_before); + response.chunk = events; + response.state = context.state; + let response_events = self + .client + .base_client + .receive_messages(room_id, &Direction::Backward, &response) + .await?; + + let response_events: Vec = match direction { + Direction::Forward => { + let lower_bound = if before_length > limit { before_length - limit } else { 0 }; + response_events[lower_bound..=before_length].to_vec() + } + Direction::Backward => response_events + [after_length..min(response_events.len(), after_length + limit)] + .to_vec(), + }; + + if let Some(end) = end { + if let Some(position) = + response_events.iter().position(|event| &event.event_id() == end) + { + response_events.into_iter().take(position + 1).collect() + } else { + response_events + } + } else { + response_events + } + }; + + Ok(Some(events)) } pub(crate) async fn request_members(&self) -> Result> { diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 7885307722d..bbae3d44827 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -25,7 +25,7 @@ use std::{ use matrix_sdk_common::{ deserialized_responses::{ AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms, - StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, + StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice, }, instant::Instant, locks::RwLock, @@ -48,7 +48,11 @@ use ruma::{ DeviceId, }; use ruma::{ - api::client::r0::{self as api, push::get_notifications::Notification}, + api::client::r0::{ + self as api, + message::get_message_events::{Direction, Response as GetMessageEventsResponse}, + push::get_notifications::Notification, + }, events::{ room::member::{MemberEventContent, MembershipState}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, @@ -836,6 +840,15 @@ impl BaseClient { let notification_count = new_info.unread_notifications.into(); room_info.update_notification_count(notification_count); + changes.add_timeline( + &room_id, + TimelineSlice::new( + timeline.events.iter().cloned().rev().collect(), + next_batch.clone(), + timeline.prev_batch.clone(), + ), + ); + new_rooms.join.insert( room_id, JoinedRoom::new( @@ -879,6 +892,14 @@ impl BaseClient { self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes) .await; + changes.add_timeline( + &room_id, + TimelineSlice::new( + timeline.events.iter().cloned().rev().collect(), + next_batch.clone(), + timeline.prev_batch.clone(), + ), + ); changes.add_room(room_info); new_rooms .leave @@ -949,6 +970,64 @@ impl BaseClient { } } + /// Receive a successful /messages response. + /// + /// * `response` - The successful response from /messages. + pub async fn receive_messages( + &self, + room_id: &RoomId, + direction: &Direction, + response: &GetMessageEventsResponse, + ) -> Result> { + let mut changes = StateChanges::default(); + + let mut events: Vec = vec![]; + for event in &response.chunk { + #[allow(unused_mut)] + let mut event: SyncRoomEvent = event.clone().into(); + + #[cfg(feature = "encryption")] + match hoist_room_event_prev_content(&event.event) { + Ok(e) => { + if let AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted( + encrypted, + )) = e + { + if let Some(olm) = self.olm_machine().await { + if let Ok(decrypted) = olm.decrypt_room_event(&encrypted, room_id).await + { + event = decrypted; + } + } + } + } + Err(error) => { + warn!("Error deserializing event {:?}", error); + } + } + + events.push(event); + } + + let (chunk, start, end) = match direction { + Direction::Backward => { + (events.clone(), response.start.clone().unwrap(), response.end.clone()) + } + Direction::Forward => ( + events.iter().rev().cloned().collect(), + response.end.clone().unwrap(), + response.start.clone(), + ), + }; + + let timeline = TimelineSlice::new(chunk, start, end); + changes.add_timeline(room_id, timeline); + + self.store().save_changes(&changes).await?; + + Ok(events) + } + /// Receive a get member events response and convert it to a deserialized /// `MembersResponse` /// diff --git a/matrix_sdk_base/src/store/memory_store.rs b/matrix_sdk_base/src/store/memory_store.rs index 8bf113581e3..e7833fe59d3 100644 --- a/matrix_sdk_base/src/store/memory_store.rs +++ b/matrix_sdk_base/src/store/memory_store.rs @@ -21,6 +21,7 @@ use dashmap::{DashMap, DashSet}; use lru::LruCache; use matrix_sdk_common::{async_trait, instant::Instant, locks::Mutex}; use ruma::{ + api::client::r0::message::get_message_events::Direction, events::{ presence::PresenceEvent, receipt::Receipt, @@ -34,9 +35,9 @@ use ruma::{ }; use tracing::info; -use super::{Result, RoomInfo, StateChanges, StateStore}; +use super::{Result, RoomInfo, StateChanges, StateStore, StoredTimelineSlice}; use crate::{ - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, TimelineSlice}, media::{MediaRequest, UniqueKey}, }; @@ -66,6 +67,8 @@ pub struct MemoryStore { room_event_receipts: Arc>>>>, media: Arc>>>, + timeline_slices: Arc>>, + event_id_to_timeline_slice: Arc>>, } impl MemoryStore { @@ -90,6 +93,8 @@ impl MemoryStore { room_user_receipts: DashMap::new().into(), room_event_receipts: DashMap::new().into(), media: Arc::new(Mutex::new(LruCache::new(100))), + timeline_slices: DashMap::new().into(), + event_id_to_timeline_slice: DashMap::new().into(), } } @@ -271,6 +276,23 @@ impl MemoryStore { } } + for (room, timeline) in &changes.timeline { + // FIXME: make sure that we don't add events already known + for event in &timeline.events { + let event_id = event.event_id(); + + self.event_id_to_timeline_slice + .entry(room.clone()) + .or_insert_with(DashMap::new) + .insert(event_id, timeline.start.clone()); + } + + self.timeline_slices + .entry(room.clone()) + .or_insert_with(DashMap::new) + .insert(timeline.start.clone(), timeline.clone()); + } + info!("Saved changes in {:?}", now.elapsed()); Ok(()) @@ -424,6 +446,93 @@ impl MemoryStore { Ok(()) } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + let event_id_to_timeline_slice = self.event_id_to_timeline_slice.get(room_id).unwrap(); + let timeline_slices = self.timeline_slices.get(room_id).unwrap(); + + let slice = if let Some(start) = start { + event_id_to_timeline_slice.get(start).and_then(|s| timeline_slices.get(&*s)) + } else { + self.get_sync_token().await?.and_then(|s| timeline_slices.get(&*s)) + }; + + let mut slice = if let Some(slice) = slice { + slice + } else { + return Ok(None); + }; + + let mut timeline = StoredTimelineSlice::new(vec![], None); + + match direction { + Direction::Forward => { + loop { + timeline.events.append(&mut slice.events.iter().rev().cloned().collect()); + + timeline.token = Some(slice.start.clone()); + + let found_end = end.map_or(false, |end| { + slice.events.iter().any(|event| &event.event_id() == end) + }); + + if found_end { + break; + } + + if let Some(next_slice) = timeline_slices.get(&slice.start) { + slice = next_slice; + } else { + // No more timeline slices or a gap in the known timeline + break; + } + + if let Some(limit) = limit { + if timeline.events.len() >= limit { + break; + } + } + } + } + Direction::Backward => { + loop { + timeline.events.append(&mut slice.events.clone()); + timeline.token = slice.end.clone(); + + let found_end = end.map_or(false, |end| { + slice.events.iter().any(|event| &event.event_id() == end) + }); + + if found_end { + break; + } + + if let Some(prev_slice) = + slice.end.as_deref().and_then(|end| timeline_slices.get(end)) + { + slice = prev_slice; + } else { + // No more timeline slices or we have a gap in the known timeline + break; + } + + if let Some(limit) = limit { + if timeline.events.len() >= limit { + break; + } + } + } + } + } + Ok(Some(timeline)) + } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -555,6 +664,16 @@ impl StateStore for MemoryStore { async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { self.remove_media_content_for_uri(uri).await } + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + self.get_timeline(room_id, start, end, limit, direction).await + } } #[cfg(test)] diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index 05fadc3fb68..82e76331da7 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -23,7 +23,9 @@ use std::{ use dashmap::DashMap; use matrix_sdk_common::{async_trait, locks::RwLock, AsyncTraitDeps}; use ruma::{ - api::client::r0::push::get_notifications::Notification, + api::client::r0::{ + message::get_message_events::Direction, push::get_notifications::Notification, + }, events::{ presence::PresenceEvent, receipt::{Receipt, ReceiptEventContent}, @@ -39,7 +41,7 @@ use ruma::{ use sled::Db; use crate::{ - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice}, media::MediaRequest, rooms::{RoomInfo, RoomType}, Room, Session, @@ -280,6 +282,29 @@ pub trait StateStore: AsyncTraitDeps { /// /// * `uri` - The `MxcUri` of the media files. async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>; + + /// Get a slice of the timeline of a room. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which the timeline should be + /// fetched. + /// + /// * `start` - The start point from which events should be returned. + /// + /// * `end` - The end point to which events should be returned. + /// + /// * `limit` - The maximum number of events to return. + /// + /// * `direction` - The direction events should be returned. + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result>; } /// A state store wrapper for the SDK. @@ -453,6 +478,9 @@ pub struct StateChanges { pub ambiguity_maps: BTreeMap>>, /// A map of `RoomId` to a vector of `Notification`s pub notifications: BTreeMap>, + + /// A mapping of `RoomId` to a `TimelineSlice` + pub timeline: BTreeMap, } impl StateChanges { @@ -537,4 +565,27 @@ impl StateChanges { pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) { self.receipts.insert(room_id.to_owned(), event); } + + /// Update the `StateChanges` struct with the given room with a new + /// `TimelineSlice`. + pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) { + self.timeline.insert(room_id.to_owned(), timeline); + } +} + +/// Store state changes and pass them to the StateStore. +#[derive(Debug, Default)] +pub struct StoredTimelineSlice { + /// A start token to fetch more events if the requested slice isn't fully + /// known. + pub token: Option, + + /// The requested events + pub events: Vec, +} + +impl StoredTimelineSlice { + pub fn new(events: Vec, token: Option) -> Self { + Self { events, token } + } } diff --git a/matrix_sdk_base/src/store/sled_store/mod.rs b/matrix_sdk_base/src/store/sled_store/mod.rs index 4f3ba8f2474..b65204fc2cc 100644 --- a/matrix_sdk_base/src/store/sled_store/mod.rs +++ b/matrix_sdk_base/src/store/sled_store/mod.rs @@ -28,6 +28,7 @@ use futures::{ }; use matrix_sdk_common::async_trait; use ruma::{ + api::client::r0::message::get_message_events::Direction, events::{ presence::PresenceEvent, receipt::Receipt, @@ -46,9 +47,9 @@ use sled::{ use tracing::info; use self::store_key::{EncryptedEvent, StoreKey}; -use super::{Result, RoomInfo, StateChanges, StateStore, StoreError}; +use super::{Result, RoomInfo, StateChanges, StateStore, StoreError, StoredTimelineSlice}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent}, media::{MediaRequest, UniqueKey}, }; @@ -118,6 +119,44 @@ impl EncodeKey for (&str, &str) { } } +impl EncodeKey for (&str, usize) { + fn encode(&self) -> Vec { + [ + self.0.as_bytes(), + &[ENCODE_SEPARATOR], + &self.1.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + ] + .concat() + } +} + +impl EncodeKey for (usize, usize) { + fn encode(&self) -> Vec { + [self.0.to_be_bytes(), self.1.to_be_bytes()].concat() + } +} + +impl EncodeKey for (&str, usize, usize) { + fn encode(&self) -> Vec { + [ + self.0.as_bytes(), + &[ENCODE_SEPARATOR], + &self.1.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + &self.2.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + ] + .concat() + } +} + +impl EncodeKey for usize { + fn encode(&self) -> Vec { + self.to_be_bytes().to_vec() + } +} + impl EncodeKey for (&str, &str, &str) { fn encode(&self) -> Vec { [ @@ -167,6 +206,19 @@ pub fn decode_key_value(key: &[u8], position: usize) -> Option { values.get(position).map(|s| String::from_utf8_lossy(s).to_string()) } +pub fn decode_batch_idx_position(key: &[u8]) -> Option<(usize, usize)> { + let (first, second) = key.split_at((usize::BITS / u8::BITS) as usize); + + let batch_idx = usize::from_be_bytes((first).try_into().ok()?); + let position = usize::from_be_bytes((second).try_into().ok()?); + + Some((batch_idx, position)) +} + +pub fn decode_usize(key: &[u8]) -> Option { + Some(usize::from_be_bytes(key.try_into().ok()?)) +} + #[derive(Clone)] pub struct SledStore { path: Option, @@ -189,6 +241,16 @@ pub struct SledStore { room_user_receipts: Tree, room_event_receipts: Tree, media: Tree, + // Map an EventId to the batch index and position in the batch + event_id_to_position: Tree, + // List of batches and the events + timeline_events: Tree, + batch_idx_to_start_token: Tree, + batch_idx_to_end_token: Tree, + start_token_to_batch_idx_position: Tree, + end_token_to_batch_idx_position: Tree, + // Keep track of the highest batch index + highest_batch_idx: Tree, } impl std::fmt::Debug for SledStore { @@ -226,6 +288,15 @@ impl SledStore { let media = db.open_tree("media")?; + let event_id_to_position = db.open_tree("event_id_to_position")?; + let timeline_events = db.open_tree("events")?; + let batch_idx_to_start_token = db.open_tree("batch_idx_to_start_token")?; + let batch_idx_to_end_token = db.open_tree("batch_idx_to_end_token")?; + let start_token_to_batch_idx_position = + db.open_tree("start_token_to_batch_idx_position")?; + let end_token_to_batch_idx_position = db.open_tree("end_token_to_batch_idx_position")?; + let highest_batch_idx = db.open_tree("highest_batch_idx")?; + Ok(Self { path, inner: db, @@ -247,6 +318,13 @@ impl SledStore { room_user_receipts, room_event_receipts, media, + event_id_to_position, + timeline_events, + batch_idx_to_start_token, + batch_idx_to_end_token, + start_token_to_batch_idx_position, + end_token_to_batch_idx_position, + highest_batch_idx, }) } @@ -504,9 +582,29 @@ impl SledStore { ret?; - let ret: Result<(), TransactionError> = - (&self.room_user_receipts, &self.room_event_receipts).transaction( - |(room_user_receipts, room_event_receipts)| { + let ret: Result<(), TransactionError> = ( + &self.room_user_receipts, + &self.room_event_receipts, + &self.event_id_to_position, + &self.timeline_events, + &self.batch_idx_to_start_token, + &self.batch_idx_to_end_token, + &self.start_token_to_batch_idx_position, + &self.end_token_to_batch_idx_position, + &self.highest_batch_idx, + ) + .transaction( + |( + room_user_receipts, + room_event_receipts, + event_id_to_position, + timeline_events, + batch_idx_to_start_token, + batch_idx_to_end_token, + start_token_to_batch_idx_position, + end_token_to_batch_idx_position, + highest_batch_idx, + )| { for (room, content) in &changes.receipts { for (event_id, receipts) in &content.0 { for (receipt_type, receipts) in receipts { @@ -550,6 +648,129 @@ impl SledStore { } } + for (room, timeline) in &changes.timeline { + // Skip empty timeline slices + if timeline.events.is_empty() { + continue; + } + + let expandable = { + // Handle overlapping slices. + let first_event = timeline.events.first().unwrap().event_id(); + let last_event = timeline.events.last().unwrap().event_id(); + + let found_first = if let Some((batch_idx, current_position)) = + event_id_to_position + .get((room.as_str(), first_event.as_str()).encode())? + .map(|e| decode_batch_idx_position(&e).unwrap()) + { + Some((false, true, batch_idx, current_position)) + } else { + None + }; + + let found_last = if let Some((batch_idx, current_position)) = + event_id_to_position + .get((room.as_str(), last_event.as_str()).encode())? + .map(|e| decode_batch_idx_position(&e).unwrap()) + { + let current_position = current_position - timeline.events.len() + 1; + + Some((true, false, batch_idx, current_position)) + } else { + None + }; + + // The store already knowns all events of this slice. + if found_first.is_some() && found_last.is_some() { + continue; + } + + found_first.or(found_last) + }; + + // Lookup the previous or next batch + let expandable = if let Some(expandable) = expandable { + Some(expandable) + } else if let Some((batch_idx, position)) = end_token_to_batch_idx_position + .remove((room.as_str(), timeline.start.as_str()).encode())? + .map(|e| decode_batch_idx_position(&e).unwrap()) + { + Some((false, true, batch_idx, position)) + } else if let Some(end) = &timeline.end { + if let Some((batch_idx, position)) = start_token_to_batch_idx_position + .remove((room.as_str(), end.as_str()).encode())? + .map(|e| decode_batch_idx_position(&e).unwrap()) + { + let position = position - timeline.events.len(); + Some((true, false, batch_idx, position)) + } else { + None + } + } else { + None + }; + + let (new_start_token, new_end_token, batch_idx, current_position) = + if let Some(expandable) = expandable { + expandable + } else { + // If no expandable batch was found we add a new batch to the store + let new_batch_idx = highest_batch_idx + .get(room.as_str().encode())? + .map(|e| decode_usize(&e).unwrap()) + .map_or(0, |v| v + 1); + + highest_batch_idx + .insert(room.as_str().encode(), new_batch_idx.encode())?; + + (true, true, new_batch_idx, usize::MAX / 2) + }; + + if new_start_token { + batch_idx_to_start_token.insert( + (room.as_str(), batch_idx).encode(), + timeline.start.as_str().encode(), + )?; + + start_token_to_batch_idx_position.insert( + (room.as_str(), timeline.start.as_str()).encode(), + (batch_idx, current_position).encode(), + )?; + } + + if new_end_token { + if let Some(end) = &timeline.end { + batch_idx_to_end_token.insert( + (room.as_str(), batch_idx).encode(), + end.as_str().encode(), + )?; + + end_token_to_batch_idx_position.insert( + (room.as_str(), end.as_str()).encode(), + (batch_idx, current_position + timeline.events.len()).encode(), + )?; + } + } + + for (position, event) in timeline.events.iter().enumerate() { + let position = position + current_position; + let old_event = timeline_events.insert( + (room.as_str(), batch_idx, position).encode(), + self.serialize_event(event) + .map_err(ConflictableTransactionError::Abort)?, + )?; + + // A new event was inserted + if old_event.is_none() { + event_id_to_position.insert( + (room.as_str(), event.event_id().as_str()).encode(), + (batch_idx, position).encode(), + )?; + } + } + } + Ok(()) }, ); @@ -768,6 +989,223 @@ impl SledStore { Ok(self.media.apply_batch(batch)?) } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + let beginning = if let Some(start) = start { + self.event_id_to_position + .get((room_id.as_str(), start.as_str()).encode())? + .map(|t| decode_batch_idx_position(&t).unwrap()) + } else if let Some(token) = self.get_sync_token().await? { + // The timeline beyond the sync token isn't known, so don't bother to check the + // store. + match direction { + Direction::Forward => { + return Ok(Some(StoredTimelineSlice::new(Vec::new(), Some(token)))); + } + Direction::Backward => self + .start_token_to_batch_idx_position + .get((room_id.as_str(), token.as_str()).encode())? + .map(|t| decode_batch_idx_position(&t).unwrap()), + } + } else { + return Ok(None); + }; + + let ending = if let Some(end) = end { + self.event_id_to_position + .get((room_id.as_str(), end.as_str()).encode())? + .map(|t| decode_batch_idx_position(&t).unwrap()) + } else { + None + }; + + if let Some((mut batch_idx, mut position)) = beginning { + let mut events: Vec = Vec::new(); + let mut token: Option; + + match direction { + Direction::Forward => loop { + token = None; + let store_key = self.store_key.clone(); + + let current_limit_position = + limit.map(|limit| position - (limit - events.len()) + 1); + let current_limit = if let Some((_, end_position)) = + ending.filter(|(end_batch_idx, _)| end_batch_idx == &batch_idx) + { + if let Some(current_limit_position) = current_limit_position { + if current_limit_position > end_position { + Some(current_limit_position) + } else { + Some(end_position) + } + } else { + Some(end_position) + } + } else { + current_limit_position + }; + + let range = if let Some(current_limit) = current_limit { + self.timeline_events.range( + (room_id.as_str(), batch_idx, current_limit).encode() + ..=(room_id.as_str(), batch_idx, position).encode(), + ) + } else { + self.timeline_events.range( + (room_id.as_str(), batch_idx).encode() + ..=(room_id.as_str(), batch_idx, position).encode(), + ) + }; + + let mut part: Vec = range + .rev() + .filter_map(move |e| { + deserialize_event::(store_key.clone(), &e.ok()?.1).ok() + }) + .collect(); + + events.append(&mut part); + + if let Some(limit) = limit { + if events.len() >= limit { + break; + } + } + + if let Some(end) = end { + if let Some(last) = events.last() { + if &last.event_id() == end { + break; + } + } + } + + // Not enought events where found in this batch, therefore, go to the previous + // batch. + if let Some(start_token) = self + .batch_idx_to_start_token + .get((room_id.as_str(), batch_idx).encode())? + .map(|t| decode_key_value(&t, 0).unwrap()) + { + if let Some((next_batch_idx, next_position)) = self + .end_token_to_batch_idx_position + .get((room_id.as_str(), start_token.as_str()).encode())? + .map(|t| decode_batch_idx_position(&t).unwrap()) + { + // We don't have any other batch with this token + if batch_idx == next_batch_idx { + token = Some(start_token); + break; + } + + batch_idx = next_batch_idx; + position = next_position - 1; + continue; + } + + token = Some(start_token); + } + break; + }, + Direction::Backward => loop { + token = None; + let store_key = self.store_key.clone(); + + let current_limit_position = + limit.map(|limit| position + (limit - events.len())); + let current_limit = if let Some((_, end_position)) = + ending.filter(|(end_batch_idx, _)| end_batch_idx == &batch_idx) + { + let end_position = end_position + 1; + if let Some(current_limit_position) = current_limit_position { + if current_limit_position < end_position { + Some(current_limit_position) + } else { + Some(end_position) + } + } else { + Some(end_position) + } + } else { + current_limit_position + }; + + let range = if let Some(current_limit) = current_limit { + self.timeline_events.range( + (room_id.as_str(), batch_idx, position).encode() + ..(room_id.as_str(), batch_idx, current_limit).encode(), + ) + } else { + self.timeline_events.range( + (room_id.as_str(), batch_idx, position).encode() + ..(room_id.as_str(), batch_idx + 1).encode(), + ) + }; + + let mut part = range + .filter_map(move |e| { + deserialize_event::(store_key.clone(), &e.ok()?.1).ok() + }) + .collect(); + + events.append(&mut part); + + if let Some(limit) = limit { + if events.len() >= limit { + break; + } + } + + if let Some(end) = end { + if let Some(last) = events.last() { + if &last.event_id() == end { + break; + } + } + } + + // Not enought events where found in this batch, therefore, go to the previous + // batch. + if let Some(end_token) = self + .batch_idx_to_end_token + .get((room_id.as_str(), batch_idx).encode())? + .map(|t| decode_key_value(&t, 0).unwrap()) + { + if let Some((prev_batch_idx, prev_position)) = self + .start_token_to_batch_idx_position + .get((room_id.as_str(), end_token.as_str()).encode())? + .map(|t| decode_batch_idx_position(&t).unwrap()) + { + // We don't have any other batch with this token + if batch_idx == prev_batch_idx { + token = Some(end_token); + break; + } + + batch_idx = prev_batch_idx; + position = prev_position; + continue; + } + + token = Some(end_token); + } + break; + }, + } + + return Ok(Some(StoredTimelineSlice::new(events, token))); + } + + Ok(None) + } } #[async_trait] @@ -893,15 +1331,46 @@ impl StateStore for SledStore { async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { self.remove_media_content_for_uri(uri).await } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + self.get_timeline(room_id, start, end, limit, direction).await + } +} + +fn deserialize_event Deserialize<'b>>( + store_key: Arc>, + event: &[u8], +) -> Result { + if let Some(key) = &*store_key { + let encrypted: EncryptedEvent = serde_json::from_slice(event)?; + Ok(key.decrypt(encrypted)?) + } else { + Ok(serde_json::from_slice(event)?) + } } #[cfg(test)] mod test { use std::convert::TryFrom; - use matrix_sdk_test::async_test; + use http::Response; + use matrix_sdk_test::{async_test, test_json}; use ruma::{ - api::client::r0::media::get_content_thumbnail::Method, + api::{ + client::r0::{ + media::get_content_thumbnail::Method, + message::get_message_events::{Direction, Response as MessageResponse}, + sync::sync_events::Response as SyncResponse, + }, + IncomingResponse, + }, events::{ room::{ member::{MemberEventContent, MembershipState}, @@ -918,7 +1387,7 @@ mod test { use super::{SledStore, StateChanges}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent, TimelineSlice}, media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, StateStore, }; @@ -1132,4 +1601,551 @@ mod test { assert!(store.get_media_content(&request_file).await.unwrap().is_none()); assert!(store.get_media_content(&request_thumbnail).await.unwrap().is_none()); } + + #[async_test] + async fn test_timeline() { + let store = SledStore::open().unwrap(); + let mut stored_events = Vec::new(); + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .is_none()); + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .is_none()); + + // Add a sync response + let sync = SyncResponse::try_from_http_response( + Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[&room_id].timeline; + let events: Vec = + timeline.events.iter().rev().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, sync.next_batch.clone(), timeline.prev_batch.clone()); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .is_none()); + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .is_none()); + + // Add the next batch token + let mut changes = StateChanges::default(); + changes.sync_token = Some(sync.next_batch.clone()); + store.save_changes(&changes).await.unwrap(); + + let forward_events = store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .unwrap(); + assert_eq!(forward_events.events.len(), 0); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(backward_events.events.len(), 6); + assert_eq!(backward_events.token, timeline.prev_batch.clone()); + + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Add a message batch before the sync response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Add the same message batch again + let timeline_slice = + TimelineSlice::new(events, messages.start.clone().unwrap(), messages.end.clone()); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(backward_events.events.len(), 9); + assert_eq!(backward_events.token, messages.end.clone()); + + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Add a batch after the previous batch + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone().unwrap(), messages.end.clone()); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(backward_events.events.len(), 12); + assert_eq!(backward_events.token, messages.end.clone()); + + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let first_block_end = messages.end.clone(); + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::GAPPED_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let gapped_events: Vec = + messages.chunk.iter().cloned().map(Into::into).collect(); + + // Add a detached batch to create a gap in the known timeline + let timeline_slice = TimelineSlice::new( + gapped_events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + let gapped_block_end = messages.end.clone(); + assert_eq!(backward_events.events.len(), 12); + assert_eq!(backward_events.token, first_block_end); + + // Fill the gap that was created before + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::GAPPED_ROOM_MESSAGES_FILLER).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + stored_events.append(&mut gapped_events.clone()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // The most recent event + let first_event = event_id!("$098237280074GZeOm:localhost"); + let backward_events = store + .get_timeline(&room_id, Some(&first_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let last_event = event_id!("$1444812213350496Ccccr:example.com"); + + // Read from the last known event forward + let forward_events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Forward) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), stored_events.len()); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + assert!(!forward_events + .events + .iter() + .rev() + .zip(backward_events.events) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Read from the last known event backwards + let events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(events.events.len(), 1); + assert_eq!(events.events.first().unwrap().event_id(), last_event); + assert_eq!(events.token, gapped_block_end); + + let events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(events.events.len(), 1); + assert_eq!(events.events.first().unwrap().event_id(), last_event); + assert_eq!(events.token, gapped_block_end); + + let end_event = event_id!("$1444812213350496Caaar:example.com"); + + // Get a slice of the timeline + let backward_events = store + .get_timeline(&room_id, Some(&first_event), Some(&end_event), None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let expected_events = &stored_events[..stored_events.len() - 2]; + + assert_eq!(backward_events.events.len(), expected_events.len()); + assert_eq!(backward_events.token, None); + assert!(!backward_events + .events + .iter() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let forward_events = store + .get_timeline(&room_id, Some(&end_event), Some(&first_event), None, Direction::Forward) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), expected_events.len()); + assert_eq!(forward_events.token, None); + + assert!(!forward_events + .events + .iter() + .rev() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Get a slice of the timeline where the end isn't known + let unknown_end_event = event_id!("$XXXXXXXXX:example.com"); + + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&unknown_end_event), + None, + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(!backward_events + .events + .iter() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let forward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&unknown_end_event), + None, + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), 1); + assert_eq!(forward_events.events.first().unwrap().event_id(), first_event); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + + // Get a slice of the timeline with limit with more then the number of events + // between start and end + let limit = 4; + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&end_event), + Some(limit), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + let expected_events = &stored_events[..limit]; + assert_eq!(backward_events.events.len(), limit); + assert_eq!(backward_events.token, None); + assert!(!backward_events + .events + .iter() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let forward_events = store + .get_timeline( + &room_id, + Some(&last_event), + Some(&first_event), + Some(limit), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), limit); + assert_eq!(forward_events.token, None); + + let expected_events = &stored_events[stored_events.len() - limit..]; + assert!(!forward_events + .events + .iter() + .rev() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Get a slice of the timeline with limit with less then limit events between + // start and end + let limit = 30; + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&last_event), + Some(limit), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, None); + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + let forward_events = store + .get_timeline( + &room_id, + Some(&last_event), + Some(&first_event), + Some(limit), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), stored_events.len()); + assert_eq!(forward_events.token, None); + + assert!(!forward_events + .events + .iter() + .rev() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Add a second sync response + let sync = SyncResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::MORE_SYNC_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[&room_id].timeline; + let events: Vec = + timeline.events.iter().rev().cloned().map(Into::into).collect(); + + for event in events.clone().into_iter().rev() { + stored_events.insert(0, event); + } + + let timeline_slice = + TimelineSlice::new(events, sync.next_batch.clone(), timeline.prev_batch.clone()); + let mut changes = StateChanges::default(); + changes.sync_token = Some(sync.next_batch.clone()); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Add an overlapping timeline slice to the store + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::OVERLAPPING_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.push(events.last().cloned().unwrap()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let end_token = messages.end; + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, end_token); + assert!(!backward_events + .events + .iter() + .zip(stored_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + + // Add an overlapping patch to the start of everything + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::OVERLAPPING_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + let mut expected_events = events.clone(); + expected_events.extend_from_slice(&stored_events[1..]); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let start_event = event_id!("$1444812213350496Cbbbr3:example.com"); + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, Some(&start_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), expected_events.len()); + assert_eq!(backward_events.token, end_token); + assert!(!backward_events + .events + .iter() + .zip(expected_events.iter()) + .any(|(a, b)| a.event_id() != b.event_id())); + } } diff --git a/matrix_sdk_common/src/deserialized_responses.rs b/matrix_sdk_common/src/deserialized_responses.rs index 1a8c28af07e..5a8121e099c 100644 --- a/matrix_sdk_common/src/deserialized_responses.rs +++ b/matrix_sdk_common/src/deserialized_responses.rs @@ -9,8 +9,8 @@ use ruma::{ }, }, events::{ - room::member::MemberEventContent, AnySyncRoomEvent, StateEvent, StrippedStateEvent, - SyncStateEvent, Unsigned, + room::member::MemberEventContent, AnyRoomEvent, AnySyncRoomEvent, StateEvent, + StrippedStateEvent, SyncStateEvent, Unsigned, }, identifiers::{DeviceKeyAlgorithm, EventId, RoomId, UserId}, serde::Raw, @@ -96,12 +96,26 @@ pub struct SyncRoomEvent { pub encryption_info: Option, } +impl SyncRoomEvent { + /// Get the event id of this `SyncRoomEvent` + pub fn event_id(&self) -> EventId { + self.event.get_field::("event_id").unwrap().unwrap() + } +} + impl From> for SyncRoomEvent { fn from(inner: Raw) -> Self { Self { encryption_info: None, event: inner } } } +impl From> for SyncRoomEvent { + fn from(inner: Raw) -> Self { + // FIXME: we should strip the room id from `Raw` + Self { encryption_info: None, event: Raw::from_json(Raw::into_json(inner)) } + } +} + #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct SyncResponse { /// The batch token to supply in the `since` param of the next `/sync` @@ -235,6 +249,26 @@ impl Timeline { } } +/// A slice of the timeline in the room. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct TimelineSlice { + /// The `next_batch` or `from` token used to obtain this slice + pub start: String, + + /// The `prev_batch` or `to` token used to obtain this slice + /// If `None` this `TimelineSlice` is the begining of the room + pub end: Option, + + /// A list of events. + pub events: Vec, +} + +impl TimelineSlice { + pub fn new(events: Vec, start: String, end: Option) -> Self { + Self { start, end, events } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde( try_from = "SyncStateEvent", diff --git a/matrix_sdk_test/src/test_json/events.rs b/matrix_sdk_test/src/test_json/events.rs index b0cde0ddf9f..e1bccbe86e2 100644 --- a/matrix_sdk_test/src/test_json/events.rs +++ b/matrix_sdk_test/src/test_json/events.rs @@ -143,6 +143,324 @@ lazy_static! { }); } +lazy_static! { + pub static ref SYNC_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaaf:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbf:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccf:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_2269", + "start": "t392-516_47314_0_7_1_1_1_11444_1" + }); +} + +lazy_static! { + pub static ref SYNC_ROOM_MESSAGES_BATCH_2: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaak:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbk:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Cccck:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_2270", + "start": "t47409-4357353_219380_26003_2269" + }); +} + +lazy_static! { + pub static ref OVERLAPPING_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaar:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccnewmessage:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3300", + "start": "t47409-4357353_219380_26003_3310" + }); +} + +lazy_static! { + pub static ref OVERLAPPING_ROOM_MESSAGES_BATCH_2: JsonValue = json!({ + "chunk": [ + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr3:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr3:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq3:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccnewmessage3:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$098237280074GZeOm2:localhost", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + ], + "end": "t47409-4357353_219380_26003_3320", + "start": "t47409-4357353_219380_26003_3330" + }); +} + +lazy_static! { + pub static ref GAPPED_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaar:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3010", + "start": "t47409-4357353_219380_26003_3000" + }); +} + +lazy_static! { + pub static ref GAPPED_ROOM_MESSAGES_FILLER: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaat:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbt:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Cccct:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3000", + "start": "t47409-4357353_219380_26003_2270" + }); +} + lazy_static! { pub static ref KEYS_QUERY: JsonValue = json!({ "device_keys": { @@ -629,3 +947,181 @@ lazy_static! { "type": "m.typing" }); } + +lazy_static! { + pub static ref CONTEXT_MESSAGE: JsonValue = json!({ + "end": "t29-57_2_0_2", + "events_after": [ + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter1:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter3:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + } + ], + "event": { + "content": { + "body": "filename.jpg", + "info": { + "h": 398, + "w": 394, + "mimetype": "image/jpeg", + "size": 31037 + }, + "url": "mxc://example.org/JWEIFJgwEIhweiWJE", + "msgtype": "m.image" + }, + "type": "m.room.message", + "event_id": "$f3h4d129462ha:example.com", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + "events_before": [ + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore1:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore3:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + } + ], + "start": "t27-54_2_0_2", + "state": [ + { + "content": { + "creator": "@example:example.org", + "room_version": "1", + "m.federate": true, + "predecessor": { + "event_id": "$something:example.org", + "room_id": "!oldroom:example.org" + } + }, + "type": "m.room.create", + "event_id": "$143273582443PhrSn2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + }, + "state_key": "" + }, + { + "content": { + "membership": "join", + "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF", + "displayname": "Alice Margatroid" + }, + "type": "m.room.member", + "event_id": "$143273582443PhrSn:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + }, + "state_key": "@alice:example.org" + } + ] + }); +} diff --git a/matrix_sdk_test/src/test_json/mod.rs b/matrix_sdk_test/src/test_json/mod.rs index c0ab33e3436..8645c2f175d 100644 --- a/matrix_sdk_test/src/test_json/mod.rs +++ b/matrix_sdk_test/src/test_json/mod.rs @@ -12,14 +12,18 @@ pub mod members; pub mod sync; pub use events::{ - ALIAS, ALIASES, EVENT_ID, KEYS_QUERY, KEYS_UPLOAD, LOGIN, LOGIN_RESPONSE_ERR, LOGIN_TYPES, - LOGOUT, MEMBER, MEMBER_NAME_CHANGE, MESSAGE_EDIT, MESSAGE_TEXT, NAME, POWER_LEVELS, PRESENCE, + ALIAS, ALIASES, CONTEXT_MESSAGE, EVENT_ID, GAPPED_ROOM_MESSAGES_BATCH_1, + GAPPED_ROOM_MESSAGES_FILLER, KEYS_QUERY, KEYS_UPLOAD, LOGIN, LOGIN_RESPONSE_ERR, LOGIN_TYPES, + LOGOUT, MEMBER, MEMBER_NAME_CHANGE, MESSAGE_EDIT, MESSAGE_TEXT, NAME, + OVERLAPPING_ROOM_MESSAGES_BATCH_1, OVERLAPPING_ROOM_MESSAGES_BATCH_2, POWER_LEVELS, PRESENCE, PUBLIC_ROOMS, REACTION, REDACTED, REDACTED_INVALID, REDACTED_STATE, REDACTION, - REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES, TYPING, + REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES, SYNC_ROOM_MESSAGES_BATCH_1, + SYNC_ROOM_MESSAGES_BATCH_2, TYPING, }; pub use members::MEMBERS; pub use sync::{ - DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, SYNC, VOIP_SYNC, + DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, MORE_SYNC_2, SYNC, + VOIP_SYNC, }; lazy_static! { diff --git a/matrix_sdk_test/src/test_json/sync.rs b/matrix_sdk_test/src/test_json/sync.rs index cb1dc57c81b..dcdaaf9c3bd 100644 --- a/matrix_sdk_test/src/test_json/sync.rs +++ b/matrix_sdk_test/src/test_json/sync.rs @@ -717,6 +717,155 @@ lazy_static! { }); } +lazy_static! { + pub static ref MORE_SYNC_2: JsonValue = json!({ + "device_one_time_keys_count": {}, + "next_batch": "s526_47314_0_7_1_1_1_11444_3", + "device_lists": { + "changed": [ + "@example:example.org" + ], + "left": [] + }, + "rooms": { + "invite": {}, + "join": { + "!SVkFJHzfwvuaIEawgC:localhost": { + "summary": {}, + "account_data": { + "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [] + }, + "timeline": { + "events": [ + { + "content": { + "body": "baba", + "format": "org.matrix.custom.html", + "formatted_body": "baba", + "msgtype": "m.text" + }, + "event_id": "$152037280074GZeOm2:localhost", + "origin_server_ts": 152037280, + "sender": "@example:localhost", + "type": "m.room.message", + "unsigned": { + "age": 598971425 + } + }, + { + "content": { + "body": " * edited message", + "m.new_content": { + "body": "edited message", + "msgtype": "m.text" + }, + "m.relates_to": { + "event_id": "$someeventid:localhost", + "rel_type": "m.replace" + }, + "msgtype": "m.text" + }, + "event_id": "$editevid2:localhost", + "origin_server_ts": 159026265, + "sender": "@alice:matrix.org", + "type": "m.room.message", + "unsigned": { + "age": 85 + } + }, + { + "content": { + "reason": "😀" + }, + "event_id": "$151957878228ssqrJ2:localhost", + "origin_server_ts": 151957878, + "sender": "@example:localhost", + "type": "m.room.redaction", + "redacts": "$151957878228ssqrj:localhost", + "unsigned": { + "age": 85 + } + }, + { + "content": {}, + "event_id": "$15275046980maRLj2:localhost", + "origin_server_ts": 152750469, + "sender": "@example:localhost", + "type": "m.room.message", + "unsigned": { + "age": 19334, + "redacted_because": { + "content": {}, + "event_id": "$15275047031IXQRi:localhost", + "origin_server_ts": 152750470, + "redacts": "$15275046980maRLj:localhost", + "sender": "@example:localhost", + "type": "m.room.redaction", + "unsigned": { + "age": 14523 + } + }, + "redacted_by": "$15275047031IXQRi:localhost" + } + }, + { + "content": { + "m.relates_to": { + "event_id": "$15275047031IXQRi:localhost", + "key": "👍", + "rel_type": "m.annotation" + } + }, + "event_id": "$15275047031IXQRi2:localhost", + "origin_server_ts": 159027581, + "sender": "@alice:matrix.org", + "type": "m.reaction", + "unsigned": { + "age": 85 + } + }, + { + "content": { + "body": "This is a notice", + "format": "org.matrix.custom.html", + "formatted_body": "This is a notice", + "msgtype": "m.notice" + }, + "event_id": "$098237280074GZeOm2:localhost", + "origin_server_ts": 162037280, + "sender": "@bot:localhost", + "type": "m.room.message", + "unsigned": { + "age": 25 + } + }, + ], + "limited": true, + "prev_batch": "s526_47314_0_7_1_1_1_11444_2" + }, + "unread_notifications": { + "highlight_count": 0, + "notification_count": 11 + } + } + }, + "leave": {} + }, + "to_device": { + "events": [] + }, + "presence": { + "events": [] + } + }); +} + lazy_static! { pub static ref INVITE_SYNC: JsonValue = json!({ "device_one_time_keys_count": {},