diff --git a/crates/matrix-sdk-base/Cargo.toml b/crates/matrix-sdk-base/Cargo.toml index 65a04046617..484379ea8a2 100644 --- a/crates/matrix-sdk-base/Cargo.toml +++ b/crates/matrix-sdk-base/Cargo.toml @@ -34,10 +34,12 @@ indexeddb_state_store = ["indexed_db_futures", "wasm-bindgen", "pbkdf2", "hmac", indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"] [dependencies] +async-stream = "0.3.2" chacha20poly1305 = { version = "0.9.0", optional = true } dashmap = "4.0.2" futures-core = "0.3.15" futures-util = { version = "0.3.15", default-features = false } +futures-channel = "0.3.15" hmac = { version = "0.12.0", optional = true } lru = "0.7.2" matrix-sdk-common = { version = "0.4.0", path = "../matrix-sdk-common" } diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 02d1a2c5db0..40cb62cff84 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -30,7 +30,7 @@ use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::{ deserialized_responses::{ AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms, - StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, + StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice, }, instant::Instant, locks::RwLock, @@ -848,6 +848,16 @@ impl BaseClient { let notification_count = new_info.unread_notifications.into(); room_info.update_notification_count(notification_count); + let timeline_slice = TimelineSlice::new( + timeline.events.clone(), + next_batch.clone(), + timeline.prev_batch.clone(), + timeline.limited, + true, + ); + + changes.add_timeline(&room_id, timeline_slice); + new_rooms.join.insert( room_id, JoinedRoom::new( @@ -958,11 +968,33 @@ impl BaseClient { room.update_summary(room_info.clone()) } } + for (room_id, room_info) in &changes.stripped_room_infos { if let Some(room) = self.store.get_stripped_room(room_id) { room.update_summary(room_info.clone()) } } + + for (room_id, timeline_slice) in &changes.timeline { + if let Some(room) = self.store.get_room(room_id) { + room.add_timeline_slice(timeline_slice).await; + } + } + } + + /// Receive a timeline slice obtained from a messages request. + /// + /// You should pass only slices requested from the store to this function. 
+    ///
+    /// * `timeline` - The `TimelineSlice` obtained from a messages response
+    pub async fn receive_messages(&self, room_id: &RoomId, timeline: TimelineSlice) -> Result<()> {
+        let mut changes = StateChanges::default();
+
+        changes.add_timeline(room_id, timeline);
+
+        self.store().save_changes(&changes).await?;
+
+        Ok(())
     }

     /// Receive a get member events response and convert it to a deserialized
diff --git a/crates/matrix-sdk-base/src/lib.rs b/crates/matrix-sdk-base/src/lib.rs
index fa8439170ae..5eacb5e7f49 100644
--- a/crates/matrix-sdk-base/src/lib.rs
+++ b/crates/matrix-sdk-base/src/lib.rs
@@ -39,6 +39,7 @@ pub use matrix_sdk_common::*;
 pub use crate::{
     error::{Error, Result},
     session::Session,
+    timeline_stream::TimelineStreamError,
 };

 mod client;
@@ -47,6 +48,7 @@ pub mod media;
 mod rooms;
 mod session;
 mod store;
+mod timeline_stream;

 pub use client::{BaseClient, BaseClientConfig};
 #[cfg(feature = "encryption")]
diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs
index afb07cd89ff..587538f2426 100644
--- a/crates/matrix-sdk-base/src/rooms/normal.rs
+++ b/crates/matrix-sdk-base/src/rooms/normal.rs
@@ -14,7 +14,11 @@ use std::sync::{Arc, RwLock as SyncRwLock};

+use dashmap::DashSet;
+use futures_channel::mpsc;
+use futures_core::stream::Stream;
 use futures_util::stream::{self, StreamExt};
+use matrix_sdk_common::locks::Mutex;
 use ruma::{
     api::client::r0::sync::sync_events::RoomSummary as RumaSummary,
     events::{
@@ -34,12 +38,13 @@ use ruma::{
     EventId, MxcUri, RoomAliasId, RoomId, UserId,
 };
 use serde::{Deserialize, Serialize};
-use tracing::debug;
+use tracing::{debug, warn};

 use super::{BaseRoomInfo, RoomMember};
 use crate::{
-    deserialized_responses::UnreadNotificationsCount,
+    deserialized_responses::{SyncRoomEvent, TimelineSlice, UnreadNotificationsCount},
     store::{Result as StoreResult, StateStore},
+    timeline_stream::{TimelineStreamBackward, TimelineStreamError, TimelineStreamForward},
 };

 /// The underlying room data structure collecting state for joined, left and
@@ -50,6 +55,8 @@ pub struct Room {
     own_user_id: Arc<UserId>,
     inner: Arc<SyncRwLock<RoomInfo>>,
     store: Arc<dyn StateStore>,
+    forward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
+    backward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
 }

 /// The room summary containing member counts and members that should be used to
@@ -107,6 +114,8 @@ impl Room {
             room_id: room_info.room_id.clone(),
             store,
             inner: Arc::new(SyncRwLock::new(room_info)),
+            forward_timeline_streams: Default::default(),
+            backward_timeline_streams: Default::default(),
         }
     }

@@ -467,6 +476,112 @@ impl Room {
     ) -> StoreResult<Vec<(Box<UserId>, Receipt)>> {
         self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
     }
+
+    /// Get two streams into the timeline.
+    /// The first one goes forward in time, the second one backward in time.
+    pub async fn timeline(
+        &self,
+    ) -> StoreResult<(
+        impl Stream<Item = SyncRoomEvent>,
+        impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>,
+    )> {
+        // We need to hold the lock while we create the stream so that we don't lose new
+        // sync responses
+        let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
+        let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
+        let sync_token = self.store.get_sync_token().await?;
+        let event_ids = Arc::new(DashSet::new());
+
+        let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
+            self.store.room_timeline(&self.room_id).await?
+        {
+            TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
+        } else {
+            TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
+        };
+
+        backward_timeline_streams.push(backward_sender);
+
+        let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
+        forward_timeline_streams.push(forward_sender);
+
+        Ok((forward_stream, backward_stream))
+    }
+
+    /// Create a stream that returns all events of the room's timeline forward
+    /// in time.
+    ///
+    /// If you also need a backward stream you should use
+    /// [`timeline`][`crate::Room::timeline`]
+    pub async fn timeline_forward(&self) -> StoreResult<impl Stream<Item = SyncRoomEvent>> {
+        let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
+        let event_ids = Arc::new(DashSet::new());
+
+        let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
+        forward_timeline_streams.push(forward_sender);
+
+        Ok(forward_stream)
+    }
+
+    /// Create a stream that returns all events of the room's timeline backward
+    /// in time.
+    ///
+    /// If you also need a forward stream you should use
+    /// [`timeline`][`crate::Room::timeline`]
+    pub async fn timeline_backward(
+        &self,
+    ) -> StoreResult<impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>> {
+        let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
+        let sync_token = self.store.get_sync_token().await?;
+        let event_ids = Arc::new(DashSet::new());
+
+        let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
+            self.store.room_timeline(&self.room_id).await?
+        {
+            TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
+        } else {
+            TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
+        };
+
+        backward_timeline_streams.push(backward_sender);
+
+        Ok(backward_stream)
+    }
+
+    /// Add a new timeline slice to the timeline streams.
+    pub async fn add_timeline_slice(&self, timeline: &TimelineSlice) {
+        if timeline.sync {
+            let mut streams = self.forward_timeline_streams.lock().await;
+            let mut remaining_streams = Vec::with_capacity(streams.len());
+            while let Some(mut forward) = streams.pop() {
+                if !forward.is_closed() {
+                    if let Err(error) = forward.try_send(timeline.clone()) {
+                        if error.is_full() {
+                            warn!("Dropping timeline slice because the buffer of the forward stream is full");
+                        }
+                    } else {
+                        remaining_streams.push(forward);
+                    }
+                }
+            }
+            *streams = remaining_streams;
+        } else {
+            let mut streams = self.backward_timeline_streams.lock().await;
+            let mut remaining_streams = Vec::with_capacity(streams.len());
+            while let Some(mut backward) = streams.pop() {
+                if !backward.is_closed() {
+                    if let Err(error) = backward.try_send(timeline.clone()) {
+                        if error.is_full() {
+                            warn!("Dropping timeline slice because the buffer of the backward stream is full");
+                        }
+                    } else {
+                        remaining_streams.push(backward);
+                    }
+                }
+            }
+            *streams = remaining_streams;
+        }
+    }
 }

 /// The underlying pure data structure for joined and left rooms.
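The new `Room::timeline()` API is easiest to understand from the consumer side: the backward stream yields locally cached events until the cache runs dry, at which point it yields `TimelineStreamError::EndCache` with a token for a `/messages` request, whose result is fed back through `BaseClient::receive_messages`. Below is a minimal sketch of that loop, assuming the crate's usual re-exports (`BaseClient`, `Room`, `TimelineStreamError`); the commented-out `fetch_messages` helper is hypothetical and stands in for an HTTP `/messages` call.

```rust
use futures_util::{pin_mut, StreamExt};
use matrix_sdk_base::{BaseClient, Room, TimelineStreamError};
use ruma::RoomId;

async fn read_backwards(client: &BaseClient, room: &Room, room_id: &RoomId) {
    let (_forward, backward) = room.timeline().await.expect("the store failed");
    pin_mut!(backward);

    while let Some(item) = backward.next().await {
        match item {
            // An event served from the local cache.
            Ok(event) => println!("cached event: {:?}", event.event_id()),
            // The cache is exhausted: request older events from the server,
            // wrap the response in a `TimelineSlice` with `sync == false` and
            // hand it to `receive_messages`; the stream resumes afterwards.
            Err(TimelineStreamError::EndCache { fetch_more_token }) => {
                // let slice = fetch_messages(room_id, &fetch_more_token).await;
                // client.receive_messages(room_id, slice).await.unwrap();
                let _ = (client, room_id, fetch_more_token);
                break;
            }
            Err(TimelineStreamError::Store(error)) => {
                eprintln!("store error: {}", error);
                break;
            }
        }
    }
}
```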
diff --git a/crates/matrix-sdk-base/src/store/indexeddb_store.rs b/crates/matrix-sdk-base/src/store/indexeddb_store.rs index 07e52c7669d..112e33a911c 100644 --- a/crates/matrix-sdk-base/src/store/indexeddb_store.rs +++ b/crates/matrix-sdk-base/src/store/indexeddb_store.rs @@ -14,6 +14,7 @@ use std::collections::BTreeSet; +use futures_util::stream; use indexed_db_futures::prelude::*; use matrix_sdk_common::{async_trait, SafeEncode}; use ruma::{ @@ -21,19 +22,21 @@ use ruma::{ presence::PresenceEvent, receipt::Receipt, room::member::{MembershipState, RoomMemberEventContent}, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType, + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncMessageEvent, AnySyncRoomEvent, + AnySyncStateEvent, EventType, Redact, }, receipt::ReceiptType, serde::Raw, - EventId, MxcUri, RoomId, UserId, + EventId, MxcUri, RoomId, RoomVersionId, UserId, }; use serde::{Deserialize, Serialize}; +use tracing::{info, warn}; use wasm_bindgen::JsValue; use self::store_key::{EncryptedEvent, StoreKey}; -use super::{store_key, Result, RoomInfo, StateChanges, StateStore, StoreError}; +use super::{store_key, BoxStream, Result, RoomInfo, StateChanges, StateStore, StoreError}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent}, media::{MediaRequest, UniqueKey}, }; @@ -56,36 +59,40 @@ mod KEYS { // STORES - pub const SESSION: &'static str = "session"; - pub const ACCOUNT_DATA: &'static str = "account_data"; + pub const SESSION: &str = "session"; + pub const ACCOUNT_DATA: &str = "account_data"; - pub const MEMBERS: &'static str = "members"; - pub const PROFILES: &'static str = "profiles"; - pub const DISPLAY_NAMES: &'static str = "display_names"; - pub const JOINED_USER_IDS: &'static str = "joined_user_ids"; - pub const INVITED_USER_IDS: &'static str = "invited_user_ids"; + pub const MEMBERS: &str = "members"; + pub const PROFILES: &str = "profiles"; + pub const DISPLAY_NAMES: &str = "display_names"; + pub const JOINED_USER_IDS: &str = "joined_user_ids"; + pub const INVITED_USER_IDS: &str = "invited_user_ids"; - pub const ROOM_STATE: &'static str = "room_state"; - pub const ROOM_INFOS: &'static str = "room_infos"; - pub const PRESENCE: &'static str = "presence"; - pub const ROOM_ACCOUNT_DATA: &'static str = "room_account_data"; + pub const ROOM_STATE: &str = "room_state"; + pub const ROOM_INFOS: &str = "room_infos"; + pub const PRESENCE: &str = "presence"; + pub const ROOM_ACCOUNT_DATA: &str = "room_account_data"; - pub const STRIPPED_ROOM_INFOS: &'static str = "stripped_room_infos"; - pub const STRIPPED_MEMBERS: &'static str = "stripped_members"; - pub const STRIPPED_ROOM_STATE: &'static str = "stripped_room_state"; + pub const STRIPPED_ROOM_INFOS: &str = "stripped_room_infos"; + pub const STRIPPED_MEMBERS: &str = "stripped_members"; + pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state"; - pub const ROOM_USER_RECEIPTS: &'static str = "room_user_receipts"; - pub const ROOM_EVENT_RECEIPTS: &'static str = "room_event_receipts"; + pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts"; + pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts"; - pub const MEDIA: &'static str = "media"; + pub const ROOM_TIMELINE: &str = "room_timeline"; + pub const ROOM_TIMELINE_METADATA: &str = "room_timeline_metadata"; + pub const ROOM_EVENT_ID_TO_POSITION: &str = "room_event_id_to_position"; - pub const CUSTOM: &'static str = "custom"; + pub const MEDIA: &str = "media"; + + pub const CUSTOM: &str = 
"custom"; // static keys - pub const STORE_KEY: &'static str = "store_key"; - pub const FILTER: &'static str = "filter"; - pub const SYNC_TOKEN: &'static str = "sync_token"; + pub const STORE_KEY: &str = "store_key"; + pub const FILTER: &str = "filter"; + pub const SYNC_TOKEN: &str = "sync_token"; } impl From for StoreError { @@ -144,6 +151,10 @@ impl IndexeddbStore { db.create_object_store(KEYS::ROOM_USER_RECEIPTS)?; db.create_object_store(KEYS::ROOM_EVENT_RECEIPTS)?; + db.create_object_store(KEYS::ROOM_TIMELINE)?; + db.create_object_store(KEYS::ROOM_TIMELINE_METADATA)?; + db.create_object_store(KEYS::ROOM_EVENT_ID_TO_POSITION)?; + db.create_object_store(KEYS::MEDIA)?; db.create_object_store(KEYS::CUSTOM)?; @@ -279,7 +290,7 @@ impl IndexeddbStore { (!changes.profiles.is_empty(), KEYS::PROFILES), (!changes.state.is_empty(), KEYS::ROOM_STATE), (!changes.room_account_data.is_empty(), KEYS::ROOM_ACCOUNT_DATA), - (!changes.room_infos.is_empty(), KEYS::ROOM_INFOS), + (!changes.room_infos.is_empty() || !changes.timeline.is_empty(), KEYS::ROOM_INFOS), (!changes.receipts.is_empty(), KEYS::ROOM_EVENT_RECEIPTS), (!changes.stripped_state.is_empty(), KEYS::STRIPPED_ROOM_STATE), (!changes.stripped_members.is_empty(), KEYS::STRIPPED_MEMBERS), @@ -302,7 +313,15 @@ impl IndexeddbStore { stores.extend([KEYS::ROOM_EVENT_RECEIPTS, KEYS::ROOM_USER_RECEIPTS]) } - if stores.len() == 0 { + if !changes.timeline.is_empty() { + stores.extend([ + KEYS::ROOM_TIMELINE, + KEYS::ROOM_TIMELINE_METADATA, + KEYS::ROOM_EVENT_ID_TO_POSITION, + ]) + } + + if stores.is_empty() { // nothing to do, quit early return Ok(()); } @@ -468,6 +487,183 @@ impl IndexeddbStore { } } + if !changes.timeline.is_empty() { + let timeline_store = tx.object_store(KEYS::ROOM_TIMELINE)?; + let timeline_metadata_store = tx.object_store(KEYS::ROOM_TIMELINE_METADATA)?; + let event_id_to_position_store = tx.object_store(KEYS::ROOM_EVENT_ID_TO_POSITION)?; + let room_infos = tx.object_store(KEYS::ROOM_INFOS)?; + + for (room_id, timeline) in &changes.timeline { + if timeline.sync { + info!("Save new timeline batch from sync response for {}", room_id); + } else { + info!("Save new timeline batch from messages response for {}", room_id); + } + let room_key = room_id.encode(); + + let metadata: Option = if timeline.limited { + info!( + "Delete stored timeline for {} because the sync response was limited", + room_id + ); + + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; + let stores = + &[&timeline_store, &timeline_metadata_store, &event_id_to_position_store]; + for store in stores { + for key in store.get_all_keys_with_key(&range)?.await?.iter() { + store.delete(&key)?; + } + } + + None + } else { + let metadata: Option = timeline_metadata_store + .get(&room_key)? + .await? + .map(|v| v.into_serde()) + .transpose()?; + if let Some(mut metadata) = metadata { + if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() { + // This should only happen when a developer adds a wrong timeline + // batch to the `StateChanges` or the server returns a wrong response + // to our request. + warn!("Drop unexpected timeline batch for {}", room_id); + return Ok(()); + } + + // Check if the event already exists in the store + let mut delete_timeline = false; + for event in &timeline.events { + if let Some(event_id) = event.event_id() { + let event_key = (room_id, &event_id).encode(); + if event_id_to_position_store + .count_with_key_owned(event_key)? + .await? 
+ > 0 + { + delete_timeline = true; + break; + } + } + } + + if delete_timeline { + info!( + "Delete stored timeline for {} because of duplicated events", + room_id + ); + + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; + let stores = &[ + &timeline_store, + &timeline_metadata_store, + &event_id_to_position_store, + ]; + for store in stores { + for key in store.get_all_keys_with_key(&range)?.await?.iter() { + store.delete(&key)?; + } + } + + None + } else if timeline.sync { + metadata.start = timeline.start.clone(); + Some(metadata) + } else { + metadata.end = timeline.end.clone(); + Some(metadata) + } + } else { + None + } + }; + + let mut metadata = if let Some(metadata) = metadata { + metadata + } else { + TimelineMetadata { + start: timeline.start.clone(), + end: timeline.end.clone(), + start_position: usize::MAX / 2, + end_position: usize::MAX / 2, + } + }; + + if timeline.sync { + let room_version = room_infos + .get(&room_key)? + .await? + .map(|r| self.deserialize_event::(r)) + .transpose()? + .and_then(|info| info.base_info.create.map(|event| event.room_version)) + .unwrap_or_else(|| { + warn!( + "Unable to find the room version for {}, assume version 9", + room_id + ); + RoomVersionId::V9 + }); + for event in timeline.events.iter().rev() { + // Redact events already in store only on sync response + if let Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction( + redaction, + ))) = event.event.deserialize() + { + let redacts_key = (room_id, &redaction.redacts).encode(); + if let Some(position_key) = + event_id_to_position_store.get_owned(redacts_key)?.await? + { + if let Some(mut full_event) = timeline_store + .get(&position_key)? + .await? + .map(|e| { + self.deserialize_event::(e) + .map_err(StoreError::from) + }) + .transpose()? + { + let inner_event = full_event.event.deserialize()?; + full_event.event = Raw::new(&AnySyncRoomEvent::from( + inner_event.redact(redaction, &room_version), + ))?; + timeline_store.put_key_val_owned( + position_key, + &self.serialize_event(&full_event)?, + )?; + } + } + } + + metadata.start_position -= 1; + let key = (room_id, &metadata.start_position).encode(); + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + let event_key = (room_id, &event_id).encode(); + event_id_to_position_store.put_key_val(&event_key, &key)?; + } + + timeline_store.put_key_val_owned(key, &self.serialize_event(&event)?)?; + } + } else { + for event in timeline.events.iter() { + metadata.end_position += 1; + let key = (room_id, &metadata.end_position).encode(); + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + let event_key = (room_id, &event_id).encode(); + event_id_to_position_store.put_key_val(&event_key, &key)?; + } + + timeline_store.put_key_val_owned(key, &self.serialize_event(&event)?)?; + } + } + + timeline_metadata_store + .put_key_val_owned(room_key, &JsValue::from_serde(&metadata)?)?; + } + } + tx.await.into_result().map_err::(|e| e.into()) } @@ -503,7 +699,7 @@ impl IndexeddbStore { room_id: &RoomId, event_type: EventType, ) -> Result>> { - let range = (room_id, &event_type).encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = (room_id, &event_type).encode_to_range().map_err(StoreError::Codec)?; Ok(self .inner .transaction_on_one_with_mode(KEYS::ROOM_STATE, IdbTransactionMode::Readonly)? 
@@ -546,7 +742,7 @@ impl IndexeddbStore { } pub async fn get_user_ids_stream(&self, room_id: &RoomId) -> Result>> { - let range = room_id.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; let skip = room_id.as_encoded_string().len() + 1; Ok(self .inner @@ -563,7 +759,7 @@ impl IndexeddbStore { } pub async fn get_invited_user_ids(&self, room_id: &RoomId) -> Result>> { - let range = room_id.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; let entries = self .inner .transaction_on_one_with_mode(KEYS::INVITED_USER_IDS, IdbTransactionMode::Readonly)? @@ -578,7 +774,7 @@ impl IndexeddbStore { } pub async fn get_joined_user_ids(&self, room_id: &RoomId) -> Result>> { - let range = room_id.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; Ok(self .inner .transaction_on_one_with_mode(KEYS::JOINED_USER_IDS, IdbTransactionMode::Readonly)? @@ -688,7 +884,7 @@ impl IndexeddbStore { ) -> Result, Receipt)>> { let key = (room_id, &receipt_type, event_id); let prefix_len = key.as_encoded_string().len() + 1; - let range = key.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = key.encode_to_range().map_err(StoreError::Codec)?; let tx = self.inner.transaction_on_one_with_mode( KEYS::ROOM_EVENT_RECEIPTS, IdbTransactionMode::Readonly, @@ -698,8 +894,10 @@ impl IndexeddbStore { let mut all = Vec::new(); for k in store.get_all_keys_with_key(&range)?.await?.iter() { // FIXME: we should probably parallelize this... - let res = - store.get(&k)?.await?.ok_or(StoreError::Codec(format!("no data at {:?}", k)))?; + let res = store + .get(&k)? + .await? + .ok_or_else(|| StoreError::Codec(format!("no data at {:?}", k)))?; let u = if let Some(k_str) = k.as_string() { UserId::parse(&k_str[prefix_len..]) .map_err(|e| StoreError::Codec(format!("{:?}", e)))? 
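The `encode()`/`encode_to_range()` calls used throughout these hunks rely on the separator constants defined in `wasm_helpers.rs` further down: key components are joined with `KEY_SEPARATOR` (`\u{001D}`), and a prefix scan ends at `RANGE_END` (`\u{001E}`), which sorts after the separator but before any regular character. A minimal sketch of the idea over a `BTreeMap`; escaping is omitted for brevity (the real `SafeEncode` trait also escapes separators that occur inside components).

```rust
use std::collections::BTreeMap;

// Mirrors the constants from `wasm_helpers.rs`.
const KEY_SEPARATOR: &str = "\u{001D}";
const RANGE_END: &str = "\u{001E}";

// Simplified stand-in for `SafeEncode::encode` (no escaping).
fn encode(parts: &[&str]) -> String {
    parts.join(KEY_SEPARATOR)
}

fn main() {
    let mut store: BTreeMap<String, &str> = BTreeMap::new();
    store.insert(encode(&["!room_a:example.org", "@alice:example.org"]), "receipt a");
    store.insert(encode(&["!room_a:example.org", "@bob:example.org"]), "receipt b");
    store.insert(encode(&["!room_b:example.org", "@carol:example.org"]), "receipt c");

    // Everything stored under `!room_a:example.org` lies between
    // `prefix + KEY_SEPARATOR` and `prefix + RANGE_END`.
    let start = format!("!room_a:example.org{}", KEY_SEPARATOR);
    let end = format!("!room_a:example.org{}", RANGE_END);
    let in_room_a: Vec<&str> = store.range(start..end).map(|(_, v)| *v).collect();

    assert_eq!(in_room_a, ["receipt a", "receipt b"]);
}
```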
@@ -740,7 +938,7 @@ impl IndexeddbStore { let jskey = &JsValue::from_str( core::str::from_utf8(key).map_err(|e| StoreError::Codec(format!("{:}", e)))?, ); - self.get_custom_value_for_js(&jskey).await + self.get_custom_value_for_js(jskey).await } async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result>> { @@ -781,7 +979,7 @@ impl IndexeddbStore { } async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> { - let range = uri.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = uri.encode_to_range().map_err(StoreError::Codec)?; let tx = self.inner.transaction_on_one_with_mode(KEYS::MEDIA, IdbTransactionMode::Readwrite)?; let store = tx.object_store(KEYS::MEDIA)?; @@ -808,6 +1006,9 @@ impl IndexeddbStore { KEYS::ROOM_USER_RECEIPTS, KEYS::STRIPPED_ROOM_STATE, KEYS::STRIPPED_MEMBERS, + KEYS::ROOM_TIMELINE, + KEYS::ROOM_TIMELINE_METADATA, + KEYS::ROOM_EVENT_ID_TO_POSITION, ]; let all_stores = { @@ -826,7 +1027,7 @@ impl IndexeddbStore { tx.object_store(store_name)?.delete(&room_key)?; } - let range = room_id.encode_to_range().map_err(|e| StoreError::Codec(e))?; + let range = room_id.encode_to_range().map_err(StoreError::Codec)?; for store_name in prefixed_stores { let store = tx.object_store(store_name)?; for key in store.get_all_keys_with_key(&range)?.await?.iter() { @@ -835,6 +1036,40 @@ impl IndexeddbStore { } tx.await.into_result().map_err::(|e| e.into()) } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + let key = room_id.encode(); + let tx = self.inner.transaction_on_multi_with_mode( + &[KEYS::ROOM_TIMELINE, KEYS::ROOM_TIMELINE_METADATA], + IdbTransactionMode::Readonly, + )?; + let timeline = tx.object_store(KEYS::ROOM_TIMELINE)?; + let metadata = tx.object_store(KEYS::ROOM_TIMELINE_METADATA)?; + + let metadata: Option = + metadata.get(&key)?.await?.map(|v| v.into_serde()).transpose()?; + if metadata.is_none() { + info!("No timeline for {} was previously stored", room_id); + return Ok(None); + } + let end_token = metadata.and_then(|m| m.end); + #[allow(clippy::needless_collect)] + let timeline: Vec> = timeline + .get_all_with_key(&key)? + .await? + .iter() + .map(|v| self.deserialize_event(v).map_err(|e| e.into())) + .collect(); + + let stream = Box::pin(stream::iter(timeline.into_iter())); + + info!("Found previously stored timeline for {}, with end token {:?}", room_id, end_token); + + Ok(Some((stream, end_token))) + } } #[async_trait(?Send)] @@ -980,6 +1215,21 @@ impl StateStore for IndexeddbStore { async fn remove_room(&self, room_id: &RoomId) -> Result<()> { self.remove_room(room_id).await } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + self.room_timeline(room_id).await + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct TimelineMetadata { + pub start: String, + pub start_position: usize, + pub end: Option, + pub end_position: usize, } #[cfg(test)] diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 01c231bf1bd..b982ebd2a42 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -4,9 +4,19 @@ macro_rules! 
statestore_integration_tests { ($($name:ident)*) => { $( mod $name { + + use futures_util::StreamExt; + use http::Response; use matrix_sdk_test::{async_test, test_json}; use ruma::{ - api::client::r0::media::get_content_thumbnail::Method, + api::{ + client::r0::{ + media::get_content_thumbnail::Method, + message::get_message_events::Response as MessageResponse, + sync::sync_events::Response as SyncResponse, + }, + IncomingResponse, + }, device_id, event_id, events::{ presence::PresenceEvent, EventContent, @@ -30,7 +40,7 @@ macro_rules! statestore_integration_tests { use crate::{ RoomType, Session, - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, RoomEvent, SyncRoomEvent, TimelineSlice}, media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, store::{ Store, @@ -519,7 +529,155 @@ macro_rules! statestore_integration_tests { assert_eq!(store.get_stripped_room_infos().await?.len(), 0); Ok(()) } + + #[async_test] + async fn test_room_timeline() { + let store = get_store().await.unwrap(); + let mut stored_events = Vec::new(); + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + + // Before the first sync the timeline should be empty + assert!(store.room_timeline(room_id).await.unwrap().is_none()); + + // Add sync response + let sync = SyncResponse::try_from_http_response( + Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = timeline.events.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = TimelineSlice::new( + events, + sync.next_batch.clone(), + timeline.prev_batch.clone(), + false, + true, + ); + let mut changes = StateChanges::new(sync.next_batch.clone()); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, timeline.prev_batch.as_deref()) + .await; + + // Add message response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages + .chunk + .iter() + .cloned() + .map(|event| RoomEvent { event, encryption_info: None }.into()) + .collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; + + // Add second message response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages + .chunk + .iter() + .cloned() + .map(|event| RoomEvent { event, encryption_info: None }.into()) + .collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, 
messages.end.as_deref()).await; + + // Add second sync response + let sync = SyncResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::MORE_SYNC_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = timeline.events.iter().cloned().map(Into::into).collect(); + + let mut prev_stored_events = stored_events; + stored_events = events.clone(); + stored_events.append(&mut prev_stored_events); + + let timeline_slice = TimelineSlice::new( + events, + sync.next_batch.clone(), + timeline.prev_batch.clone(), + false, + true, + ); + let mut changes = StateChanges::new(sync.next_batch.clone()); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; + + // Check if limited sync removes the stored timeline + let end_token = Some("end token".to_string()); + let timeline_slice = TimelineSlice::new( + Vec::new(), + "start token".to_string(), + end_token.clone(), + true, + true, + ); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &Vec::new(), end_token.as_deref()).await; + } + + async fn check_timeline_events( + room_id: &RoomId, + store: &dyn StateStore, + stored_events: &[SyncRoomEvent], + expected_end_token: Option<&str>, + ) { + let (timeline_iter, end_token) = store.room_timeline(room_id).await.unwrap().unwrap(); + + assert_eq!(end_token.as_deref(), expected_end_token); + + let timeline = timeline_iter.collect::>>().await; + + assert!(timeline + .into_iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.unwrap().event_id() == b.event_id())); + } + } + )* } } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 24774087dd6..ef173df7222 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -13,10 +13,11 @@ // limitations under the License. 
use std::{ - collections::BTreeSet, + collections::{BTreeMap, BTreeSet, HashMap}, sync::{Arc, RwLock}, }; +use async_stream::stream; use dashmap::{DashMap, DashSet}; use lru::LruCache; #[allow(unused_imports)] @@ -27,18 +28,18 @@ use ruma::{ receipt::Receipt, room::member::{MembershipState, RoomMemberEventContent}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncStateEvent, EventType, + AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, Redact, }, receipt::ReceiptType, serde::Raw, - EventId, MxcUri, RoomId, UserId, + EventId, MxcUri, RoomId, RoomVersionId, UserId, }; #[allow(unused_imports)] -use tracing::info; +use tracing::{info, warn}; -use super::{Result, RoomInfo, StateChanges, StateStore}; +use super::{BoxStream, Result, RoomInfo, StateChanges, StateStore}; use crate::{ - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent}, media::{MediaRequest, UniqueKey}, }; @@ -68,6 +69,7 @@ pub struct MemoryStore { >, media: Arc>>>, custom: Arc, Vec>>, + room_timeline: Arc, TimelineData>>, } impl MemoryStore { @@ -93,6 +95,7 @@ impl MemoryStore { room_event_receipts: Default::default(), media: Arc::new(Mutex::new(LruCache::new(100))), custom: DashMap::new().into(), + room_timeline: Default::default(), } } @@ -274,6 +277,119 @@ impl MemoryStore { } } + for (room, timeline) in &changes.timeline { + if timeline.sync { + info!("Save new timeline batch from sync response for {}", room); + } else { + info!("Save new timeline batch from messages response for {}", room); + } + + let data = if timeline.limited { + info!("Delete stored timeline for {} because the sync response was limited", room); + self.room_timeline.remove(room); + None + } else if let Some(mut data) = self.room_timeline.get_mut(room) { + if !timeline.sync && Some(&timeline.start) != data.end.as_ref() { + // This should only happen when a developer adds a wrong timeline + // batch to the `StateChanges` or the server returns a wrong response + // to our request. 
+ warn!("Drop unexpected timeline batch for {}", room); + return Ok(()); + } + + // Check if the event already exists in the store + let mut delete_timeline = false; + for event in &timeline.events { + if let Some(event_id) = event.event_id() { + if data.event_id_to_position.contains_key(&event_id) { + delete_timeline = true; + break; + } + } + } + + if delete_timeline { + info!("Delete stored timeline for {} because of duplicated events", room); + self.room_timeline.remove(room); + None + } else if timeline.sync { + data.start = timeline.start.clone(); + Some(data) + } else { + data.end = timeline.end.clone(); + Some(data) + } + } else { + None + }; + + let mut data = &mut *if let Some(data) = data { + data + } else { + let data = TimelineData { + start: timeline.start.clone(), + end: timeline.end.clone(), + ..Default::default() + }; + self.room_timeline.insert(room.to_owned(), data); + self.room_timeline.get_mut(room).unwrap() + }; + + // Create a copy of the events if the stream created via `room_timeline()` isn't + // fully consumed + let data_events = Arc::make_mut(&mut data.events); + + if timeline.sync { + let mut room_version = None; + for event in timeline.events.iter().rev() { + // Redact events already in store only on sync response + if let Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction( + redaction, + ))) = event.event.deserialize() + { + if let Some(position) = data.event_id_to_position.get(&redaction.redacts) { + if let Some(mut full_event) = data_events.get_mut(position) { + let inner_event = full_event.event.deserialize()?; + if room_version.is_none() { + room_version = Some(self.room_info + .get(room) + .and_then(|info| { + info.base_info + .create + .as_ref() + .map(|event| event.room_version.clone()) + }).unwrap_or_else(|| { + warn!("Unable to find the room version for {}, assume version 9", room); + RoomVersionId::V9 + })); + } + + full_event.event = Raw::new(&AnySyncRoomEvent::from( + inner_event.redact(redaction, room_version.as_ref().unwrap()), + ))?; + } + } + } + + data.start_position -= 1; + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + data.event_id_to_position.insert(event_id, data.start_position); + } + data_events.insert(data.start_position, event.to_owned()); + } + } else { + for event in timeline.events.iter() { + data.end_position += 1; + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + data.event_id_to_position.insert(event_id, data.end_position); + } + data_events.insert(data.end_position, event.to_owned()); + } + } + } + info!("Saved changes in {:?}", now.elapsed()); Ok(()) @@ -455,9 +571,32 @@ impl MemoryStore { self.stripped_members.remove(room_id); self.room_user_receipts.remove(room_id); self.room_event_receipts.remove(room_id); + self.room_timeline.remove(room_id); Ok(()) } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + if let Some(data) = self.room_timeline.get(room_id) { + let events = data.events.clone(); + let stream = stream! 
{
+                for item in events.values() {
+                    yield Ok(item.to_owned());
+                }
+            };
+            info!(
+                "Found previously stored timeline for {}, with end token {:?}",
+                room_id, data.end
+            );
+            Ok(Some((Box::pin(stream), data.end.to_owned())))
+        } else {
+            info!("No timeline for {} was previously stored", room_id);
+            Ok(None)
+        }
+    }
 }

 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -608,6 +747,23 @@ impl StateStore for MemoryStore {
     async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
         self.remove_room(room_id).await
     }
+
+    async fn room_timeline(
+        &self,
+        room_id: &RoomId,
+    ) -> Result<Option<(BoxStream<Result<SyncRoomEvent>>, Option<String>)>> {
+        self.room_timeline(room_id).await
+    }
+}
+
+#[derive(Debug, Default)]
+struct TimelineData {
+    pub start: String,
+    pub start_position: isize,
+    pub end: Option<String>,
+    pub end_position: isize,
+    pub events: Arc<BTreeMap<isize, SyncRoomEvent>>,
+    pub event_id_to_position: HashMap<Box<EventId>, isize>,
+}

 #[cfg(test)]
diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs
index 499cca028d5..9db6739d63f 100644
--- a/crates/matrix-sdk-base/src/store/mod.rs
+++ b/crates/matrix-sdk-base/src/store/mod.rs
@@ -17,6 +17,7 @@ use std::path::Path;
 use std::{
     collections::{BTreeMap, BTreeSet},
     ops::Deref,
+    pin::Pin,
     sync::Arc,
 };

@@ -40,6 +41,8 @@ use ruma::{
     EventId, MxcUri, RoomId, UserId,
 };

+pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
+
 #[cfg(any(feature = "sled_state_store", feature = "indexeddb_state_store"))]
 mod store_key;

@@ -50,7 +53,7 @@ use sled::Db;
 mod indexeddb_store;

 use crate::{
-    deserialized_responses::{MemberEvent, StrippedMemberEvent},
+    deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice},
     media::MediaRequest,
     rooms::{RoomInfo, RoomType},
     Room, Session,
 };
@@ -354,6 +357,19 @@ pub trait StateStore: AsyncTraitDeps {
     ///
     /// * `room_id` - The `RoomId` of the room to delete.
     async fn remove_room(&self, room_id: &RoomId) -> Result<()>;
+
+    /// Get a stream of the stored timeline
+    ///
+    /// # Arguments
+    ///
+    /// * `room_id` - The `RoomId` of the room whose timeline should be
+    ///   fetched.
+    ///
+    /// Returns a stream of events and a token that can be used to request
+    /// previous events.
+    async fn room_timeline(
+        &self,
+        room_id: &RoomId,
+    ) -> Result<Option<(BoxStream<Result<SyncRoomEvent>>, Option<String>)>>;
 }

 /// A state store wrapper for the SDK.
@@ -562,6 +578,8 @@ pub struct StateChanges {
     pub ambiguity_maps: BTreeMap<Box<RoomId>, BTreeMap<String, BTreeSet<Box<UserId>>>>,
     /// A map of `RoomId` to a vector of `Notification`s
     pub notifications: BTreeMap<Box<RoomId>, Vec<Notification>>,
+    /// A mapping of `RoomId` to a `TimelineSlice`
+    pub timeline: BTreeMap<Box<RoomId>, TimelineSlice>,
 }

 impl StateChanges {
@@ -646,4 +664,10 @@ impl StateChanges {
     pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
         self.receipts.insert(room_id.to_owned(), event);
     }
+
+    /// Update the `StateChanges` struct with a new `TimelineSlice` for the
+    /// given room.
+ pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) { + self.timeline.insert(room_id.to_owned(), timeline); + } } diff --git a/crates/matrix-sdk-base/src/store/sled_store.rs b/crates/matrix-sdk-base/src/store/sled_store.rs index 96c570e45f5..5ab94d18c8e 100644 --- a/crates/matrix-sdk-base/src/store/sled_store.rs +++ b/crates/matrix-sdk-base/src/store/sled_store.rs @@ -28,11 +28,12 @@ use ruma::{ presence::PresenceEvent, receipt::Receipt, room::member::{MembershipState, RoomMemberEventContent}, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType, + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncMessageEvent, AnySyncRoomEvent, + AnySyncStateEvent, EventType, Redact, }, receipt::ReceiptType, serde::Raw, - EventId, MxcUri, RoomId, UserId, + EventId, MxcUri, RoomId, RoomVersionId, UserId, }; use serde::{Deserialize, Serialize}; use sled::{ @@ -40,12 +41,12 @@ use sled::{ Config, Db, Transactional, Tree, }; use tokio::task::spawn_blocking; -use tracing::info; +use tracing::{info, warn}; use self::store_key::{EncryptedEvent, StoreKey}; -use super::{store_key, Result, RoomInfo, StateChanges, StateStore, StoreError}; +use super::{store_key, BoxStream, Result, RoomInfo, StateChanges, StateStore, StoreError}; use crate::{ - deserialized_responses::MemberEvent, + deserialized_responses::{MemberEvent, SyncRoomEvent}, media::{MediaRequest, UniqueKey}, }; @@ -169,6 +170,19 @@ impl EncodeKey for EventType { } } +impl EncodeKey for EventId { + fn encode(&self) -> Vec { + self.as_str().encode() + } +} + +impl EncodeKey for (&RoomId, usize) { + fn encode(&self) -> Vec { + [self.0.as_bytes(), &[ENCODE_SEPARATOR], self.1.to_be_bytes().as_ref(), &[ENCODE_SEPARATOR]] + .concat() + } +} + /// Get the value at `position` in encoded `key`. /// /// The key must have been encoded with the `EncodeKey` trait. `position` @@ -205,6 +219,9 @@ pub struct SledStore { room_event_receipts: Tree, media: Tree, custom: Tree, + room_timeline: Tree, + room_timeline_metadata: Tree, + room_event_id_to_position: Tree, } impl std::fmt::Debug for SledStore { @@ -244,6 +261,10 @@ impl SledStore { let custom = db.open_tree("custom")?; + let room_timeline = db.open_tree("room_timeline")?; + let room_timeline_metadata = db.open_tree("room_timeline_metadata")?; + let room_event_id_to_position = db.open_tree("room_event_id_to_position")?; + Ok(Self { path, inner: db, @@ -266,6 +287,9 @@ impl SledStore { room_event_receipts, media, custom, + room_timeline, + room_timeline_metadata, + room_event_id_to_position, }) } @@ -575,6 +599,8 @@ impl SledStore { ret?; + self.save_room_timeline(changes).await?; + self.inner.flush_async().await?; info!("Saved changes in {:?}", now.elapsed()); @@ -973,10 +999,232 @@ impl SledStore { ret?; + self.remove_room_timeline(room_id).await?; + self.inner.flush_async().await?; Ok(()) } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + let db = self.clone(); + let key = room_id.encode(); + let metadata: Option = db + .room_timeline_metadata + .get(key.as_slice())? 
+ .map(|v| serde_json::from_slice(&v).map_err(StoreError::Json)) + .transpose()?; + if metadata.is_none() { + info!("No timeline for {} was previously stored", room_id); + return Ok(None); + } + let end_token = metadata.and_then(|m| m.end); + let stream = Box::pin(stream::iter( + db.room_timeline + .scan_prefix(key) + .map(move |v| db.deserialize_event(&v?.1).map_err(|e| e.into())), + )); + + info!("Found previously stored timeline for {}, with end token {:?}", room_id, end_token); + + Ok(Some((stream, end_token))) + } + + async fn remove_room_timeline(&self, room_id: &RoomId) -> Result<()> { + let room_key = room_id.encode(); + info!("Remove stored timeline for {}", room_id); + + let mut timeline_batch = sled::Batch::default(); + for key in self.room_timeline.scan_prefix(room_key.as_slice()).keys() { + timeline_batch.remove(key?) + } + + let mut event_id_to_position_batch = sled::Batch::default(); + for key in self.room_event_id_to_position.scan_prefix(room_key.as_slice()).keys() { + event_id_to_position_batch.remove(key?) + } + + let ret: Result<(), TransactionError> = + (&self.room_timeline, &self.room_timeline_metadata, &self.room_event_id_to_position) + .transaction( + |(room_timeline, room_timeline_metadata, room_event_id_to_position)| { + room_timeline_metadata.remove(room_key.as_slice())?; + + room_timeline.apply_batch(&timeline_batch)?; + room_event_id_to_position.apply_batch(&event_id_to_position_batch)?; + + Ok(()) + }, + ); + + ret?; + + Ok(()) + } + + async fn save_room_timeline(&self, changes: &StateChanges) -> Result<()> { + let mut timeline_batch = sled::Batch::default(); + let mut event_id_to_position_batch = sled::Batch::default(); + let mut timeline_metadata_batch = sled::Batch::default(); + + for (room_id, timeline) in &changes.timeline { + if timeline.sync { + info!("Save new timeline batch from sync response for {}", room_id); + } else { + info!("Save new timeline batch from messages response for {}", room_id); + } + let room_key = room_id.encode(); + + let metadata: Option = if timeline.limited { + info!( + "Delete stored timeline for {} because the sync response was limited", + room_id + ); + self.remove_room_timeline(room_id).await?; + None + } else { + let metadata: Option = self + .room_timeline_metadata + .get(room_key.as_slice())? + .map(|v| serde_json::from_slice(&v).map_err(StoreError::Json)) + .transpose()?; + if let Some(mut metadata) = metadata { + if !timeline.sync && Some(&timeline.start) != metadata.end.as_ref() { + // This should only happen when a developer adds a wrong timeline + // batch to the `StateChanges` or the server returns a wrong response + // to our request. + warn!("Drop unexpected timeline batch for {}", room_id); + return Ok(()); + } + + // Check if the event already exists in the store + let mut delete_timeline = false; + for event in &timeline.events { + if let Some(event_id) = event.event_id() { + let event_key = (room_id.as_ref(), event_id.as_ref()).encode(); + if self.room_event_id_to_position.contains_key(event_key)? 
{ + delete_timeline = true; + break; + } + } + } + + if delete_timeline { + info!( + "Delete stored timeline for {} because of duplicated events", + room_id + ); + self.remove_room_timeline(room_id).await?; + None + } else if timeline.sync { + metadata.start = timeline.start.clone(); + Some(metadata) + } else { + metadata.end = timeline.end.clone(); + Some(metadata) + } + } else { + None + } + }; + + let mut metadata = if let Some(metadata) = metadata { + metadata + } else { + TimelineMetadata { + start: timeline.start.clone(), + end: timeline.end.clone(), + start_position: usize::MAX / 2, + end_position: usize::MAX / 2, + } + }; + let room_version = self + .room_info + .get(room_id.encode())? + .map(|r| self.deserialize_event::(&r)) + .transpose()? + .and_then(|info| info.base_info.create.map(|event| event.room_version)) + .unwrap_or_else(|| { + warn!("Unable to find the room version for {}, assume version 9", room_id); + RoomVersionId::V9 + }); + + if timeline.sync { + for event in timeline.events.iter().rev() { + // Redact events already in store only on sync response + if let Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomRedaction( + redaction, + ))) = event.event.deserialize() + { + let redacts_key = (room_id.as_ref(), redaction.redacts.as_ref()).encode(); + if let Some(position_key) = + self.room_event_id_to_position.get(redacts_key)? + { + if let Some(mut full_event) = self + .room_timeline + .get(position_key.as_ref())? + .map(|e| { + self.deserialize_event::(&e) + .map_err(StoreError::from) + }) + .transpose()? + { + let inner_event = full_event.event.deserialize()?; + + full_event.event = Raw::new(&AnySyncRoomEvent::from( + inner_event.redact(redaction, &room_version), + ))?; + timeline_batch + .insert(position_key, self.serialize_event(&full_event)?); + } + } + } + + metadata.start_position -= 1; + let key = (room_id.as_ref(), metadata.start_position).encode(); + timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?); + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + let event_key = (room_id.as_ref(), event_id.as_ref()).encode(); + event_id_to_position_batch.insert(event_key.as_slice(), key.as_slice()); + } + } + } else { + for event in timeline.events.iter() { + metadata.end_position += 1; + let key = (room_id.as_ref(), metadata.end_position).encode(); + timeline_batch.insert(key.as_slice(), self.serialize_event(&event)?); + // Only add event with id to the position map + if let Some(event_id) = event.event_id() { + let event_key = (room_id.as_ref(), event_id.as_ref()).encode(); + event_id_to_position_batch.insert(event_key.as_slice(), key.as_slice()); + } + } + } + + timeline_metadata_batch.insert(room_key, serde_json::to_vec(&metadata)?); + } + + let ret: Result<(), TransactionError> = + (&self.room_timeline, &self.room_timeline_metadata, &self.room_event_id_to_position) + .transaction( + |(room_timeline, room_timeline_metadata, room_event_id_to_position)| { + room_timeline_metadata.apply_batch(&timeline_metadata_batch)?; + + room_timeline.apply_batch(&timeline_batch)?; + room_event_id_to_position.apply_batch(&event_id_to_position_batch)?; + + Ok(()) + }, + ); + + ret?; + + Ok(()) + } } #[async_trait] @@ -1122,11 +1370,25 @@ impl StateStore for SledStore { async fn remove_room(&self, room_id: &RoomId) -> Result<()> { self.remove_room(room_id).await } + + async fn room_timeline( + &self, + room_id: &RoomId, + ) -> Result>, Option)>> { + self.room_timeline(room_id).await + } +} + +#[derive(Debug, Serialize, 
Deserialize)]
+struct TimelineMetadata {
+    pub start: String,
+    pub start_position: usize,
+    pub end: Option<String>,
+    pub end_position: usize,
 }

 #[cfg(test)]
 mod test {
-
     use super::{Result, SledStore};

     async fn get_store() -> Result<SledStore> {
diff --git a/crates/matrix-sdk-base/src/timeline_stream.rs b/crates/matrix-sdk-base/src/timeline_stream.rs
new file mode 100644
index 00000000000..ed458e8e4f4
--- /dev/null
+++ b/crates/matrix-sdk-base/src/timeline_stream.rs
@@ -0,0 +1,248 @@
+use std::{fmt, pin::Pin, sync::Arc};
+
+use dashmap::DashSet;
+use futures_channel::mpsc;
+use futures_core::{
+    stream::{BoxStream, Stream},
+    task::{Context, Poll},
+};
+use ruma::EventId;
+use thiserror::Error;
+use tracing::trace;
+
+use crate::{
+    deserialized_responses::{SyncRoomEvent, TimelineSlice},
+    store::Result,
+};
+
+const CHANNEL_LIMIT: usize = 10;
+
+/// Errors in a timeline stream
+#[derive(Error, Debug)]
+pub enum TimelineStreamError {
+    /// The end of the stored timeline was reached.
+    #[error("the end of the stored timeline was reached")]
+    EndCache {
+        /// The given token should be used to request more events from the
+        /// server.
+        fetch_more_token: String,
+    },
+    /// The event in the store produced an error
+    #[error("the event in the store produced an error")]
+    Store(crate::StoreError),
+}
+
+/// A `Stream` over the timeline of a room
+pub struct TimelineStreamBackward<'a> {
+    receiver: mpsc::Receiver<TimelineSlice>,
+    stored_events: Option<BoxStream<'a, Result<SyncRoomEvent>>>,
+    pending: Vec<SyncRoomEvent>,
+    event_ids: Arc<DashSet<Box<EventId>>>,
+    token: Option<String>,
+}
+
+#[cfg(not(tarpaulin_include))]
+impl fmt::Debug for TimelineStreamBackward<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TimelineStream")
+            .field("event_ids", &self.event_ids)
+            .field("token", &self.token)
+            .field("pending", &self.pending)
+            .finish()
+    }
+}
+
+impl<'a> TimelineStreamBackward<'a> {
+    /// Creates a tuple containing `Self` and a `Sender` to send new events to
+    /// this stream
+    ///
+    /// The stream terminates once the entire timeline has been returned.
+    /// Until then, it yields an error whenever the locally cached events are
+    /// exhausted and new messages need to be loaded first.
+    ///
+    /// # Arguments
+    ///
+    /// * `event_ids` - A set to store `EventId`s already seen by the stream.
+    ///   This should be shared between the `TimelineStreamBackward` and the
+    ///   `TimelineStreamForward` stream.
+    ///
+    /// * `token` - The current sync token or the token that identifies the end
+    ///   of the `stored_events`.
+    ///
+    /// * `stored_events` - A stream of events that are currently stored
+    ///   locally.
+    pub(crate) fn new(
+        event_ids: Arc<DashSet<Box<EventId>>>,
+        token: Option<String>,
+        stored_events: Option<BoxStream<'a, Result<SyncRoomEvent>>>,
+    ) -> (Self, mpsc::Sender<TimelineSlice>) {
+        let (sender, receiver) = mpsc::channel(CHANNEL_LIMIT);
+        let self_ = Self { event_ids, pending: Vec::new(), stored_events, token, receiver };
+
+        (self_, sender)
+    }
+
+    fn handle_new_slice(
+        &mut self,
+        slice: TimelineSlice,
+    ) -> Poll<Option<Result<SyncRoomEvent, TimelineStreamError>>> {
+        // Check if this is the batch we are expecting
+        if self.token.is_some() && self.token != Some(slice.start) {
+            trace!("Store received a timeline batch that wasn't expected");
+            return Poll::Pending;
+        }
+
+        // There is a gap in the timeline. Therefore, terminate the stream.
+        if slice.limited {
+            return Poll::Ready(None);
+        }
+
+        // The end of the timeline was reached
+        if slice.events.is_empty() {
+            return Poll::Ready(None);
+        }
+
+        for event in slice.events.into_iter().rev().filter(|event| {
+            self.event_ids
+                .insert(event.event_id().expect("Timeline events always have an event id."))
+        }) {
+            self.pending.push(event);
+        }
+        self.token = slice.end;
+
+        if let Some(event) = self.pending.pop() {
+            Poll::Ready(Some(Ok(event)))
+        } else if let Some(token) = &self.token {
+            Poll::Ready(Some(Err(TimelineStreamError::EndCache {
+                fetch_more_token: token.to_string(),
+            })))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+}
+
+impl<'a> Stream for TimelineStreamBackward<'a> {
+    type Item = Result<SyncRoomEvent, TimelineStreamError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        if let Some(stored_events) = &mut this.stored_events {
+            match Pin::new(stored_events).poll_next(cx) {
+                Poll::Ready(None) => {}
+                Poll::Pending => return Poll::Pending,
+                Poll::Ready(Some(event)) => {
+                    return Poll::Ready(Some(event.map_err(TimelineStreamError::Store)))
+                }
+            }
+
+            // We returned all events the store gave to us
+            this.stored_events = None;
+        }
+
+        if let Some(event) = this.pending.pop() {
+            Poll::Ready(Some(Ok(event)))
+        } else {
+            loop {
+                match Pin::new(&mut this.receiver).poll_next(cx) {
+                    Poll::Ready(Some(slice)) => match this.handle_new_slice(slice) {
+                        Poll::Pending => continue,
+                        other => break other,
+                    },
+                    Poll::Ready(None) => break Poll::Ready(None),
+                    Poll::Pending => {
+                        if let Some(token) = &this.token {
+                            // We tell the consumer that there are no more events in the cache
+                            break Poll::Ready(Some(Err(TimelineStreamError::EndCache {
+                                fetch_more_token: token.to_string(),
+                            })));
+                        } else {
+                            break Poll::Ready(None);
+                        }
+                    }
+                };
+            }
+        }
+    }
+}
+
+/// A `Stream` over the timeline of a room
+pub struct TimelineStreamForward {
+    receiver: mpsc::Receiver<TimelineSlice>,
+    pending: Vec<SyncRoomEvent>,
+    event_ids: Arc<DashSet<Box<EventId>>>,
+}
+
+#[cfg(not(tarpaulin_include))]
+impl fmt::Debug for TimelineStreamForward {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TimelineStream")
+            .field("event_ids", &self.event_ids)
+            .field("pending", &self.pending)
+            .finish()
+    }
+}
+
+impl TimelineStreamForward {
+    /// Creates a tuple containing `Self` and a `Sender` to send new events to
+    /// this stream
+    ///
+    /// The stream is only terminated when there would be a gap in the
+    /// timeline. This happens when a `SyncResponse` is obtained with a
+    /// limited timeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `event_ids` - A set to store `EventId`s already seen by the stream.
+    ///   This should be shared between the `TimelineStreamBackward` and the
+    ///   `TimelineStreamForward` stream.
+    pub(crate) fn new(
+        event_ids: Arc<DashSet<Box<EventId>>>,
+    ) -> (Self, mpsc::Sender<TimelineSlice>) {
+        let (sender, receiver) = mpsc::channel(CHANNEL_LIMIT);
+        let self_ = Self { event_ids, pending: Vec::new(), receiver };
+
+        (self_, sender)
+    }
+
+    fn handle_new_slice(&mut self, slice: TimelineSlice) -> Poll<Option<SyncRoomEvent>> {
+        // There is a gap in the timeline. Therefore, terminate the stream.
+        if slice.limited {
+            return Poll::Ready(None);
+        }
+
+        for event in slice.events.into_iter().rev().filter(|event| {
+            self.event_ids
+                .insert(event.event_id().expect("Timeline events always have an event id."))
+        }) {
+            self.pending.push(event);
+        }
+
+        if let Some(event) = self.pending.pop() {
+            Poll::Ready(Some(event))
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
+impl Stream for TimelineStreamForward {
+    type Item = SyncRoomEvent;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if let Some(event) = this.pending.pop() {
+            Poll::Ready(Some(event))
+        } else {
+            loop {
+                match Pin::new(&mut this.receiver).poll_next(cx) {
+                    Poll::Ready(Some(slice)) => match this.handle_new_slice(slice) {
+                        Poll::Pending => continue,
+                        other => break other,
+                    },
+                    Poll::Ready(None) => break Poll::Ready(None),
+                    Poll::Pending => break Poll::Pending,
+                }
+            }
+        }
+    }
+}
diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs
index b757e7e8799..158bae0bc12 100644
--- a/crates/matrix-sdk-common/src/deserialized_responses.rs
+++ b/crates/matrix-sdk-common/src/deserialized_responses.rs
@@ -97,6 +97,13 @@ pub struct SyncRoomEvent {
     pub encryption_info: Option<EncryptionInfo>,
 }

+impl SyncRoomEvent {
+    /// Get the event id of this `SyncRoomEvent` if the event has any valid id.
+    pub fn event_id(&self) -> Option<Box<EventId>> {
+        self.event.get_field::<Box<EventId>>("event_id").ok().flatten()
+    }
+}
+
 impl From<Raw<AnySyncRoomEvent>> for SyncRoomEvent {
     fn from(inner: Raw<AnySyncRoomEvent>) -> Self {
         Self { encryption_info: None, event: inner }
@@ -255,6 +262,39 @@ impl Timeline {
     }
 }

+/// A slice of the timeline in the room.
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+pub struct TimelineSlice {
+    /// The `next_batch` or `from` token used to obtain this slice.
+    pub start: String,
+
+    /// The `prev_batch` or `to` token used to obtain this slice.
+    /// If `None`, this `TimelineSlice` is the start of the room's timeline.
+    pub end: Option<String>,
+
+    /// Whether the number of events returned for this slice was limited by a
+    /// `limit` filter when requesting.
+    pub limited: bool,
+
+    /// A list of events.
+    pub events: Vec<SyncRoomEvent>,
+
+    /// Whether this is a timeline slice obtained from a `SyncResponse`.
+    pub sync: bool,
+}
+
+impl TimelineSlice {
+    pub fn new(
+        events: Vec<SyncRoomEvent>,
+        start: String,
+        end: Option<String>,
+        limited: bool,
+        sync: bool,
+    ) -> Self {
+        Self { start, end, events, limited, sync }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(try_from = "SyncRoomMemberEvent", into = "SyncRoomMemberEvent")]
 pub struct MemberEvent {
diff --git a/crates/matrix-sdk-common/src/wasm_helpers.rs b/crates/matrix-sdk-common/src/wasm_helpers.rs
index 49ed23b2d17..d59cd5ffc05 100644
--- a/crates/matrix-sdk-common/src/wasm_helpers.rs
+++ b/crates/matrix-sdk-common/src/wasm_helpers.rs
@@ -8,13 +8,13 @@ use web_sys::IdbKeyRange;
 /// Helpers for wasm32/browser environments

 /// ASCII Group Separator, for elements in the keys
-pub const KEY_SEPARATOR: &'static str = "\u{001D}";
+pub const KEY_SEPARATOR: &str = "\u{001D}";
 /// ASCII Record Separator is sure bigger than the Key Separator but smaller
 /// than regular characters
-pub const RANGE_END: &'static str = "\u{001E}";
+pub const RANGE_END: &str = "\u{001E}";
 /// Using the literal escape character to escape KEY_SEPARATOR in regular keys
 /// (though super unlikely)
-pub const ESCAPED: &'static str = "\u{001E}\u{001D}";
+pub const ESCAPED: &str = "\u{001E}\u{001D}";

 /// Encode value as String/JsValue/IdbKeyRange for the JS APIs in a
 /// safe, escaped manner.
@@ -45,7 +45,7 @@ pub trait SafeEncode {
             &JsValue::from([&key, KEY_SEPARATOR].concat()),
             &JsValue::from([&key, RANGE_END].concat()),
         )
-        .map_err(|e| e.as_string().unwrap_or(format!("Creating key range failed")))
+        .map_err(|e| e.as_string().unwrap_or_else(|| "Creating key range failed".to_string()))
     }
 }

@@ -175,3 +175,9 @@ impl SafeEncode for MxcUri {
         self.as_str().as_encoded_string()
     }
 }
+
+impl SafeEncode for usize {
+    fn as_encoded_string(&self) -> String {
+        self.to_string()
+    }
+}
diff --git a/crates/matrix-sdk-test/src/test_json/events.rs b/crates/matrix-sdk-test/src/test_json/events.rs
index cff68b3e2f6..f6feb37ed7d 100644
--- a/crates/matrix-sdk-test/src/test_json/events.rs
+++ b/crates/matrix-sdk-test/src/test_json/events.rs
@@ -143,6 +143,102 @@ lazy_static! {
     });
 }

+lazy_static! {
+    pub static ref SYNC_ROOM_MESSAGES_BATCH_1: JsonValue = json!({
+        "chunk": [
+            {
+                "age": 1042,
+                "content": {
+                    "body": "hello world",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Caaaf:example.com",
+                "origin_server_ts": 1444812213737i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@alice:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 20123,
+                "content": {
+                    "body": "the world is big",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Cbbbf:example.com",
+                "origin_server_ts": 1444812194656i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 50789,
+                "content": {
+                    "name": "New room name"
+                },
+                "event_id": "$1444812213350496Ccccf:example.com",
+                "origin_server_ts": 1444812163990i64,
+                "prev_content": {
+                    "name": "Old room name"
+                },
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "state_key": "",
+                "type": "m.room.name"
+            }
+        ],
+        "end": "t47409-4357353_219380_26003_2269",
+        "start": "t392-516_47314_0_7_1_1_1_11444_1"
+    });
+}
+
+lazy_static!
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(try_from = "SyncRoomMemberEvent", into = "SyncRoomMemberEvent")]
 pub struct MemberEvent {
diff --git a/crates/matrix-sdk-common/src/wasm_helpers.rs b/crates/matrix-sdk-common/src/wasm_helpers.rs
index 49ed23b2d17..d59cd5ffc05 100644
--- a/crates/matrix-sdk-common/src/wasm_helpers.rs
+++ b/crates/matrix-sdk-common/src/wasm_helpers.rs
@@ -8,13 +8,13 @@ use web_sys::IdbKeyRange;
 /// Helpers for wasm32/browser environments
 
 /// ASCII Group Separator, for elements in the keys
-pub const KEY_SEPARATOR: &'static str = "\u{001D}";
+pub const KEY_SEPARATOR: &str = "\u{001D}";
 /// ASCII Record Separator: larger than the Key Separator but still smaller
 /// than regular characters
-pub const RANGE_END: &'static str = "\u{001E}";
+pub const RANGE_END: &str = "\u{001E}";
 /// Using the literal escape character to escape KEY_SEPARATOR in regular keys
 /// (though super unlikely)
-pub const ESCAPED: &'static str = "\u{001E}\u{001D}";
+pub const ESCAPED: &str = "\u{001E}\u{001D}";
 
 /// Encode value as String/JsValue/IdbKeyRange for the JS APIs in a
 /// safe, escaped manner.
@@ -45,7 +45,7 @@ pub trait SafeEncode {
             &JsValue::from([&key, KEY_SEPARATOR].concat()),
             &JsValue::from([&key, RANGE_END].concat()),
         )
-        .map_err(|e| e.as_string().unwrap_or(format!("Creating key range failed")))
+        .map_err(|e| e.as_string().unwrap_or_else(|| "Creating key range failed".to_string()))
     }
 }
@@ -175,3 +175,9 @@ impl SafeEncode for MxcUri {
         self.as_str().as_encoded_string()
     }
 }
+
+impl SafeEncode for usize {
+    fn as_encoded_string(&self) -> String {
+        self.to_string()
+    }
+}
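// Editorial sketch (not part of the patch): the new `SafeEncode` impl encodes
// a `usize` as its plain decimal string, so numeric components (e.g. indices
// in a timeline key) compose predictably with the separators above. The
// import path is an assumption here, and the module only builds on wasm32.
use matrix_sdk_common::wasm_helpers::SafeEncode;

fn encode_index_demo() {
    assert_eq!(42usize.as_encoded_string(), "42");
}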
diff --git a/crates/matrix-sdk-test/src/test_json/events.rs b/crates/matrix-sdk-test/src/test_json/events.rs
index cff68b3e2f6..f6feb37ed7d 100644
--- a/crates/matrix-sdk-test/src/test_json/events.rs
+++ b/crates/matrix-sdk-test/src/test_json/events.rs
@@ -143,6 +143,102 @@ lazy_static! {
     });
 }
 
+lazy_static! {
+    pub static ref SYNC_ROOM_MESSAGES_BATCH_1: JsonValue = json!({
+        "chunk": [
+            {
+                "age": 1042,
+                "content": {
+                    "body": "hello world",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Caaaf:example.com",
+                "origin_server_ts": 1444812213737i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@alice:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 20123,
+                "content": {
+                    "body": "the world is big",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Cbbbf:example.com",
+                "origin_server_ts": 1444812194656i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 50789,
+                "content": {
+                    "name": "New room name"
+                },
+                "event_id": "$1444812213350496Ccccf:example.com",
+                "origin_server_ts": 1444812163990i64,
+                "prev_content": {
+                    "name": "Old room name"
+                },
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "state_key": "",
+                "type": "m.room.name"
+            }
+        ],
+        "end": "t47409-4357353_219380_26003_2269",
+        "start": "t392-516_47314_0_7_1_1_1_11444_1"
+    });
+}
+
+lazy_static! {
+    pub static ref SYNC_ROOM_MESSAGES_BATCH_2: JsonValue = json!({
+        "chunk": [
+            {
+                "age": 1042,
+                "content": {
+                    "body": "hello world",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Caaak:example.com",
+                "origin_server_ts": 1444812213737i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@alice:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 20123,
+                "content": {
+                    "body": "the world is big",
+                    "msgtype": "m.text"
+                },
+                "event_id": "$1444812213350496Cbbbk:example.com",
+                "origin_server_ts": 1444812194656i64,
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "type": "m.room.message"
+            },
+            {
+                "age": 50789,
+                "content": {
+                    "name": "New room name"
+                },
+                "event_id": "$1444812213350496Cccck:example.com",
+                "origin_server_ts": 1444812163990i64,
+                "prev_content": {
+                    "name": "Old room name"
+                },
+                "room_id": "!Xq3620DUiqCaoxq:example.com",
+                "sender": "@bob:example.com",
+                "state_key": "",
+                "type": "m.room.name"
+            }
+        ],
+        "end": "t47409-4357353_219380_26003_2270",
+        "start": "t47409-4357353_219380_26003_2269"
+    });
+}
+
 lazy_static! {
     pub static ref KEYS_QUERY: JsonValue = json!({
         "device_keys": {
diff --git a/crates/matrix-sdk-test/src/test_json/mod.rs b/crates/matrix-sdk-test/src/test_json/mod.rs
index 536b93295ef..9ccd20cdc48 100644
--- a/crates/matrix-sdk-test/src/test_json/mod.rs
+++ b/crates/matrix-sdk-test/src/test_json/mod.rs
@@ -16,12 +16,13 @@ pub use events::{
     LOGIN_WITH_DISCOVERY, LOGOUT, MEMBER, MEMBER_INVITE, MEMBER_NAME_CHANGE, MEMBER_STRIPPED,
     MESSAGE_EDIT, MESSAGE_TEXT, NAME, NAME_STRIPPED, POWER_LEVELS, PRESENCE, PUBLIC_ROOMS,
     PUSH_RULES, REACTION, READ_RECEIPT, READ_RECEIPT_OTHER, REDACTED, REDACTED_INVALID,
-    REDACTED_STATE, REDACTION, REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES, TAG, TOPIC,
-    TYPING,
+    REDACTED_STATE, REDACTION, REGISTRATION_RESPONSE_ERR, ROOM_ID, ROOM_MESSAGES,
+    SYNC_ROOM_MESSAGES_BATCH_1, SYNC_ROOM_MESSAGES_BATCH_2, TAG, TOPIC, TYPING,
 };
 pub use members::MEMBERS;
 pub use sync::{
-    DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, SYNC, VOIP_SYNC,
+    DEFAULT_SYNC_SUMMARY, INVITE_SYNC, LEAVE_SYNC, LEAVE_SYNC_EVENT, MORE_SYNC, MORE_SYNC_2, SYNC,
+    VOIP_SYNC,
 };
 
 lazy_static! {
diff --git a/crates/matrix-sdk-test/src/test_json/sync.rs b/crates/matrix-sdk-test/src/test_json/sync.rs
index cb1dc57c81b..6434da6235a 100644
--- a/crates/matrix-sdk-test/src/test_json/sync.rs
+++ b/crates/matrix-sdk-test/src/test_json/sync.rs
@@ -697,7 +697,7 @@ lazy_static! {
                                 }
                             },
                         ],
-                        "limited": true,
+                        "limited": false,
                         "prev_batch": "t392-516_47314_0_7_1_1_1_11444_1"
                     },
                     "unread_notifications": {
@@ -717,6 +717,130 @@ lazy_static! {
     });
 }
 
+lazy_static! {
+    pub static ref MORE_SYNC_2: JsonValue = json!({
+        "next_batch": "s526_47314_0_7_1_1_1_11444_3",
+        "rooms": {
+            "join": {
+                "!SVkFJHzfwvuaIEawgC:localhost": {
+                    "timeline": {
+                        "events": [
+                            {
+                                "content": {
+                                    "body": "baba",
+                                    "format": "org.matrix.custom.html",
+                                    "formatted_body": "<strong>baba</strong>",
+                                    "msgtype": "m.text"
+                                },
+                                "event_id": "$152037280074GZeOm2:localhost",
+                                "origin_server_ts": 152037280,
+                                "sender": "@example:localhost",
+                                "type": "m.room.message",
+                                "unsigned": {
+                                    "age": 598971425
+                                }
+                            },
+                            {
+                                "content": {
+                                    "body": " * edited message",
+                                    "m.new_content": {
+                                        "body": "edited message",
+                                        "msgtype": "m.text"
+                                    },
+                                    "m.relates_to": {
+                                        "event_id": "$someeventid:localhost",
+                                        "rel_type": "m.replace"
+                                    },
+                                    "msgtype": "m.text"
+                                },
+                                "event_id": "$editevid2:localhost",
+                                "origin_server_ts": 159026265,
+                                "sender": "@alice:matrix.org",
+                                "type": "m.room.message",
+                                "unsigned": {
+                                    "age": 85
+                                }
+                            },
+                            {
+                                "content": {
+                                    "reason": "😀"
+                                },
+                                "event_id": "$151957878228ssqrJ2:localhost",
+                                "origin_server_ts": 151957878,
+                                "sender": "@example:localhost",
+                                "type": "m.room.redaction",
+                                "redacts": "$151957878228ssqrj:localhost",
+                                "unsigned": {
+                                    "age": 85
+                                }
+                            },
+                            {
+                                "content": {},
+                                "event_id": "$15275046980maRLj2:localhost",
+                                "origin_server_ts": 152750469,
+                                "sender": "@example:localhost",
+                                "type": "m.room.message",
+                                "unsigned": {
+                                    "age": 19334,
+                                    "redacted_because": {
+                                        "content": {},
+                                        "event_id": "$15275047031IXQRi:localhost",
+                                        "origin_server_ts": 152750470,
+                                        "redacts": "$15275046980maRLj:localhost",
+                                        "sender": "@example:localhost",
+                                        "type": "m.room.redaction",
+                                        "unsigned": {
+                                            "age": 14523
+                                        }
+                                    },
+                                    "redacted_by": "$15275047031IXQRi:localhost"
+                                }
+                            },
+                            {
+                                "content": {
+                                    "m.relates_to": {
+                                        "event_id": "$15275047031IXQRi:localhost",
+                                        "key": "👍",
+                                        "rel_type": "m.annotation"
+                                    }
+                                },
+                                "event_id": "$15275047031IXQRi2:localhost",
+                                "origin_server_ts": 159027581,
+                                "sender": "@alice:matrix.org",
+                                "type": "m.reaction",
+                                "unsigned": {
+                                    "age": 85
+                                }
+                            },
+                            {
+                                "content": {
+                                    "body": "This is a notice",
+                                    "format": "org.matrix.custom.html",
+                                    "formatted_body": "<em>This is a notice</em>",
+                                    "msgtype": "m.notice"
+                                },
+                                "event_id": "$098237280074GZeOm2:localhost",
+                                "origin_server_ts": 162037280,
+                                "sender": "@bot:localhost",
+                                "type": "m.room.message",
+                                "unsigned": {
+                                    "age": 25
+                                }
+                            },
+                        ],
+                        "limited": false,
+                        "prev_batch": "s526_47314_0_7_1_1_1_11444_2"
+                    },
+                    "unread_notifications": {
+                        "highlight_count": 0,
+                        "notification_count": 11
+                    }
+                }
+            },
+        },
+    });
+}
+
 lazy_static! {
     pub static ref INVITE_SYNC: JsonValue = json!({
         "device_one_time_keys_count": {},
diff --git a/crates/matrix-sdk/src/client.rs b/crates/matrix-sdk/src/client.rs
index 8d1e964eb02..271f5b8e6a5 100644
--- a/crates/matrix-sdk/src/client.rs
+++ b/crates/matrix-sdk/src/client.rs
@@ -2303,6 +2303,7 @@ pub(crate) mod test {
     use std::{collections::BTreeMap, convert::TryInto, io::Cursor, str::FromStr, time::Duration};
 
     use matrix_sdk_base::media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType};
+    use matrix_sdk_common::deserialized_responses::SyncRoomEvent;
     use matrix_sdk_test::{test_json, EventBuilder, EventsJson};
     use mockito::{mock, Matcher};
     use ruma::{
@@ -3808,4 +3809,145 @@ pub(crate) mod test {
         matches::assert_matches!(encryption_event, AnySyncStateEvent::RoomEncryption(_));
     }
+
+    #[async_test]
+    async fn room_timeline() {
+        let client = logged_in_client().await;
+        let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
+
+        // The initial sync fills the store and establishes the sync token.
+        let sync = mock("GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()))
+            .with_status(200)
+            .with_body(test_json::SYNC.to_string())
+            .match_header("authorization", "Bearer 1234")
+            .create();
+
+        let _ = client.sync_once(sync_settings).await.unwrap();
+        sync.assert();
+        drop(sync);
+        let room = client.get_joined_room(room_id!("!SVkFJHzfwvuaIEawgC:localhost")).unwrap();
+        let (forward_stream, backward_stream) = room.timeline().await.unwrap();
+
+        let sync_2 = mock(
+            "GET",
+            Matcher::Regex(
+                r"^/_matrix/client/r0/sync\?.*since=s526_47314_0_7_1_1_1_11444_1.*".to_string(),
+            ),
+        )
+        .with_status(200)
+        .with_body(test_json::MORE_SYNC.to_string())
+        .match_header("authorization", "Bearer 1234")
+        .create();
+
+        let sync_3 = mock(
+            "GET",
+            Matcher::Regex(
+                r"^/_matrix/client/r0/sync\?.*since=s526_47314_0_7_1_1_1_11444_2.*".to_string(),
+            ),
+        )
+        .with_status(200)
+        .with_body(test_json::MORE_SYNC_2.to_string())
+        .match_header("authorization", "Bearer 1234")
+        .create();
+
+        // The backward stream falls back to these /messages batches once the
+        // cached timeline is exhausted.
+        let mocked_messages = mock(
+            "GET",
+            Matcher::Regex(
+                r"^/_matrix/client/r0/rooms/.*/messages.*from=t392-516_47314_0_7_1_1_1_11444_1.*"
+                    .to_string(),
+            ),
+        )
+        .with_status(200)
+        .with_body(test_json::SYNC_ROOM_MESSAGES_BATCH_1.to_string())
+        .match_header("authorization", "Bearer 1234")
+        .create();
+
+        let mocked_messages_2 = mock(
+            "GET",
+            Matcher::Regex(
+                r"^/_matrix/client/r0/rooms/.*/messages.*from=t47409-4357353_219380_26003_2269.*"
+                    .to_string(),
+            ),
+        )
+        .with_status(200)
+        .with_body(test_json::SYNC_ROOM_MESSAGES_BATCH_2.to_string())
+        .match_header("authorization", "Bearer 1234")
+        .create();
+
+        assert_eq!(client.sync_token().await, Some("s526_47314_0_7_1_1_1_11444_1".to_string()));
+        let sync_settings = SyncSettings::new()
+            .timeout(Duration::from_millis(3000))
+            .token("s526_47314_0_7_1_1_1_11444_1");
+        let _ = client.sync_once(sync_settings).await.unwrap();
+        sync_2.assert();
+        let sync_settings = SyncSettings::new()
+            .timeout(Duration::from_millis(3000))
+            .token("s526_47314_0_7_1_1_1_11444_2");
+        let _ = client.sync_once(sync_settings).await.unwrap();
+        sync_3.assert();
+
+        // The forward stream yields the events of both follow-up syncs in
+        // chronological order.
+        let expected_events = vec![
+            "$152037280074GZeOm:localhost",
+            "$editevid:localhost",
+            "$151957878228ssqrJ:localhost",
+            "$15275046980maRLj:localhost",
+            "$15275047031IXQRi:localhost",
+            "$098237280074GZeOm:localhost",
+            "$152037280074GZeOm2:localhost",
+            "$editevid2:localhost",
+            "$151957878228ssqrJ2:localhost",
+            "$15275046980maRLj2:localhost",
+            "$15275047031IXQRi2:localhost",
+            "$098237280074GZeOm2:localhost",
+        ];
+
+        use futures_util::StreamExt;
+        let forward_events =
+            forward_stream.take(expected_events.len()).collect::<Vec<SyncRoomEvent>>().await;
+
+        assert!(forward_events
+            .into_iter()
+            .zip(expected_events.iter())
+            .all(|(a, b)| &a.event_id().unwrap().as_str() == b));
+
+        // Backward: the cached slices first, then the two /messages batches.
+        let expected_events = vec![
+            "$152037280074GZeOm2:localhost",
+            "$editevid2:localhost",
+            "$151957878228ssqrJ2:localhost",
+            "$15275046980maRLj2:localhost",
+            "$15275047031IXQRi2:localhost",
+            "$098237280074GZeOm2:localhost",
+            "$152037280074GZeOm:localhost",
+            "$editevid:localhost",
+            "$151957878228ssqrJ:localhost",
+            "$15275046980maRLj:localhost",
+            "$15275047031IXQRi:localhost",
+            "$098237280074GZeOm:localhost",
+            "$1444812213350496Caaaf:example.com",
+            "$1444812213350496Cbbbf:example.com",
+            "$1444812213350496Ccccf:example.com",
+            "$1444812213350496Caaak:example.com",
+            "$1444812213350496Cbbbk:example.com",
+            "$1444812213350496Cccck:example.com",
+        ];
+
+        let join_handle = tokio::spawn(async move {
+            let backward_events = backward_stream
+                .take(expected_events.len())
+                .collect::<Vec<Result<SyncRoomEvent>>>()
+                .await;
+
+            assert!(backward_events
+                .into_iter()
+                .zip(expected_events.iter())
+                .all(|(a, b)| &a.unwrap().event_id().unwrap().as_str() == b));
+        });
+
+        join_handle.await.unwrap();
+
+        mocked_messages.assert();
+        mocked_messages_2.assert();
+    }
 }
diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs
index f815ac1834f..5c4a7f8269e 100644
--- a/crates/matrix-sdk/src/room/common.rs
+++ b/crates/matrix-sdk/src/room/common.rs
@@ -1,10 +1,14 @@
 use std::{ops::Deref, sync::Arc};
 
-use matrix_sdk_base::deserialized_responses::{MembersResponse, RoomEvent};
+use futures_core::stream::Stream;
+use matrix_sdk_base::{
+    deserialized_responses::{MembersResponse, RoomEvent, SyncRoomEvent, TimelineSlice},
+    TimelineStreamError,
+};
 use matrix_sdk_common::locks::Mutex;
 use ruma::{
     api::client::r0::{
-        filter::RoomEventFilter,
+        filter::{LazyLoadOptions, RoomEventFilter},
         membership::{get_member_events, join_room_by_id, leave_room},
         message::get_message_events::{self, Direction},
         room::get_room_event,
@@ -194,6 +198,247 @@ impl Common {
         Ok(response)
     }
 
+    /// Get the two streams for the timeline of this `Room`.
+    ///
+    /// The first stream is forward in time and the second one is backward in
+    /// time. If the `Store` in use implements message caching and the
+    /// timeline is cached, no request to the server is made.
+    ///
+    /// The streams make sure that no duplicate events are returned. If the
+    /// event graph changed on the server, existing streams will keep the
+    /// previous event order. Streams created later will use the new event
+    /// graph, and therefore will create a new local cache.
+    ///
+    /// The forward stream will only return `None` when a gapped sync was
+    /// performed. In that case the backward stream should also be dropped, to
+    /// make the best use of the message cache.
+    ///
+    /// The backward stream returns `None` once the first event of the room
+    /// has been reached. The backward stream may also return an error when a
+    /// request to the server fails. If the error is persistent, new streams
+    /// need to be created.
+    ///
+    /// With the encryption feature, messages are decrypted if possible. If
+    /// decryption fails for an individual message, that message is returned
+    /// undecrypted.
+    ///
+    /// # Examples
+    /// ```no_run
+    /// # use std::convert::TryFrom;
+    /// use matrix_sdk::{room::MessagesOptions, Client};
+    /// # use matrix_sdk::ruma::{
+    /// #     api::client::r0::filter::RoomEventFilter,
+    /// #     room_id,
+    /// # };
+    /// # use url::Url;
+    /// # use futures::StreamExt;
+    /// # use futures_util::pin_mut;
+    ///
+    /// # use futures::executor::block_on;
+    /// # block_on(async {
+    /// # let homeserver = Url::parse("http://example.com")?;
+    ///
+    /// let mut client = Client::new(homeserver).await?;
+    ///
+    /// if let Some(room) = client.get_joined_room(room_id!("!roomid:example.com")) {
+    ///     let (forward_stream, backward_stream) = room.timeline().await?;
+    ///
+    ///     tokio::spawn(async move {
+    ///         pin_mut!(backward_stream);
+    ///
+    ///         while let Some(item) = backward_stream.next().await {
+    ///             match item {
+    ///                 Ok(event) => println!("{:?}", event),
+    ///                 Err(_) => println!("Some error occurred!"),
+    ///             }
+    ///         }
+    ///     });
+    ///
+    ///     pin_mut!(forward_stream);
+    ///
+    ///     while let Some(event) = forward_stream.next().await {
+    ///         println!("{:?}", event);
+    ///     }
+    /// }
+    ///
+    /// # Result::<_, matrix_sdk::Error>::Ok(())
+    /// # });
+    /// ```
+    pub async fn timeline(
+        &self,
+    ) -> Result<(impl Stream<Item = SyncRoomEvent>, impl Stream<Item = Result<SyncRoomEvent>>)>
+    {
+        let (forward_store, backward_store) = self.inner.timeline().await?;
+
+        let room = self.to_owned();
+        // Wrap the store-backed backward stream: when the cached part of the
+        // timeline is exhausted, fetch further messages from the server
+        // instead of surfacing the `EndCache` error to the caller.
+        let backward = async_stream::stream! {
+            for await item in backward_store {
+                match item {
+                    Ok(event) => yield Ok(event),
+                    Err(TimelineStreamError::EndCache { fetch_more_token }) => {
+                        if let Err(error) = room.request_messages(&fetch_more_token).await {
+                            yield Err(error);
+                        }
+                    }
+                    Err(TimelineStreamError::Store(error)) => yield Err(error.into()),
+                }
+            }
+        };
+
+        Ok((forward_store, backward))
+    }
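// Editorial sketch (not part of the patch): the `async_stream::stream!` block
// in `timeline()` wraps an inner stream and re-yields its items, which is how
// cache misses are turned into server requests. The macro's shape in
// isolation, with plain integers standing in for timeline items:
use async_stream::stream;
use futures_util::{pin_mut, stream::iter, StreamExt};

async fn restream_demo() {
    let inner = iter(vec![Ok(1u8), Err("end of cache"), Ok(2u8)]);
    let outer = stream! {
        for await item in inner {
            match item {
                Ok(n) => yield n,
                // `timeline()` reacts here by fetching more messages; the
                // sketch just swallows the marker item.
                Err(_token) => {}
            }
        }
    };
    pin_mut!(outer);
    while let Some(n) = outer.next().await {
        println!("{}", n);
    }
}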
+
+    /// Create a stream that returns all events of the room's timeline forward
+    /// in time.
+    ///
+    /// The stream makes sure that no duplicate events are returned.
+    ///
+    /// The stream will only return `None` when a gapped sync was performed.
+    ///
+    /// With the encryption feature, messages are decrypted if possible. If
+    /// decryption fails for an individual message, that message is returned
+    /// undecrypted.
+    ///
+    /// If you also need a backward stream, use
+    /// [`timeline`][`crate::room::Common::timeline`].
+    ///
+    /// # Examples
+    /// ```no_run
+    /// # use std::convert::TryFrom;
+    /// use matrix_sdk::{room::MessagesOptions, Client};
+    /// # use matrix_sdk::ruma::{
+    /// #     api::client::r0::filter::RoomEventFilter,
+    /// #     room_id,
+    /// # };
+    /// # use url::Url;
+    /// # use futures::StreamExt;
+    /// # use futures_util::pin_mut;
+    ///
+    /// # use futures::executor::block_on;
+    /// # block_on(async {
+    /// # let homeserver = Url::parse("http://example.com")?;
+    ///
+    /// let mut client = Client::new(homeserver).await?;
+    ///
+    /// if let Some(room) = client.get_joined_room(room_id!("!roomid:example.com")) {
+    ///     let forward_stream = room.timeline_forward().await?;
+    ///
+    ///     pin_mut!(forward_stream);
+    ///
+    ///     while let Some(event) = forward_stream.next().await {
+    ///         println!("{:?}", event);
+    ///     }
+    /// }
+    ///
+    /// # Result::<_, matrix_sdk::Error>::Ok(())
+    /// # });
+    /// ```
+    pub async fn timeline_forward(&self) -> Result<impl Stream<Item = SyncRoomEvent>> {
+        Ok(self.inner.timeline_forward().await?)
+    }
+
+    /// Create a stream that returns all events of the room's timeline backward
+    /// in time.
+    ///
+    /// If the `Store` in use implements message caching and the timeline is
+    /// cached, no request to the server is made.
+    ///
+    /// The stream makes sure that no duplicate events are returned. If the
+    /// event graph changed on the server, existing streams will keep the
+    /// previous event order. Streams created later will use the new event
+    /// graph, and therefore will create a new local cache.
+    ///
+    /// The stream returns `None` once the first event of the room has been
+    /// reached. The stream may also return an error when a request to the
+    /// server fails. If the error is persistent, a new stream needs to be
+    /// created.
+    ///
+    /// With the encryption feature, messages are decrypted if possible. If
+    /// decryption fails for an individual message, that message is returned
+    /// undecrypted.
+    ///
+    /// If you also need a forward stream, use
+    /// [`timeline`][`crate::room::Common::timeline`].
+    ///
+    /// # Examples
+    /// ```no_run
+    /// # use std::convert::TryFrom;
+    /// use matrix_sdk::{room::MessagesOptions, Client};
+    /// # use matrix_sdk::ruma::{
+    /// #     api::client::r0::filter::RoomEventFilter,
+    /// #     room_id,
+    /// # };
+    /// # use url::Url;
+    /// # use futures::StreamExt;
+    /// # use futures_util::pin_mut;
+    ///
+    /// # use futures::executor::block_on;
+    /// # block_on(async {
+    /// # let homeserver = Url::parse("http://example.com")?;
+    ///
+    /// let mut client = Client::new(homeserver).await?;
+    ///
+    /// if let Some(room) = client.get_joined_room(room_id!("!roomid:example.com")) {
+    ///     let backward_stream = room.timeline_backward().await?;
+    ///
+    ///     tokio::spawn(async move {
+    ///         pin_mut!(backward_stream);
+    ///
+    ///         while let Some(item) = backward_stream.next().await {
+    ///             match item {
+    ///                 Ok(event) => println!("{:?}", event),
+    ///                 Err(_) => println!("Some error occurred!"),
+    ///             }
+    ///         }
+    ///     });
+    /// }
+    ///
+    /// # Result::<_, matrix_sdk::Error>::Ok(())
+    /// # });
+    /// ```
+    pub async fn timeline_backward(&self) -> Result<impl Stream<Item = Result<SyncRoomEvent>>> {
+        let backward_store = self.inner.timeline_backward().await?;
+
+        let room = self.to_owned();
+        let backward = async_stream::stream! {
+            for await item in backward_store {
+                match item {
+                    Ok(event) => yield Ok(event),
+                    Err(TimelineStreamError::EndCache { fetch_more_token }) => {
+                        if let Err(error) = room.request_messages(&fetch_more_token).await {
+                            yield Err(error);
+                        }
+                    }
+                    Err(TimelineStreamError::Store(error)) => yield Err(error.into()),
+                }
+            }
+        };
+
+        Ok(backward)
+    }
+
+    /// Fetch a batch of messages from the server and add it to the local
+    /// timeline cache.
+    async fn request_messages(&self, token: &str) -> Result<()> {
+        let filter = assign!(RoomEventFilter::default(), {
+            lazy_load_options: LazyLoadOptions::Enabled { include_redundant_members: false },
+        });
+        let options = assign!(MessagesOptions::backward(token), {
+            limit: uint!(10),
+            filter,
+        });
+        let messages = self.messages(options).await?;
+
+        let timeline = TimelineSlice::new(
+            messages.chunk.into_iter().map(SyncRoomEvent::from).collect(),
+            messages.start,
+            messages.end,
+            false,
+            false,
+        );
+
+        // Feed the slice to any open timeline streams and persist it in the
+        // store via the base client.
+        self.inner.add_timeline_slice(&timeline).await;
+        self.client.base_client().receive_messages(self.room_id(), timeline).await?;
+
+        Ok(())
+    }
+
     /// Fetch the event with the given `EventId` in this room.
     pub async fn event(&self, event_id: &EventId) -> Result<RoomEvent> {
         let request = get_room_event::Request::new(self.room_id(), event_id);