diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 4b2d1babc9f..8105d50ce94 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -27,7 +27,7 @@ use std::{ use matrix_sdk_common::{ deserialized_responses::{ AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms, - StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, + StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice, }, instant::Instant, locks::RwLock, @@ -50,7 +50,11 @@ use ruma::{ DeviceId, }; use ruma::{ - api::client::r0::{self as api, push::get_notifications::Notification}, + api::client::r0::{ + self as api, + message::get_message_events::{Direction, Response as GetMessageEventsResponse}, + push::get_notifications::Notification, + }, events::{ room::member::MembershipState, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, @@ -773,6 +777,15 @@ impl BaseClient { let notification_count = new_info.unread_notifications.into(); room_info.update_notification_count(notification_count); + changes.add_timeline( + &room_id, + TimelineSlice::new( + timeline.events.iter().cloned().rev().collect(), + next_batch.clone(), + timeline.prev_batch.clone(), + ), + ); + new_rooms.join.insert( room_id, JoinedRoom::new( @@ -816,6 +829,14 @@ impl BaseClient { self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes) .await; + changes.add_timeline( + &room_id, + TimelineSlice::new( + timeline.events.iter().cloned().rev().collect(), + next_batch.clone(), + timeline.prev_batch.clone(), + ), + ); changes.add_room(room_info); new_rooms .leave @@ -892,6 +913,59 @@ impl BaseClient { } } + /// Receive a successful /messages response. + /// + /// * `response` - The successful response from /messages. 
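+    ///
+    /// * `room_id` - The id of the room the messages belong to.
+    ///
+    /// * `direction` - The direction in which the events were requested.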
+ pub async fn receive_messages( + &self, + room_id: &RoomId, + direction: &Direction, + response: &GetMessageEventsResponse, + ) -> Result> { + let mut changes = StateChanges::default(); + + let mut events: Vec = vec![]; + for event in &response.chunk { + #[allow(unused_mut)] + let mut event: SyncRoomEvent = event.clone().into(); + + #[cfg(feature = "encryption")] + match event.event.deserialize() { + Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted(encrypted))) => { + if let Some(olm) = self.olm_machine().await { + if let Ok(decrypted) = olm.decrypt_room_event(&encrypted, room_id).await { + event = decrypted.into(); + } + } + } + Ok(_) => {} + Err(error) => { + warn!("Error deserializing event {:?}", error); + } + } + + events.push(event); + } + + let (chunk, start, end) = match direction { + Direction::Backward => { + (events.clone(), response.start.clone().unwrap(), response.end.clone()) + } + Direction::Forward => ( + events.iter().rev().cloned().collect(), + response.end.clone().unwrap(), + response.start.clone(), + ), + }; + + let timeline = TimelineSlice::new(chunk, start, end); + changes.add_timeline(room_id, timeline); + + self.store().save_changes(&changes).await?; + + Ok(events) + } + /// Receive a get member events response and convert it to a deserialized /// `MembersResponse` /// diff --git a/crates/matrix-sdk-base/src/lib.rs b/crates/matrix-sdk-base/src/lib.rs index 79f9497d445..d72cd128c7c 100644 --- a/crates/matrix-sdk-base/src/lib.rs +++ b/crates/matrix-sdk-base/src/lib.rs @@ -43,4 +43,4 @@ pub use client::{BaseClient, BaseClientConfig}; #[cfg(feature = "encryption")] pub use matrix_sdk_crypto as crypto; pub use rooms::{Room, RoomInfo, RoomMember, RoomType}; -pub use store::{StateChanges, StateStore, Store, StoreError}; +pub use store::{StateChanges, StateStore, Store, StoreError, StoredTimelineSlice}; diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index eb803acd9bb..8125d869101 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -21,6 +21,7 @@ use dashmap::{DashMap, DashSet}; use lru::LruCache; use matrix_sdk_common::{async_trait, instant::Instant, locks::Mutex}; use ruma::{ + api::client::r0::message::get_message_events::Direction, events::{ presence::PresenceEvent, receipt::Receipt, @@ -34,7 +35,7 @@ use ruma::{ }; use tracing::info; -use super::{Result, RoomInfo, StateChanges, StateStore}; +use super::{Result, RoomInfo, StateChanges, StateStore, StoredTimelineSlice}; use crate::{ deserialized_responses::{MemberEvent, StrippedMemberEvent}, media::{MediaRequest, UniqueKey}, @@ -272,6 +273,8 @@ impl MemoryStore { } } + // TODO: implement writing timeline to the store. + info!("Saved changes in {:?}", now.elapsed()); Ok(()) @@ -438,6 +441,23 @@ impl MemoryStore { Ok(()) } + + async fn get_timeline( + &self, + _room_id: &RoomId, + _start: Option<&EventId>, + _end: Option<&EventId>, + _limit: Option, + _direction: Direction, + ) -> Result> { + // TODO: implement reading from the store. + Ok(None) + } + + async fn remove_timeline(&self, _room_id: Option<&RoomId>) -> Result<()> { + // TODO: implement once writing the timeline to the store is implemented. 
+ Ok(()) + } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -584,6 +604,21 @@ impl StateStore for MemoryStore { async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { self.remove_media_content_for_uri(uri).await } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + self.get_timeline(room_id, start, end, limit, direction).await + } + + async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()> { + self.remove_timeline(room_id).await + } } #[cfg(test)] diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 83b423439fc..1dec35cc0b0 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -23,7 +23,9 @@ use std::{ use dashmap::DashMap; use matrix_sdk_common::{async_trait, locks::RwLock, AsyncTraitDeps}; use ruma::{ - api::client::r0::push::get_notifications::Notification, + api::client::r0::{ + message::get_message_events::Direction, push::get_notifications::Notification, + }, events::{ presence::PresenceEvent, receipt::{Receipt, ReceiptEventContent}, @@ -39,7 +41,7 @@ use ruma::{ use sled::Db; use crate::{ - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice}, media::MediaRequest, rooms::{RoomInfo, RoomType}, Room, Session, @@ -313,6 +315,37 @@ pub trait StateStore: AsyncTraitDeps { /// /// * `uri` - The `MxcUri` of the media files. async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>; + + /// Get a slice of the timeline of a room. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which the timeline should be + /// fetched. + /// + /// * `start` - The start point from which events should be returned. + /// + /// * `end` - The end point to which events should be returned. + /// + /// * `limit` - The maximum number of events to return. + /// + /// * `direction` - The direction events should be returned. + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result>; + + /// Remove the stored timeline. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which the timeline should be + /// removed. If `None` the timeline for every stored room is removed. + async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()>; } /// A state store wrapper for the SDK. @@ -486,6 +519,8 @@ pub struct StateChanges { pub ambiguity_maps: BTreeMap, BTreeMap>>>, /// A map of `RoomId` to a vector of `Notification`s pub notifications: BTreeMap, Vec>, + /// A mapping of `RoomId` to a `TimelineSlice` + pub timeline: BTreeMap, TimelineSlice>, } impl StateChanges { @@ -570,4 +605,28 @@ impl StateChanges { pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) { self.receipts.insert(room_id.to_owned(), event); } + + /// Update the `StateChanges` struct with the given room with a new + /// `TimelineSlice`. + pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) { + self.timeline.insert(room_id.to_owned(), timeline); + } +} + +/// A slice of the timeline obtained from the store. +#[derive(Debug, Default)] +pub struct StoredTimelineSlice { + /// A start token to fetch more events if the requested slice isn't fully + /// known. 
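+    ///
+    /// `None` if the requested slice is fully known.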
+ pub token: Option, + + /// The requested events + pub events: Vec, +} + +#[cfg(feature = "sled_state_store")] +impl StoredTimelineSlice { + pub(crate) fn new(events: Vec, token: Option) -> Self { + Self { token, events } + } } diff --git a/crates/matrix-sdk-base/src/store/sled_store/mod.rs b/crates/matrix-sdk-base/src/store/sled_store/mod.rs index 97be63b3a0c..11dc73e4a1a 100644 --- a/crates/matrix-sdk-base/src/store/sled_store/mod.rs +++ b/crates/matrix-sdk-base/src/store/sled_store/mod.rs @@ -17,6 +17,7 @@ mod store_key; use std::{ collections::BTreeSet, convert::{TryFrom, TryInto}, + mem::size_of, path::{Path, PathBuf}, sync::Arc, time::Instant, @@ -26,11 +27,13 @@ use futures_core::stream::Stream; use futures_util::stream::{self, TryStreamExt}; use matrix_sdk_common::async_trait; use ruma::{ + api::client::r0::message::get_message_events::Direction, events::{ presence::PresenceEvent, receipt::Receipt, room::member::{MembershipState, RoomMemberEventContent}, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType, + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncMessageEvent, AnySyncRoomEvent, + AnySyncStateEvent, EventType, Redact, }, receipt::ReceiptType, serde::Raw, @@ -42,12 +45,12 @@ use sled::{ Config, Db, Transactional, Tree, }; use tokio::task::spawn_blocking; -use tracing::info; +use tracing::{info, warn}; use self::store_key::{EncryptedEvent, StoreKey}; -use super::{Result, RoomInfo, StateChanges, StateStore, StoreError}; +use super::{Result, RoomInfo, StateChanges, StateStore, StoreError, StoredTimelineSlice}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent}, media::{MediaRequest, UniqueKey}, }; @@ -117,6 +120,32 @@ impl EncodeKey for (&str, &str) { } } +impl EncodeKey for (&RoomId, BatchIdx) { + fn encode(&self) -> Vec { + [ + self.0.as_bytes(), + &[ENCODE_SEPARATOR], + &self.1.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + ] + .concat() + } +} + +impl EncodeKey for (&str, BatchIdx, usize) { + fn encode(&self) -> Vec { + [ + self.0.as_bytes(), + &[ENCODE_SEPARATOR], + &self.1.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + &self.2.to_be_bytes().to_vec(), + &[ENCODE_SEPARATOR], + ] + .concat() + } +} + impl EncodeKey for (&str, &str, &str) { fn encode(&self) -> Vec { [ @@ -166,6 +195,67 @@ pub fn decode_key_value(key: &[u8], position: usize) -> Option { values.get(position).map(|s| String::from_utf8_lossy(s).to_string()) } +#[derive(Clone, Copy, Default, PartialEq)] +struct BatchIdx(usize); + +impl From for BatchIdx { + fn from(item: sled::IVec) -> Self { + Self(usize::from_be_bytes( + item.as_ref().try_into().expect("The batch index wasn't properly encoded"), + )) + } +} + +impl BatchIdx { + fn next(self) -> Self { + Self(self.0 + 1) + } + + fn to_be_bytes(self) -> [u8; size_of::()] { + self.0.to_be_bytes() + } +} + +impl From for sled::IVec { + fn from(item: BatchIdx) -> Self { + item.to_be_bytes()[..].into() + } +} + +struct ExpandableBatch { + start_token_changed: bool, + end_token_changed: bool, + batch_idx: BatchIdx, + current_position: usize, +} + +#[derive(Clone, Copy)] +struct EventPosition { + position: usize, + batch_idx: BatchIdx, +} + +impl From for EventPosition { + fn from(item: sled::IVec) -> Self { + let (first, second) = item.split_at(size_of::()); + + let batch_idx = BatchIdx(usize::from_be_bytes( + first.try_into().expect("The event position wasn't properly encoded"), + )); + let position = usize::from_be_bytes( + second.try_into().expect("The event 
position wasn't properly encoded"), + ); + + Self { batch_idx, position } + } +} + +impl From for sled::IVec { + fn from(item: EventPosition) -> Self { + [item.batch_idx.0.to_be_bytes(), item.position.to_be_bytes()].concat().into() + } +} + #[derive(Clone)] pub struct SledStore { path: Option, @@ -189,6 +279,16 @@ pub struct SledStore { room_event_receipts: Tree, media: Tree, custom: Tree, + // Map an EventId to the batch index and position in the batch + event_id_to_position: Tree, + // List of batches and the events + timeline_events: Tree, + batch_idx_to_start_token: Tree, + batch_idx_to_end_token: Tree, + start_token_to_batch_idx_position: Tree, + end_token_to_batch_idx_position: Tree, + // Keep track of the highest batch index + highest_batch_idx: Tree, } impl std::fmt::Debug for SledStore { @@ -228,6 +328,15 @@ impl SledStore { let custom = db.open_tree("custom")?; + let event_id_to_position = db.open_tree("event_id_to_position")?; + let timeline_events = db.open_tree("events")?; + let batch_idx_to_start_token = db.open_tree("batch_idx_to_start_token")?; + let batch_idx_to_end_token = db.open_tree("batch_idx_to_end_token")?; + let start_token_to_batch_idx_position = + db.open_tree("start_token_to_batch_idx_position")?; + let end_token_to_batch_idx_position = db.open_tree("end_token_to_batch_idx_position")?; + let highest_batch_idx = db.open_tree("highest_batch_idx")?; + Ok(Self { path, inner: db, @@ -250,6 +359,13 @@ impl SledStore { room_event_receipts, media, custom, + event_id_to_position, + timeline_events, + batch_idx_to_start_token, + batch_idx_to_end_token, + start_token_to_batch_idx_position, + end_token_to_batch_idx_position, + highest_batch_idx, }) } @@ -507,9 +623,31 @@ impl SledStore { ret?; - let ret: Result<(), TransactionError> = - (&self.room_user_receipts, &self.room_event_receipts).transaction( - |(room_user_receipts, room_event_receipts)| { + let ret: Result<(), TransactionError> = ( + &self.room_user_receipts, + &self.room_event_receipts, + &self.event_id_to_position, + &self.timeline_events, + &self.batch_idx_to_start_token, + &self.batch_idx_to_end_token, + &self.start_token_to_batch_idx_position, + &self.end_token_to_batch_idx_position, + &self.highest_batch_idx, + &self.room_info, + ) + .transaction( + |( + room_user_receipts, + room_event_receipts, + event_id_to_position, + timeline_events, + batch_idx_to_start_token, + batch_idx_to_end_token, + start_token_to_batch_idx_position, + end_token_to_batch_idx_position, + highest_batch_idx, + room_info, + )| { for (room, content) in &changes.receipts { for (event_id, receipts) in &content.0 { for (receipt_type, receipts) in receipts { @@ -553,6 +691,218 @@ impl SledStore { } } + for (room, timeline) in &changes.timeline { + let events: Vec<&SyncRoomEvent> = timeline + .events + .iter() + .filter(|event| event.event_id().is_some()) + .collect(); + + let expandable = { + // Handle overlapping slices. + let (first_event, last_event) = + if let (Some(first_event), Some(last_event)) = + (events.first(), events.last()) + { + ( + first_event.event_id().unwrap(), + last_event.event_id().unwrap(), + ) + } else { + continue; + }; + + let found_first = event_id_to_position + .get((room.as_str(), first_event.as_str()).encode())? + .map(EventPosition::from) + .map(|batch| ExpandableBatch { + start_token_changed: false, + end_token_changed: true, + batch_idx: batch.batch_idx, + current_position: batch.position, + }); + + let found_last = event_id_to_position + .get((room.as_str(), last_event.as_str()).encode())? 
+ .map(EventPosition::from) + .map(|batch| { + let current_position = batch.position - events.len(); + + ExpandableBatch { + start_token_changed: true, + end_token_changed: false, + batch_idx: batch.batch_idx, + current_position, + } + }); + + found_first.or(found_last) + }; + + // Lookup the previous or next batch + let expandable = if let Some(expandable) = expandable { + Some(expandable) + } else if let Some(batch) = end_token_to_batch_idx_position + .remove((room.as_str(), timeline.start.as_str()).encode())? + .map(EventPosition::from) + { + Some(ExpandableBatch { + start_token_changed: false, + end_token_changed: true, + batch_idx: batch.batch_idx, + current_position: batch.position, + }) + } else if let Some(end) = &timeline.end { + if let Some(batch) = start_token_to_batch_idx_position + .remove((room.as_str(), end.as_str()).encode())? + .map(EventPosition::from) + { + let current_position = batch.position - events.len(); + Some(ExpandableBatch { + start_token_changed: true, + end_token_changed: false, + batch_idx: batch.batch_idx, + current_position, + }) + } else { + None + } + } else { + None + }; + + let expandable = if let Some(expandable) = expandable { + expandable + } else { + // If no expandable batch was found we add a new batch to the store + let batch_idx = highest_batch_idx + .get(room.as_str().encode())? + .map(BatchIdx::from) + .map_or(BatchIdx::default(), BatchIdx::next); + + highest_batch_idx.insert(room.as_str().encode(), batch_idx)?; + + ExpandableBatch { + start_token_changed: true, + end_token_changed: true, + batch_idx, + current_position: usize::MAX / 2, + } + }; + + // Remove events already known from the store + for event in &events { + if let Some(batch) = event_id_to_position + .remove( + (room.as_str(), event.event_id().unwrap().as_str()).encode(), + )? + .map(EventPosition::from) + { + timeline_events.remove( + (room.as_str(), batch.batch_idx, batch.position).encode(), + )?; + } + } + + for (position, event) in events.iter().enumerate() { + let position = position + expandable.current_position; + + let old_event = timeline_events.insert( + (room.as_str(), expandable.batch_idx, position).encode(), + self.serialize_event(event) + .map_err(ConflictableTransactionError::Abort)?, + )?; + + if old_event.is_none() { + event_id_to_position.insert( + (room.as_str(), event.event_id().unwrap().as_str()).encode(), + EventPosition { batch_idx: expandable.batch_idx, position }, + )?; + } + + // Redact events + if let Ok(AnySyncRoomEvent::Message( + AnySyncMessageEvent::RoomRedaction(redaction), + )) = event.event.deserialize() + { + if let Some(batch) = event_id_to_position + .get((room.as_str(), redaction.redacts.as_str()).encode())? + .map(EventPosition::from) + { + if let Some(full_event) = timeline_events + .get( + (room.as_str(), batch.batch_idx, batch.position) + .encode(), + )? + .and_then(|t| { + self.deserialize_event::(&t).ok() + }) + .and_then(|e| e.event.deserialize().ok()) + { + let room_version = room_info + .get((&**room).encode())? 
+ .map(|r| self.deserialize_event::(&r)) + .transpose() + .map(|i| { + i.and_then(|info| { + info.base_info + .create + .map(|event| event.room_version) + }) + }); + if let Ok(Some(room_version)) = room_version { + let redacted_event: AnySyncRoomEvent = + full_event.redact(redaction, &room_version).into(); + timeline_events.insert( + (room.as_str(), batch.batch_idx, batch.position) + .encode(), + self.serialize_event(&redacted_event) + .map_err(ConflictableTransactionError::Abort)?, + )?; + } else { + warn!( + "Was unable to find the room version for {}", + room + ); + } + } + } + } + } + + if expandable.start_token_changed { + batch_idx_to_start_token.insert( + (room.as_ref(), expandable.batch_idx).encode(), + timeline.start.as_str().encode(), + )?; + + start_token_to_batch_idx_position.insert( + (room.as_str(), timeline.start.as_str()).encode(), + EventPosition { + batch_idx: expandable.batch_idx, + position: expandable.current_position, + }, + )?; + } + + if expandable.end_token_changed { + if let Some(end) = &timeline.end { + batch_idx_to_end_token.insert( + (room.as_ref(), expandable.batch_idx).encode(), + end.as_str().encode(), + )?; + + end_token_to_batch_idx_position.insert( + (room.as_str(), end.as_str()).encode(), + EventPosition { + batch_idx: expandable.batch_idx, + position: expandable.current_position + events.len(), + }, + )?; + } + } + } + Ok(()) }, ); @@ -846,6 +1196,257 @@ impl SledStore { Ok(self.media.apply_batch(batch)?) } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + let beginning = if let Some(start) = start { + if let Some(batch) = self + .event_id_to_position + .get((room_id.as_str(), start.as_str()).encode())? + .map(EventPosition::from) + { + batch + } else { + return Ok(None); + } + } else if let Some(token) = self.get_sync_token().await? { + // The timeline beyond the sync token isn't known, so don't bother to check the + // store. + match direction { + Direction::Forward => { + return Ok(Some(StoredTimelineSlice::new(Vec::new(), Some(token)))); + } + Direction::Backward => { + if let Some(batch) = self + .start_token_to_batch_idx_position + .get((room_id.as_str(), token.as_str()).encode())? + .map(EventPosition::from) + { + batch + } else { + return Ok(Some(StoredTimelineSlice::new(Vec::new(), Some(token)))); + } + } + } + } else { + return Ok(None); + }; + + let ending = if let Some(end) = end { + self.event_id_to_position + .get((room_id.as_str(), end.as_str()).encode())? 
+ .map(EventPosition::from) + } else { + None + }; + + let (mut batch_idx, mut position) = (beginning.batch_idx, beginning.position); + let mut events: Vec = Vec::new(); + let mut token: Option; + + match direction { + Direction::Forward => loop { + token = None; + + let current_limit_position = + limit.map(|limit| position - (limit - events.len()) + 1); + let current_limit = if let Some(end_position) = ending + .filter(|end_batch| end_batch.batch_idx == batch_idx) + .map(|end_batch| end_batch.position) + { + if let Some(current_limit_position) = current_limit_position { + if current_limit_position > end_position { + Some(current_limit_position) + } else { + Some(end_position) + } + } else { + Some(end_position) + } + } else { + current_limit_position + }; + + let range = if let Some(current_limit) = current_limit { + self.timeline_events.range( + (room_id.as_str(), batch_idx, current_limit).encode() + ..=(room_id.as_str(), batch_idx, position).encode(), + ) + } else { + self.timeline_events.range( + (room_id, batch_idx).encode() + ..=(room_id.as_str(), batch_idx, position).encode(), + ) + }; + + let mut part: Vec = range + .rev() + .filter_map(move |e| self.deserialize_event::(&e.ok()?.1).ok()) + .collect(); + + events.append(&mut part); + + if let Some(limit) = limit { + if events.len() >= limit { + break; + } + } + + if let Some(end) = end { + if let Some(last) = events.last() { + if &last.event_id().unwrap() == end { + break; + } + } + } + + // Not enough events where found in this batch, therefore, go to the previous + // batch. + if let Some(start_token) = self + .batch_idx_to_start_token + .get((room_id, batch_idx).encode())? + .map(|t| decode_key_value(&t, 0).unwrap()) + { + if let Some(next_batch) = self + .end_token_to_batch_idx_position + .get((room_id.as_str(), start_token.as_str()).encode())? + .map(EventPosition::from) + { + // We don't have any other batch with this token + if batch_idx == next_batch.batch_idx { + token = Some(start_token); + break; + } + + batch_idx = next_batch.batch_idx; + position = next_batch.position - 1; + continue; + } + + token = Some(start_token); + } + break; + }, + Direction::Backward => loop { + token = None; + + let current_limit_position = limit.map(|limit| position + (limit - events.len())); + let current_limit = if let Some(end_position) = ending + .filter(|end_batch| end_batch.batch_idx == batch_idx) + .map(|end_batch| end_batch.position) + { + let end_position = end_position + 1; + if let Some(current_limit_position) = current_limit_position { + if current_limit_position < end_position { + Some(current_limit_position) + } else { + Some(end_position) + } + } else { + Some(end_position) + } + } else { + current_limit_position + }; + + let range = if let Some(current_limit) = current_limit { + self.timeline_events.range( + (room_id.as_str(), batch_idx, position).encode() + ..(room_id.as_str(), batch_idx, current_limit).encode(), + ) + } else { + self.timeline_events.range( + (room_id.as_str(), batch_idx, position).encode() + ..(room_id, batch_idx.next()).encode(), + ) + }; + + let mut part = range + .filter_map(move |e| self.deserialize_event::(&e.ok()?.1).ok()) + .collect(); + + events.append(&mut part); + + if let Some(limit) = limit { + if events.len() >= limit { + break; + } + } + + if let Some(end) = end { + if let Some(last) = events.last() { + if &last.event_id().unwrap() == end { + break; + } + } + } + + // Not enough events where found in this batch, therefore, go to the previous + // batch. 
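+                // Follow this batch's end token to the batch whose start token matches it,
+                // i.e. the adjacent batch containing older events.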
+ if let Some(end_token) = self + .batch_idx_to_end_token + .get((room_id, batch_idx).encode())? + .map(|t| decode_key_value(&t, 0).unwrap()) + { + if let Some(prev_batch) = self + .start_token_to_batch_idx_position + .get((room_id.as_str(), end_token.as_str()).encode())? + .map(EventPosition::from) + { + // We don't have any other batch with this token + if batch_idx == prev_batch.batch_idx { + token = Some(end_token); + break; + } + + batch_idx = prev_batch.batch_idx; + position = prev_batch.position; + continue; + } + + token = Some(end_token); + } + break; + }, + } + + Ok(Some(StoredTimelineSlice::new(events, token))) + } + + async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()> { + let forest = [ + &self.event_id_to_position, + &self.timeline_events, + &self.batch_idx_to_start_token, + &self.batch_idx_to_end_token, + &self.start_token_to_batch_idx_position, + &self.end_token_to_batch_idx_position, + &self.highest_batch_idx, + ]; + + for tree in forest { + let mut batch = sled::Batch::default(); + let keys = if let Some(room_id) = room_id { + tree.scan_prefix(room_id.as_str().encode()).keys() + } else { + tree.iter().keys() + }; + + for key in keys { + batch.remove(key?); + } + + tree.apply_batch(batch)? + } + + Ok(()) + } } #[async_trait] @@ -987,13 +1588,36 @@ impl StateStore for SledStore { async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { self.remove_media_content_for_uri(uri).await } + + async fn get_timeline( + &self, + room_id: &RoomId, + start: Option<&EventId>, + end: Option<&EventId>, + limit: Option, + direction: Direction, + ) -> Result> { + self.get_timeline(room_id, start, end, limit, direction).await + } + + async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()> { + self.remove_timeline(room_id).await + } } #[cfg(test)] mod test { - use matrix_sdk_test::async_test; + use http::Response; + use matrix_sdk_test::{async_test, test_json}; use ruma::{ - api::client::r0::media::get_content_thumbnail::Method, + api::{ + client::r0::{ + media::get_content_thumbnail::Method, + message::get_message_events::{Direction, Response as MessageResponse}, + sync::sync_events::Response as SyncResponse, + }, + IncomingResponse, + }, event_id, events::{ room::{ @@ -1012,7 +1636,7 @@ mod test { use super::{Result, SledStore, StateChanges}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent, TimelineSlice}, media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, StateStore, }; @@ -1241,4 +1865,576 @@ mod test { Ok(()) } + + #[async_test] + async fn test_timeline() { + let store = SledStore::open().unwrap(); + let mut stored_events = Vec::new(); + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .is_none()); + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .is_none()); + + // Add a sync response + let sync = SyncResponse::try_from_http_response( + Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = + timeline.events.iter().rev().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, sync.next_batch.clone(), timeline.prev_batch.clone()); + let mut changes = StateChanges::default(); + 
changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .is_none()); + assert!(store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .is_none()); + + // Add the next batch token + let changes = + StateChanges { sync_token: Some(sync.next_batch.clone()), ..Default::default() }; + store.save_changes(&changes).await.unwrap(); + + let forward_events = store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .unwrap(); + assert_eq!(forward_events.events.len(), 0); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(backward_events.events.len(), 6); + assert_eq!(backward_events.token, timeline.prev_batch.clone()); + + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Add a message batch before the sync response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Add the same message batch again + let timeline_slice = + TimelineSlice::new(events, messages.start.clone().unwrap(), messages.end.clone()); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), 9); + assert_eq!(backward_events.token, messages.end.clone()); + + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Add a batch after the previous batch + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone().unwrap(), messages.end.clone()); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(backward_events.events.len(), 12); + assert_eq!(backward_events.token, messages.end.clone()); + + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let first_block_end = messages.end.clone(); + let messages = MessageResponse::try_from_http_response( + Response::builder() + 
.body(serde_json::to_vec(&*test_json::GAPPED_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let gapped_events: Vec = + messages.chunk.iter().cloned().map(Into::into).collect(); + + // Add a detached batch to create a gap in the known timeline + let timeline_slice = TimelineSlice::new( + gapped_events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + let gapped_block_end = messages.end.clone(); + assert_eq!(backward_events.events.len(), 12); + assert_eq!(backward_events.token, first_block_end); + + // Fill the gap that was created before + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::GAPPED_ROOM_MESSAGES_FILLER).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + stored_events.append(&mut gapped_events.clone()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // The most recent event + let first_event = event_id!("$098237280074GZeOm:localhost"); + let backward_events = store + .get_timeline(&room_id, Some(&first_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let last_event = event_id!("$1444812213350496Ccccr:example.com"); + + // Read from the last known event forward + let forward_events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Forward) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), stored_events.len()); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + assert!(forward_events + .events + .iter() + .rev() + .zip(backward_events.events) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Read from the last known event backwards + let events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(events.events.len(), 1); + assert_eq!(events.events.first().unwrap().event_id().unwrap(), last_event); + assert_eq!(events.token, gapped_block_end); + + let events = store + .get_timeline(&room_id, Some(&last_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + assert_eq!(events.events.len(), 1); + assert_eq!(events.events.first().unwrap().event_id().unwrap(), last_event); + 
assert_eq!(events.token, gapped_block_end); + + let end_event = event_id!("$1444812213350496Caaar:example.com"); + + // Get a slice of the timeline + let backward_events = store + .get_timeline(&room_id, Some(&first_event), Some(&end_event), None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let expected_events = &stored_events[..stored_events.len() - 2]; + + assert_eq!(backward_events.events.len(), expected_events.len()); + assert_eq!(backward_events.token, None); + assert!(backward_events + .events + .iter() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let forward_events = store + .get_timeline(&room_id, Some(&end_event), Some(&first_event), None, Direction::Forward) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), expected_events.len()); + assert_eq!(forward_events.token, None); + + assert!(forward_events + .events + .iter() + .rev() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Get a slice of the timeline where the end isn't known + let unknown_end_event = event_id!("$XXXXXXXXX:example.com"); + + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&unknown_end_event), + None, + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(backward_events + .events + .iter() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let forward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&unknown_end_event), + None, + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), 1); + assert_eq!(forward_events.events.first().unwrap().event_id().unwrap(), first_event); + assert_eq!(forward_events.token, Some(sync.next_batch.clone())); + + // Get a slice of the timeline with limit with more then the number of events + // between start and end + let limit = 4; + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&end_event), + Some(limit), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + let expected_events = &stored_events[..limit]; + assert_eq!(backward_events.events.len(), limit); + assert_eq!(backward_events.token, None); + assert!(backward_events + .events + .iter() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let forward_events = store + .get_timeline( + &room_id, + Some(&last_event), + Some(&first_event), + Some(limit), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), limit); + assert_eq!(forward_events.token, None); + + let expected_events = &stored_events[stored_events.len() - limit..]; + assert!(forward_events + .events + .iter() + .rev() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Get a slice of the timeline with limit with less then limit events between + // start and end + let limit = 30; + let backward_events = store + .get_timeline( + &room_id, + Some(&first_event), + Some(&last_event), + Some(limit), + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, None); + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + let forward_events = store + 
.get_timeline( + &room_id, + Some(&last_event), + Some(&first_event), + Some(limit), + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(forward_events.events.len(), stored_events.len()); + assert_eq!(forward_events.token, None); + + assert!(forward_events + .events + .iter() + .rev() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Add a second sync response + let sync = SyncResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::MORE_SYNC_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = + timeline.events.iter().rev().cloned().map(Into::into).collect(); + + for event in events.clone().into_iter().rev() { + stored_events.insert(0, event); + } + + let timeline_slice = + TimelineSlice::new(events, sync.next_batch.clone(), timeline.prev_batch.clone()); + let mut changes = + StateChanges { sync_token: Some(sync.next_batch.clone()), ..Default::default() }; + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, gapped_block_end); + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Add an overlapping timeline slice to the store + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::OVERLAPPING_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + stored_events.push(events.last().cloned().unwrap()); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + // Read all of the known timeline from the beginning backwards + let backward_events = store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let end_token = messages.end; + assert_eq!(backward_events.events.len(), stored_events.len()); + assert_eq!(backward_events.token, end_token); + assert!(backward_events + .events + .iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Add an overlapping patch to the start of everything + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::OVERLAPPING_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages.chunk.iter().cloned().map(Into::into).collect(); + + let mut expected_events = events.clone(); + expected_events.extend_from_slice(&stored_events[1..]); + + let timeline_slice = TimelineSlice::new( + events.clone(), + messages.start.clone().unwrap(), + messages.end.clone(), + ); + let mut changes = StateChanges::default(); + changes.add_timeline(&room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + let start_event = event_id!("$1444812213350496Cbbbr3:example.com"); + // Read all of the known timeline from the beginning backwards + let 
backward_events = store + .get_timeline(&room_id, Some(&start_event), None, None, Direction::Backward) + .await + .unwrap() + .unwrap(); + + assert_eq!(backward_events.events.len(), expected_events.len()); + assert_eq!(backward_events.token, end_token); + assert!(backward_events + .events + .iter() + .zip(expected_events.iter()) + .all(|(a, b)| a.event_id() == b.event_id())); + + // Clear the timeline for a room + assert!(store.remove_timeline(Some(&room_id)).await.is_ok()); + + assert_eq!( + store + .get_timeline(&room_id, None, None, None, Direction::Forward) + .await + .unwrap() + .unwrap() + .token + .unwrap(), + sync.next_batch + ); + assert_eq!( + store + .get_timeline(&room_id, None, None, None, Direction::Backward) + .await + .unwrap() + .unwrap() + .token + .unwrap(), + sync.next_batch + ); + } } diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index b757e7e8799..0f66aea6f9c 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -97,6 +97,13 @@ pub struct SyncRoomEvent { pub encryption_info: Option, } +impl SyncRoomEvent { + /// Get the event id of this `SyncRoomEvent` if the event has any valid id. + pub fn event_id(&self) -> Option> { + self.event.get_field::>("event_id").ok().flatten() + } +} + impl From> for SyncRoomEvent { fn from(inner: Raw) -> Self { Self { encryption_info: None, event: inner } @@ -112,6 +119,12 @@ impl From for SyncRoomEvent { Self { encryption_info: o.encryption_info, event: Raw::from_json(o.event.into_json()) } } } +impl From> for SyncRoomEvent { + fn from(inner: Raw) -> Self { + // FIXME: we should strip the room id from `Raw` + Self { encryption_info: None, event: Raw::from_json(Raw::into_json(inner)) } + } +} #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct SyncResponse { @@ -255,6 +268,26 @@ impl Timeline { } } +/// A slice of the timeline in the room. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct TimelineSlice { + /// The `next_batch` or `from` token used to obtain this slice + pub start: String, + + /// The `prev_batch` or `to` token used to obtain this slice + /// If `None` this `TimelineSlice` is the beginning of the room + pub end: Option, + + /// A list of events. + pub events: Vec, +} + +impl TimelineSlice { + pub fn new(events: Vec, start: String, end: Option) -> Self { + Self { start, end, events } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(try_from = "SyncRoomMemberEvent", into = "SyncRoomMemberEvent")] pub struct MemberEvent { diff --git a/crates/matrix-sdk-test/src/test_json/events.rs b/crates/matrix-sdk-test/src/test_json/events.rs index 11431a03877..a5231fc0b23 100644 --- a/crates/matrix-sdk-test/src/test_json/events.rs +++ b/crates/matrix-sdk-test/src/test_json/events.rs @@ -143,6 +143,324 @@ lazy_static! { }); } +lazy_static! 
{ + pub static ref SYNC_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaaf:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbf:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccf:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_2269", + "start": "t392-516_47314_0_7_1_1_1_11444_1" + }); +} + +lazy_static! { + pub static ref SYNC_ROOM_MESSAGES_BATCH_2: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaak:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbk:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Cccck:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_2270", + "start": "t47409-4357353_219380_26003_2269" + }); +} + +lazy_static! 
{ + pub static ref OVERLAPPING_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaar:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccnewmessage:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3300", + "start": "t47409-4357353_219380_26003_3310" + }); +} + +lazy_static! { + pub static ref OVERLAPPING_ROOM_MESSAGES_BATCH_2: JsonValue = json!({ + "chunk": [ + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr3:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr3:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq3:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccnewmessage3:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + }, + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$098237280074GZeOm2:localhost", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + ], + "end": "t47409-4357353_219380_26003_3320", + "start": "t47409-4357353_219380_26003_3330" + }); +} + +lazy_static! 
{ + pub static ref GAPPED_ROOM_MESSAGES_BATCH_1: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaar:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbr:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Ccccr:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3010", + "start": "t47409-4357353_219380_26003_3000" + }); +} + +lazy_static! { + pub static ref GAPPED_ROOM_MESSAGES_FILLER: JsonValue = json!({ + "chunk": [ + { + "age": 1042, + "content": { + "body": "hello world", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Caaat:example.com", + "origin_server_ts": 1444812213737i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@alice:example.com", + "type": "m.room.message" + }, + { + "age": 20123, + "content": { + "body": "the world is big", + "msgtype": "m.text" + }, + "event_id": "$1444812213350496Cbbbt:example.com", + "origin_server_ts": 1444812194656i64, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "type": "m.room.message" + }, + { + "age": 50789, + "content": { + "name": "New room name" + }, + "event_id": "$1444812213350496Cccct:example.com", + "origin_server_ts": 1444812163990i64, + "prev_content": { + "name": "Old room name" + }, + "room_id": "!Xq3620DUiqCaoxq:example.com", + "sender": "@bob:example.com", + "state_key": "", + "type": "m.room.name" + } + ], + "end": "t47409-4357353_219380_26003_3000", + "start": "t47409-4357353_219380_26003_2270" + }); +} + lazy_static! { pub static ref KEYS_QUERY: JsonValue = json!({ "device_keys": { @@ -646,3 +964,181 @@ lazy_static! { "type": "m.typing" }); } + +lazy_static! 
{ + pub static ref CONTEXT_MESSAGE: JsonValue = json!({ + "end": "t29-57_2_0_2", + "events_after": [ + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter1:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "This is an example text message", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + "formatted_body": "This is an example text message" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnafter3:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + } + ], + "event": { + "content": { + "body": "filename.jpg", + "info": { + "h": 398, + "w": 394, + "mimetype": "image/jpeg", + "size": 31037 + }, + "url": "mxc://example.org/JWEIFJgwEIhweiWJE", + "msgtype": "m.image" + }, + "type": "m.room.message", + "event_id": "$f3h4d129462ha:example.com", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + "events_before": [ + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore1:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + }, + { + "content": { + "body": "something-important.doc", + "filename": "something-important.doc", + "info": { + "mimetype": "application/msword", + "size": 46144 + }, + "msgtype": "m.file", + "url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe" + }, + "type": "m.room.message", + "event_id": "$143273582443PhrSnbefore3:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + } + } + ], + "start": "t27-54_2_0_2", + "state": [ + { + "content": { + "creator": "@example:example.org", + "room_version": "1", + "m.federate": true, + "predecessor": { + "event_id": "$something:example.org", + "room_id": "!oldroom:example.org" + } + }, + "type": "m.room.create", + "event_id": 
"$143273582443PhrSn2:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + }, + "state_key": "" + }, + { + "content": { + "membership": "join", + "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF", + "displayname": "Alice Margatroid" + }, + "type": "m.room.member", + "event_id": "$143273582443PhrSn:example.org", + "room_id": "!636q39766251:example.com", + "sender": "@example:example.org", + "origin_server_ts": 143273582, + "unsigned": { + "age": 1234 + }, + "state_key": "@alice:example.org" + } + ] + }); +} diff --git a/crates/matrix-sdk-test/src/test_json/mod.rs b/crates/matrix-sdk-test/src/test_json/mod.rs index ad5ecab02d2..6f2da0ed676 100644 --- a/crates/matrix-sdk-test/src/test_json/mod.rs +++ b/crates/matrix-sdk-test/src/test_json/mod.rs @@ -12,14 +12,18 @@ pub mod members; pub mod sync; pub use events::{ - ALIAS, ALIASES, EVENT_ID, KEYS_QUERY, KEYS_UPLOAD, LOGIN, LOGIN_RESPONSE_ERR, LOGIN_TYPES, + ALIAS, ALIASES, CONTEXT_MESSAGE, EVENT_ID, GAPPED_ROOM_MESSAGES_BATCH_1, + GAPPED_ROOM_MESSAGES_FILLER, KEYS_QUERY, KEYS_UPLOAD, LOGIN, LOGIN_RESPONSE_ERR, LOGIN_TYPES, LOGIN_WITH_DISCOVERY, LOGOUT, MEMBER, MEMBER_NAME_CHANGE, MESSAGE_EDIT, MESSAGE_TEXT, NAME, - POWER_LEVELS, PRESENCE, PUBLIC_ROOMS, REACTION, REDACTED, REDACTED_INVALID, REDACTED_STATE, - REDACTION, REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES, TYPING, + OVERLAPPING_ROOM_MESSAGES_BATCH_1, OVERLAPPING_ROOM_MESSAGES_BATCH_2, POWER_LEVELS, PRESENCE, + PUBLIC_ROOMS, REACTION, REDACTED, REDACTED_INVALID, REDACTED_STATE, REDACTION, + REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES, SYNC_ROOM_MESSAGES_BATCH_1, + SYNC_ROOM_MESSAGES_BATCH_2, TYPING, }; pub use members::MEMBERS; pub use sync::{ - DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, SYNC, VOIP_SYNC, + DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, MORE_SYNC_2, SYNC, + VOIP_SYNC, }; lazy_static! { diff --git a/crates/matrix-sdk-test/src/test_json/sync.rs b/crates/matrix-sdk-test/src/test_json/sync.rs index cb1dc57c81b..2cc6616f345 100644 --- a/crates/matrix-sdk-test/src/test_json/sync.rs +++ b/crates/matrix-sdk-test/src/test_json/sync.rs @@ -717,6 +717,130 @@ lazy_static! { }); } +lazy_static! 
{ + pub static ref MORE_SYNC_2: JsonValue = json!({ + "next_batch": "s526_47314_0_7_1_1_1_11444_3", + "rooms": { + "join": { + "!SVkFJHzfwvuaIEawgC:localhost": { + "timeline": { + "events": [ + { + "content": { + "body": "baba", + "format": "org.matrix.custom.html", + "formatted_body": "baba", + "msgtype": "m.text" + }, + "event_id": "$152037280074GZeOm2:localhost", + "origin_server_ts": 152037280, + "sender": "@example:localhost", + "type": "m.room.message", + "unsigned": { + "age": 598971425 + } + }, + { + "content": { + "body": " * edited message", + "m.new_content": { + "body": "edited message", + "msgtype": "m.text" + }, + "m.relates_to": { + "event_id": "$someeventid:localhost", + "rel_type": "m.replace" + }, + "msgtype": "m.text" + }, + "event_id": "$editevid2:localhost", + "origin_server_ts": 159026265, + "sender": "@alice:matrix.org", + "type": "m.room.message", + "unsigned": { + "age": 85 + } + }, + { + "content": { + "reason": "😀" + }, + "event_id": "$151957878228ssqrJ2:localhost", + "origin_server_ts": 151957878, + "sender": "@example:localhost", + "type": "m.room.redaction", + "redacts": "$151957878228ssqrj:localhost", + "unsigned": { + "age": 85 + } + }, + { + "content": {}, + "event_id": "$15275046980maRLj2:localhost", + "origin_server_ts": 152750469, + "sender": "@example:localhost", + "type": "m.room.message", + "unsigned": { + "age": 19334, + "redacted_because": { + "content": {}, + "event_id": "$15275047031IXQRi:localhost", + "origin_server_ts": 152750470, + "redacts": "$15275046980maRLj:localhost", + "sender": "@example:localhost", + "type": "m.room.redaction", + "unsigned": { + "age": 14523 + } + }, + "redacted_by": "$15275047031IXQRi:localhost" + } + }, + { + "content": { + "m.relates_to": { + "event_id": "$15275047031IXQRi:localhost", + "key": "👍", + "rel_type": "m.annotation" + } + }, + "event_id": "$15275047031IXQRi2:localhost", + "origin_server_ts": 159027581, + "sender": "@alice:matrix.org", + "type": "m.reaction", + "unsigned": { + "age": 85 + } + }, + { + "content": { + "body": "This is a notice", + "format": "org.matrix.custom.html", + "formatted_body": "This is a notice", + "msgtype": "m.notice" + }, + "event_id": "$098237280074GZeOm2:localhost", + "origin_server_ts": 162037280, + "sender": "@bot:localhost", + "type": "m.room.message", + "unsigned": { + "age": 25 + } + }, + ], + "limited": true, + "prev_batch": "s526_47314_0_7_1_1_1_11444_2" + }, + "unread_notifications": { + "highlight_count": 0, + "notification_count": 11 + } + } + }, + }, + }); +} + lazy_static! 
{ pub static ref INVITE_SYNC: JsonValue = json!({ "device_one_time_keys_count": {}, diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs index d5e037dd569..0924074b03e 100644 --- a/crates/matrix-sdk/src/client.rs +++ b/crates/matrix-sdk/src/client.rs @@ -2372,6 +2372,8 @@ pub(crate) mod test { use matrix_sdk_base::media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}; use matrix_sdk_test::{test_json, EventBuilder, EventsJson}; use mockito::{mock, Matcher}; + #[cfg(feature = "sled_state_store")] + use ruma::api::client::r0::message::get_message_events::Direction; use ruma::{ api::{ client::{ @@ -3680,4 +3682,182 @@ pub(crate) mod test { matches::assert_matches!(encryption_event, AnySyncStateEvent::RoomEncryption(_)); } + + #[cfg(feature = "sled_state_store")] + #[tokio::test] + async fn messages() { + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + let client = logged_in_client().await; + + let _m = mock("GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string())) + .with_status(200) + .with_body(test_json::MORE_SYNC.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let mocked_messages = mock( + "GET", + Matcher::Regex( + r"^/_matrix/client/r0/rooms/.*/messages.*from=t392-516_47314_0_7_1_1_1_11444_1.*" + .to_string(), + ), + ) + .with_status(200) + .with_body(test_json::SYNC_ROOM_MESSAGES_BATCH_1.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let _m = mock( + "GET", + Matcher::Regex( + r"^/_matrix/client/r0/rooms/.*/messages.*from=s526_47314_0_7_1_1_1_11444_2" + .to_string(), + ), + ) + .with_status(200) + .match_header("authorization", "Bearer 1234") + .create(); + + let mocked_context = + mock("GET", Matcher::Regex(r"^/_matrix/client/r0/rooms/.*/context/.*".to_string())) + .with_status(200) + .with_body(test_json::CONTEXT_MESSAGE.to_string()) + .match_header("authorization", "Bearer 1234") + .create(); + + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let _ = client.sync_once(sync_settings).await.unwrap(); + + let room = client.get_joined_room(&room_id).unwrap(); + + // Try to get the full timeline + let events = room.messages(None, None, 3, Direction::Forward).await.unwrap().unwrap(); + + assert_eq!(events.len(), 0); + + let events = room.messages(None, None, 3, Direction::Backward).await.unwrap().unwrap(); + + let expected_events = [ + event_id!("$098237280074GZeOm:localhost"), + event_id!("$15275047031IXQRi:localhost"), + event_id!("$15275046980maRLj:localhost"), + ]; + + assert_eq!(events.len(), expected_events.len()); + assert!(events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events) + .all(|(a, b)| a.as_ref().unwrap() == b)); + + // Try to get the timeline starting at an event not known to the store. 
+ let events = room + .messages(Some(&event_id!("$f3h4d129462ha:example.com")), None, 3, Direction::Backward) + .await + .unwrap() + .unwrap(); + + let expected_events = [ + event_id!("$f3h4d129462ha:example.com"), + event_id!("$143273582443PhrSnbefore1:example.org"), + event_id!("$143273582443PhrSnbefore2:example.org"), + ]; + + assert_eq!(events.len(), expected_events.len()); + assert!(events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events) + .all(|(a, b)| a.as_ref().unwrap() == b)); + + mocked_context.assert(); + + let expected_events = [ + event_id!("$152037280074GZeOm:localhost"), + event_id!("$1444812213350496Caaaf:example.com"), + event_id!("$1444812213350496Cbbbf:example.com"), + event_id!("$1444812213350496Ccccf:example.com"), + ]; + + let events = room + .messages( + Some(&event_id!("$152037280074GZeOm:localhost")), + None, + expected_events.len() as u32, + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), expected_events.len()); + assert!(events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events) + .all(|(a, b)| a.as_ref().unwrap() == b)); + + let events = room + .messages( + Some(&event_id!("$1444812213350496Ccccf:example.com")), + None, + expected_events.len() as u32, + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), expected_events.len()); + assert!(events + .iter() + .rev() + .map(|event| event.event_id()) + .zip(&expected_events) + .all(|(a, b)| a.as_ref().unwrap() == b)); + + let end_event = event_id!("$1444812213350496Cbbbf:example.com"); + let events = room + .messages( + Some(&event_id!("$152037280074GZeOm:localhost")), + Some(&end_event), + expected_events.len() as u32, + Direction::Backward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 3); + assert_eq!(events.last().unwrap().event_id().unwrap(), end_event); + assert!(events + .iter() + .map(|event| event.event_id()) + .zip(&expected_events[0..4]) + .any(|(a, b)| a.as_ref().unwrap() == b)); + + let end_event = event_id!("$1444812213350496Cbbbf:example.com"); + let events = room + .messages( + Some(&event_id!("$1444812213350496Ccccf:example.com")), + Some(&end_event), + expected_events.len() as u32, + Direction::Forward, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 2); + assert_eq!(events.last().unwrap().event_id().unwrap(), end_event); + assert!(events + .iter() + .rev() + .map(|event| event.event_id()) + .zip(&expected_events[2..]) + .all(|(a, b)| a.as_ref().unwrap() == b)); + + mocked_messages.assert(); + } } diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index cd02986a3f2..9353648e1e9 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -58,11 +58,6 @@ pub enum HttpError { #[error("the queried endpoint requires authentication but was called before logging in")] AuthenticationRequired, - /// Client tried to force authentication but did not provide an access - /// token. - #[error("tried to force authentication but no access token was provided")] - ForcedAuthenticationWithoutAccessToken, - /// Queried endpoint is not meant for clients. 
#[error("the queried endpoint is not meant for clients")] NotClientRequest, diff --git a/crates/matrix-sdk/src/http_client.rs b/crates/matrix-sdk/src/http_client.rs index d25039b002a..8d7edb29cc9 100644 --- a/crates/matrix-sdk/src/http_client.rs +++ b/crates/matrix-sdk/src/http_client.rs @@ -142,7 +142,7 @@ impl HttpClient { if let Some(session) = read_guard.as_ref() { SendAccessToken::Always(session.access_token.as_str()) } else { - return Err(HttpError::ForcedAuthenticationWithoutAccessToken); + SendAccessToken::None } } else { match Request::METADATA.authentication { diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index a1206e433c7..ebfd9dc45ef 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -1,24 +1,30 @@ -use std::{ops::Deref, sync::Arc}; +use std::{cmp::min, convert::TryFrom, ops::Deref, sync::Arc}; -use matrix_sdk_base::deserialized_responses::{MembersResponse, RoomEvent}; +use matrix_sdk_base::{ + deserialized_responses::{MembersResponse, RoomEvent, SyncRoomEvent}, + StoredTimelineSlice, +}; use matrix_sdk_common::locks::Mutex; use ruma::{ api::client::r0::{ + context::get_context, membership::{get_member_events, join_room_by_id, leave_room}, - message::get_message_events, + message::{get_message_events, get_message_events::Direction}, room::get_room_event, }, events::{ - room::history_visibility::HistoryVisibility, AnyStateEvent, AnySyncStateEvent, EventType, + room::history_visibility::HistoryVisibility, AnyRoomEvent, AnyStateEvent, + AnySyncStateEvent, EventType, }, serde::Raw, - EventId, UserId, + EventId, UInt, UserId, }; +use tracing::trace; use crate::{ media::{MediaFormat, MediaRequest, MediaType}, room::RoomType, - BaseRoom, Client, Result, RoomMember, + BaseRoom, Client, HttpResult, Result, RoomMember, }; /// A struct containing methods that are common for Joined, Invited and Left @@ -129,12 +135,8 @@ impl Common { } /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and - /// returns a `Messages` struct that contains a chunk of room and state - /// events (`RoomEvent` and `AnyStateEvent`). - /// - /// With the encryption feature, messages are decrypted if possible. If - /// decryption fails for an individual message, that message is returned - /// undecrypted. + /// returns a `get_message_events::Response` that contains a chunk of + /// room and state events (`AnyRoomEvent` and `AnyStateEvent`). /// /// # Arguments /// @@ -162,46 +164,221 @@ impl Common { /// # .unwrap(); /// # use futures::executor::block_on; /// # block_on(async { - /// assert!(room.messages(request).await.is_ok()); + /// assert!(room.request_messages(request).await.is_ok()); /// # }); /// ``` - pub async fn messages( + pub async fn request_messages( &self, request: impl Into<get_message_events::Request<'_>>, - ) -> Result<Messages> { + ) -> HttpResult<get_message_events::Response> { let request = request.into(); - let http_response = self.client.send(request, None).await?; + self.client.send(request, None).await + } - let mut response = Messages { - start: http_response.start, - end: http_response.end, - chunk: Vec::with_capacity(http_response.chunk.len()), - state: http_response.state, - }; + /// Gets a slice of the timeline of this room. + /// + /// Returns a slice of the timeline between `start` and `end`, no longer + /// than `limit`. If the number of events is fewer than `limit`, it means + /// that no more events exist in the given direction. + /// If the timeline doesn't contain an event with the given `start`, `None` + /// is returned.
+ /// + /// # Arguments + /// + /// * `start` - An `EventId` that indicates the start of the slice. If + /// `None`, the most recent + /// events in the given direction are returned. + /// + /// * `end` - An `EventId` that indicates the end of the slice. + /// + /// * `limit` - The maximum number of events that should be returned. + /// + /// * `direction` - The direction of the search and returned events. + /// + /// # Examples + /// ```no_run + /// # use std::convert::TryFrom; + /// use matrix_sdk::Client; + /// # use matrix_sdk::ruma::{event_id, room_id}; + /// # use matrix_sdk::ruma::api::client::r0::{ + /// # filter::RoomEventFilter, + /// # message::get_message_events::{Direction, Request as MessagesRequest}, + /// # }; + /// # use url::Url; + /// + /// # let homeserver = Url::parse("http://example.com").unwrap(); + /// let room_id = room_id!("!roomid:example.com"); + /// + /// let mut client = Client::new(homeserver).unwrap(); + /// # let room = client + /// # .get_joined_room(&room_id) + /// # .unwrap(); + /// # use futures::executor::block_on; + /// # block_on(async { + /// assert!(room.messages(Some(&event_id!("$xxxxxx:example.org")), None, 10, Direction::Backward).await.is_ok()); + /// # }); + /// ``` + pub async fn messages( + &self, + start: Option<&EventId>, + end: Option<&EventId>, + limit: u32, + direction: Direction, + ) -> Result<Option<Vec<SyncRoomEvent>>> { + let room_id = self.inner.room_id(); + if let Some(stored) = self + .client + .store() + .get_timeline(room_id, start, end, Some(limit as usize), direction.clone()) + .await? + { + // We found a gap or the end of the stored timeline. + Ok(Some(self.request_missing_messages(stored, end, limit, &direction).await?)) + } else { + // The start event wasn't found in the store; fall back to the context API. + self.request_missing_messages_via_context(start, end, limit, &direction).await + } + } + + fn context_to_message_response( + &self, + mut context: get_context::Response, + ) -> Option<get_message_events::Response> { + let mut response = get_message_events::Response::new(); + response.start = context.start; + response.end = context.end; + + let mut events: Vec<Raw<AnyRoomEvent>> = context.events_after.into_iter().rev().collect(); + events.push(context.event?); + events.append(&mut context.events_before); + + response.chunk = events; + response.state = context.state; - for event in http_response.chunk { - #[cfg(feature = "encryption")] - let event = match event.deserialize() { - Ok(event) => self.client.decrypt_room_event(&event).await, - Err(_) => { - // "Broken" messages (i.e., those that cannot be deserialized) are - // returned unchanged so that the caller can handle them individually. - RoomEvent { event, encryption_info: None } + Some(response) + } + + async fn request_missing_messages( + &self, + mut stored: StoredTimelineSlice, + end: Option<&EventId>, + limit: u32, + direction: &Direction, + ) -> Result<Vec<SyncRoomEvent>> { + if let Some(token) = stored.token { + trace!("A gap in the stored timeline was found. Requesting messages from token {}", token); + + let room_id = self.inner.room_id(); + let mut request = get_message_events::Request::new(room_id, &token, direction.clone()); + // The number of events found in the store is never more than `limit`.
+ request.limit = UInt::from(limit - stored.events.len() as u32); + + let response = self.request_messages(request).await?; + + // FIXME: we may receive an invalid server response that ruma considers valid + // See https://github.com/ruma/ruma/issues/644 + if response.end.is_none() && response.start.is_none() { + return Ok(stored.events); + } + + let response_events = + self.client.base_client().receive_messages(room_id, direction, &response).await?; + + // If our end event is part of our event list, make sure we don't return past the + // end; otherwise return the whole list. + let mut response_events = if let Some(end) = end { + if let Some(position) = response_events + .iter() + .position(|event| event.event_id().map_or(false, |event_id| &event_id == end)) + { + response_events.into_iter().take(position + 1).collect() + } else { + response_events } + } else { + response_events }; - #[cfg(not(feature = "encryption"))] - let event = RoomEvent { event, encryption_info: None }; - - response.chunk.push(event); + match direction { + Direction::Forward => { + response_events.append(&mut stored.events); + stored.events = response_events; + } + Direction::Backward => stored.events.append(&mut response_events), + } } + Ok(stored.events) + } - Ok(response) + async fn request_missing_messages_via_context( + &self, + start: Option<&EventId>, + end: Option<&EventId>, + limit: u32, + direction: &Direction, + ) -> Result<Option<Vec<SyncRoomEvent>>> { + let start = if let Some(start) = start { + start + } else { + // If no start event was given, it's not possible to find its context. + return Ok(None); + }; + + trace!("The start event with id {} wasn't found in the stored timeline. Falling back to the context API", start); + + let room_id = self.inner.room_id(); + let mut request = get_context::Request::new(room_id, start); + + // We need to request twice the limit because the context API returns events + // both before and after the given event + request.limit = UInt::try_from(limit as u64 * 2u64).unwrap_or(UInt::MAX); + + let context = self.client.send(request, None).await?; + + let limit = limit as usize; + let before_length = context.events_before.len(); + let after_length = context.events_after.len(); + let response = if let Some(response) = self.context_to_message_response(context) { + response + } else { + return Ok(None); + }; + let response_events = self + .client + .base_client() + .receive_messages(room_id, &Direction::Backward, &response) + .await?; + + let response_events: Vec<SyncRoomEvent> = match direction { + Direction::Forward => { + let lower_bound = if before_length > limit { before_length - limit } else { 0 }; + response_events[lower_bound..=before_length].to_vec() + } + Direction::Backward => response_events + [after_length..min(response_events.len(), after_length + limit)] + .to_vec(), + }; + + let result = if let Some(end) = end { + if let Some(position) = response_events + .iter() + .position(|event| event.event_id().map_or(false, |event_id| &event_id == end)) + { + response_events.into_iter().take(position + 1).collect() + } else { + response_events + } + } else { + response_events + }; + + Ok(Some(result)) } /// Fetch the event with the given `EventId` in this room. pub async fn event(&self, event_id: &EventId) -> Result<RoomEvent> { let request = get_room_event::Request::new(self.room_id(), event_id); + let event = self.client.send(request, None).await?.event.deserialize()?; #[cfg(feature = "encryption")]