diff --git a/benchmarks/benches/room_bench.rs b/benchmarks/benches/room_bench.rs index 3ee804b7085..7c163552f02 100644 --- a/benchmarks/benches/room_bench.rs +++ b/benchmarks/benches/room_bench.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use matrix_sdk::{ @@ -157,7 +157,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { .respond_with(move |r: &Request| { let segments: Vec<&str> = r.url.path_segments().expect("Invalid path").collect(); let event_id_str = segments[6]; - // let f = EventFactory::new().room(&room_id) let event_id = EventId::parse(event_id_str).expect("Invalid event id in response"); let event = f .text_msg(format!("Message {event_id_str}")) @@ -170,9 +169,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { }) .mount(&server), ); - // runtime.block_on(server.reset()); - - client.event_cache().subscribe().unwrap(); let room = client.get_room(&room_id).expect("Room not found"); assert!(!room.pinned_event_ids().is_empty()); @@ -184,6 +180,15 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { group.throughput(Throughput::Elements(count as u64)); group.sample_size(10); + let client = Arc::new(client); + + { + let client = client.clone(); + runtime.spawn_blocking(move || { + client.event_cache().subscribe().unwrap(); + }); + } + group.bench_function(BenchmarkId::new("load_pinned_events", name), |b| { b.to_async(&runtime).iter(|| async { assert!(!room.pinned_event_ids().is_empty()); @@ -207,7 +212,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { { let _guard = runtime.enter(); runtime.block_on(server.reset()); - drop(client); drop(server); } diff --git a/crates/matrix-sdk-ui/src/notification_client.rs b/crates/matrix-sdk-ui/src/notification_client.rs index 2760d235415..f82fe12ba1e 100644 --- a/crates/matrix-sdk-ui/src/notification_client.rs +++ b/crates/matrix-sdk-ui/src/notification_client.rs @@ -478,7 +478,7 @@ impl NotificationClient { return Err(Error::UnknownRoom); }; - let response = room.event_with_context(event_id, true, uint!(0)).await?; + let response = room.event_with_context(event_id, true, uint!(0), None).await?; let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?; let state_events = response.state; diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 69878368e60..21d0102c252 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -261,17 +261,13 @@ impl TimelineBuilder { RoomEventCacheUpdate::AddTimelineEvents { events, origin } => { trace!("Received new timeline events."); - // Note: we deliberately choose to not handle - // updates/reactions/redactions for pinned events. - if !is_pinned_events { - inner.add_events_at( - events, - TimelineEnd::Back, - match origin { - EventsOrigin::Sync => RemoteEventOrigin::Sync, - } - ).await; - } + inner.add_events_at( + events, + TimelineEnd::Back, + match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + } + ).await; } RoomEventCacheUpdate::AddEphemeralEvents { events } => { diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 0528854b30b..3f0d2bde840 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -346,6 +346,8 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let can_add_to_live = match self.live_timeline_updates_type { LiveTimelineUpdatesAllowed::PinnedEvents => { room_data_provider.is_pinned_event(event_id) + || room_data_provider + .is_replacement_for_pinned_event(&event_kind) } LiveTimelineUpdatesAllowed::All => true, LiveTimelineUpdatesAllowed::None => false, diff --git a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs b/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs index e20b7f6d0e0..8d140202013 100644 --- a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs +++ b/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs @@ -24,6 +24,8 @@ use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId}; use thiserror::Error; use tracing::{debug, warn}; +use crate::timeline::event_handler::TimelineEventKind; + const MAX_CONCURRENT_REQUESTS: usize = 10; /// Utility to load the pinned events in a room. @@ -73,8 +75,12 @@ impl PinnedEventsLoader { let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| { let provider = self.room.clone(); async move { - match provider.load_event(&event_id, request_config).await { - Ok(event) => Some(event), + match provider.load_event_with_relations(&event_id, request_config).await { + Ok((event, related_events)) => { + let mut events = vec![event]; + events.extend(related_events); + Some(events) + } Err(err) => { warn!("error when loading pinned event: {err}"); None @@ -84,7 +90,13 @@ impl PinnedEventsLoader { })) .await; - let mut loaded_events = new_events.into_iter().flatten().collect::>(); + let mut loaded_events = new_events + .into_iter() + // Get only the `Some>` results + .flatten() + // Flatten the `Vec`s into a single one containing all their items + .flatten() + .collect::>(); if loaded_events.is_empty() { return Err(PinnedEventsLoaderError::TimelineReloadFailed); } @@ -102,12 +114,13 @@ impl PinnedEventsLoader { } pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm { - /// Load a single room event using the cache or network. - fn load_event<'a>( + /// Load a single room event using the cache or network and any events + /// related to it, if they are cached. + fn load_event_with_relations<'a>( &'a self, event_id: &'a EventId, request_config: Option, - ) -> BoxFuture<'a, Result>; + ) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec), PaginatorError>>; /// Get the pinned event ids for a room. fn pinned_event_ids(&self) -> Vec; @@ -117,26 +130,42 @@ pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm { /// It avoids having to clone the whole list of event ids to check a single /// value. fn is_pinned_event(&self, event_id: &EventId) -> bool; + + /// Returns whether the passed `event_kind` is either an edit or a redaction + /// of a pinned event. + fn is_replacement_for_pinned_event(&self, event_kind: &TimelineEventKind) -> bool { + match event_kind { + TimelineEventKind::Message { relations, .. } => { + if let Some(orig_ev) = &relations.replace { + self.is_pinned_event(orig_ev.event_id()) + } else { + false + } + } + TimelineEventKind::Redaction { redacts } => self.is_pinned_event(redacts), + _ => false, + } + } } impl PinnedEventsRoom for Room { - fn load_event<'a>( + fn load_event_with_relations<'a>( &'a self, event_id: &'a EventId, request_config: Option, - ) -> BoxFuture<'a, Result> { + ) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec), PaginatorError>> { async move { if let Ok((cache, _handles)) = self.event_cache().await { - if let Some(event) = cache.event(event_id).await { - debug!("Loaded pinned event {event_id} from cache"); - return Ok(event); + if let Some(ret) = cache.event_with_relations(event_id).await { + debug!("Loaded pinned event {event_id} and related events from cache"); + return Ok(ret); } } debug!("Loading pinned event {event_id} from HS"); self.event(event_id, request_config) .await - .map(|e| e.into()) + .map(|e| (e.into(), Vec::new())) .map_err(|err| PaginatorError::SdkError(Box::new(err))) } .boxed() diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index c641fbd618b..d38efaf93c8 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -319,11 +319,11 @@ impl PaginableRoom for TestRoomDataProvider { } impl PinnedEventsRoom for TestRoomDataProvider { - fn load_event<'a>( + fn load_event_with_relations<'a>( &'a self, _event_id: &'a EventId, - _config: Option, - ) -> BoxFuture<'a, Result> { + _request_config: Option, + ) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec), PaginatorError>> { unimplemented!(); } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs index 0ba48fc7453..89a4f91409b 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs @@ -2,16 +2,23 @@ use std::time::Duration; use assert_matches::assert_matches; use eyeball_im::VectorDiff; -use futures_util::StreamExt; use matrix_sdk::{ + assert_next_matches_with_timeout, config::SyncSettings, sync::SyncResponse, test_utils::{events::EventFactory, logged_in_client_with_server}, Client, }; +use matrix_sdk_base::deserialized_responses::TimelineEvent; use matrix_sdk_test::{async_test, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, BOB}; -use matrix_sdk_ui::{timeline::TimelineFocus, Timeline}; -use ruma::{event_id, owned_room_id, OwnedEventId, OwnedRoomId}; +use matrix_sdk_ui::{ + timeline::{TimelineFocus, TimelineItemContent}, + Timeline, +}; +use ruma::{ + event_id, events::room::message::RoomMessageEventContentWithoutRelation, owned_room_id, + MilliSecondsSinceUnixEpoch, OwnedRoomId, +}; use serde_json::json; use stream_assert::assert_pending; use wiremock::MockServer; @@ -27,11 +34,16 @@ async fn test_new_pinned_events_are_added_on_sync() { let _ = test_helper.setup_initial_sync_response().await; test_helper.server.reset().await; + let f = EventFactory::new().room(&room_id).sender(*BOB); + let event_1 = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + // Load initial timeline items: a text message and a `m.room.pinned_events` with // events $1 and $2 pinned - let _ = test_helper - .setup_sync_response(vec![("$1", "in the end", false)], Some(vec!["$1", "$2"])) - .await; + let _ = test_helper.setup_sync_response(vec![(event_1, false)], Some(vec!["$1", "$2"])).await; let room = test_helper.client.get_room(&room_id).unwrap(); let timeline = Timeline::builder(&room) @@ -57,22 +69,32 @@ async fn test_new_pinned_events_are_added_on_sync() { // Load new pinned event contents from sync, $2 was pinned but wasn't available // before - let _ = test_helper - .setup_sync_response( - vec![("$2", "pinned message!", true), ("$3", "normal message", true)], - None, - ) - .await; - + let event_2 = f + .text_msg("pinned message!") + .event_id(event_id!("$2")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + let event_3 = f + .text_msg("normal message") + .event_id(event_id!("$3")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + let _ = test_helper.setup_sync_response(vec![(event_2, true), (event_3, true)], None).await; + + // The item is added automatically + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$2")); + }); // The list is reloaded, so it's reset - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::Clear); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushBack { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + // Then the loaded list items are added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$1")); }); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushBack { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$2")); }); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushFront { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { assert!(value.is_day_divider()); }); test_helper.server.reset().await; @@ -87,11 +109,23 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { let _ = test_helper.setup_initial_sync_response().await; test_helper.server.reset().await; + let f = EventFactory::new().room(&room_id).sender(*BOB); + let event_1 = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + let event_2 = f + .text_msg("it doesn't even matter") + .event_id(event_id!("$2")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + // Load initial timeline items: 2 text messages and a `m.room.pinned_events` // with event $1 and $2 pinned let _ = test_helper .setup_sync_response( - vec![("$1", "in the end", false), ("$2", "it doesn't even matter", true)], + vec![(event_1.clone(), false), (event_2.clone(), true)], Some(vec!["$1"]), ) .await; @@ -119,19 +153,19 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { // Reload timeline with new pinned event ids let _ = test_helper .setup_sync_response( - vec![("$1", "in the end", false), ("$2", "it doesn't even matter", false)], + vec![(event_1.clone(), false), (event_2.clone(), false)], Some(vec!["$1", "$2"]), ) .await; - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::Clear); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushBack { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$1")); }); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushBack { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.as_event().unwrap().event_id().unwrap(), event_id!("$2")); }); - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::PushFront { value } => { + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { assert!(value.is_day_divider()); }); assert_pending!(timeline_stream); @@ -139,13 +173,10 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { // Reload timeline with no pinned event ids let _ = test_helper - .setup_sync_response( - vec![("$1", "in the end", false), ("$2", "it doesn't even matter", false)], - Some(Vec::new()), - ) + .setup_sync_response(vec![(event_1, false), (event_2, false)], Some(Vec::new())) .await; - assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::Clear); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); assert_pending!(timeline_stream); test_helper.server.reset().await; } @@ -159,11 +190,17 @@ async fn test_max_events_to_load_is_honored() { let _ = test_helper.setup_initial_sync_response().await; test_helper.server.reset().await; + let f = EventFactory::new().room(&room_id).sender(*BOB); + let pinned_event = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + // Load initial timeline items: a text message and a `m.room.pinned_events` // with event $1 and $2 pinned - let _ = test_helper - .setup_sync_response(vec![("$1", "in the end", false)], Some(vec!["$1", "$2"])) - .await; + let _ = + test_helper.setup_sync_response(vec![(pinned_event, false)], Some(vec!["$1", "$2"])).await; let room = test_helper.client.get_room(&room_id).unwrap(); let ret = Timeline::builder(&room) @@ -191,11 +228,17 @@ async fn test_cached_events_are_kept_for_different_room_instances() { let _ = test_helper.setup_initial_sync_response().await; test_helper.server.reset().await; + let f = EventFactory::new().room(&room_id).sender(*BOB); + let pinned_event = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + // Load initial timeline items: a text message and a `m.room.pinned_events` // with event $1 and $2 pinned - let _ = test_helper - .setup_sync_response(vec![("$1", "in the end", false)], Some(vec!["$1", "$2"])) - .await; + let _ = + test_helper.setup_sync_response(vec![(pinned_event, false)], Some(vec!["$1", "$2"])).await; let room = test_helper.client.get_room(&room_id).unwrap(); let (room_cache, _drop_handles) = room.event_cache().await.unwrap(); @@ -319,6 +362,279 @@ async fn test_pinned_timeline_with_no_pinned_event_ids_is_just_empty() { test_helper.server.reset().await; } +#[async_test] +async fn test_edited_events_are_reflected_in_sync() { + let mut test_helper = TestHelper::new().await; + let room_id = test_helper.room_id.clone(); + + // Join the room + let _ = test_helper.setup_initial_sync_response().await; + test_helper.server.reset().await; + + let f = EventFactory::new().room(&room_id).sender(*BOB); + let pinned_event = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load initial timeline items: a text message and a `m.room.pinned_events` with + // event $1 + let _ = test_helper.setup_sync_response(vec![(pinned_event, false)], Some(vec!["$1"])).await; + + let room = test_helper.client.get_room(&room_id).unwrap(); + let timeline = Timeline::builder(&room) + .with_focus(TimelineFocus::PinnedEvents { max_events_to_load: 100 }) + .build() + .await + .unwrap(); + test_helper.server.reset().await; + + assert!( + timeline.live_back_pagination_status().await.is_none(), + "there should be no live back-pagination status for a focused timeline" + ); + + // Load timeline items + let (items, mut timeline_stream) = timeline.subscribe().await; + + assert_eq!(items.len(), 1 + 1); // event item + a day divider + assert!(items[0].is_day_divider()); + assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end"); + assert_pending!(timeline_stream); + test_helper.server.reset().await; + + let edited_event = f + .text_msg("edited message!") + .edit( + event_id!("$1"), + RoomMessageEventContentWithoutRelation::text_plain("* edited message!"), + ) + .event_id(event_id!("$2")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load new pinned event contents from sync, where $2 is and edit on $1 + let _ = test_helper.setup_sync_response(vec![(edited_event, true)], None).await; + + // The list is reloaded, so it's reset + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + // Then the loaded list items are added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$1")); + }); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { + assert!(value.is_day_divider()); + }); + // The edit replaces the original event + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Set { index, value } => { + assert_eq!(index, 1); + match value.as_event().unwrap().content() { + TimelineItemContent::Message(m) => { + assert_eq!(m.body(), "* edited message!") + } + _ => panic!("Should be a message event"), + } + }); + assert_pending!(timeline_stream); + test_helper.server.reset().await; +} + +#[async_test] +async fn test_redacted_events_are_reflected_in_sync() { + let mut test_helper = TestHelper::new().await; + let room_id = test_helper.room_id.clone(); + + // Join the room + let _ = test_helper.setup_initial_sync_response().await; + test_helper.server.reset().await; + + let f = EventFactory::new().room(&room_id).sender(*BOB); + let pinned_event = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load initial timeline items: a text message and a `m.room.pinned_events` with + // event $1 + let _ = test_helper.setup_sync_response(vec![(pinned_event, false)], Some(vec!["$1"])).await; + + let room = test_helper.client.get_room(&room_id).unwrap(); + let timeline = Timeline::builder(&room) + .with_focus(TimelineFocus::PinnedEvents { max_events_to_load: 100 }) + .build() + .await + .unwrap(); + test_helper.server.reset().await; + + assert!( + timeline.live_back_pagination_status().await.is_none(), + "there should be no live back-pagination status for a focused timeline" + ); + + // Load timeline items + let (items, mut timeline_stream) = timeline.subscribe().await; + + assert_eq!(items.len(), 1 + 1); // event item + a day divider + assert!(items[0].is_day_divider()); + assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end"); + assert_pending!(timeline_stream); + test_helper.server.reset().await; + + let redaction_event = f + .redaction(event_id!("$1")) + .event_id(event_id!("$2")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load new pinned event contents from sync, where $1 is now redacted + let _ = test_helper.setup_sync_response(vec![(redaction_event, true)], None).await; + + // The list is reloaded, so it's reset + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + // Then the loaded list items are added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$1")); + }); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { + assert!(value.is_day_divider()); + }); + // The redaction replaces the original event + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Set { index, value } => { + assert_eq!(index, 1); + assert_matches!(value.as_event().unwrap().content(), TimelineItemContent::RedactedMessage); + }); + assert_pending!(timeline_stream); + test_helper.server.reset().await; +} + +#[async_test] +async fn test_edited_events_survive_pinned_event_ids_change() { + let mut test_helper = TestHelper::new().await; + let room_id = test_helper.room_id.clone(); + + // Join the room + let _ = test_helper.setup_initial_sync_response().await; + test_helper.server.reset().await; + + let f = EventFactory::new().room(&room_id).sender(*BOB); + let pinned_event = f + .text_msg("in the end") + .event_id(event_id!("$1")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load initial timeline items: a text message and a `m.room.pinned_events` with + // event $1 + let _ = test_helper.setup_sync_response(vec![(pinned_event, false)], Some(vec!["$1"])).await; + + let room = test_helper.client.get_room(&room_id).unwrap(); + let timeline = Timeline::builder(&room) + .with_focus(TimelineFocus::PinnedEvents { max_events_to_load: 100 }) + .build() + .await + .unwrap(); + test_helper.server.reset().await; + + assert!( + timeline.live_back_pagination_status().await.is_none(), + "there should be no live back-pagination status for a focused timeline" + ); + + // Load timeline items + let (items, mut timeline_stream) = timeline.subscribe().await; + + assert_eq!(items.len(), 1 + 1); // event item + a day divider + assert!(items[0].is_day_divider()); + assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "in the end"); + assert_pending!(timeline_stream); + test_helper.server.reset().await; + + let edited_pinned_event = f + .text_msg("* edited message!") + .edit( + event_id!("$1"), + RoomMessageEventContentWithoutRelation::text_plain("edited message!"), + ) + .event_id(event_id!("$2")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load new pinned event contents from sync, $2 was pinned but wasn't available + // before + let _ = test_helper.setup_sync_response(vec![(edited_pinned_event, true)], None).await; + test_helper.server.reset().await; + + // The list is reloaded, so it's reset + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + // Then the loaded list items are added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$1")); + }); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { + assert!(value.is_day_divider()); + }); + // The edit replaces the original event + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Set { index, value } => { + assert_eq!(index, 1); + match value.as_event().unwrap().content() { + TimelineItemContent::Message(m) => { + assert_eq!(m.body(), "edited message!") + } + _ => panic!("Should be a message event"), + } + }); + assert_pending!(timeline_stream); + + let new_pinned_event = f + .text_msg("new message") + .event_id(event_id!("$3")) + .server_ts(MilliSecondsSinceUnixEpoch::now()) + .into_timeline(); + + // Load new pinned event contents from sync: $3 + let _ = test_helper + .setup_sync_response(vec![(new_pinned_event, true)], Some(vec!["$1", "$3"])) + .await; + test_helper.server.reset().await; + + // New item gets added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$3")); + }); + // The list is reloaded, so it's reset + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Clear); + // Then the loaded list items are added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$1")); + }); + // The edit replaces the original event + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::Set { index, value } => { + assert_eq!(index, 0); + match value.as_event().unwrap().content() { + TimelineItemContent::Message(m) => { + assert_eq!(m.body(), "edited message!") + } + _ => panic!("Should be a message event"), + } + }); + // The new pinned event is added + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => { + let event = value.as_event().unwrap(); + assert_eq!(event.event_id().unwrap(), event_id!("$3")); + }); + assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushFront { value } => { + assert!(value.is_day_divider()); + }); + assert_pending!(timeline_stream); +} + struct TestHelper { pub client: Client, pub server: MockServer, @@ -355,20 +671,23 @@ impl TestHelper { async fn setup_sync_response( &mut self, - text_messages: Vec<(&str, &str, bool)>, + text_messages: Vec<(TimelineEvent, bool)>, pinned_event_ids: Option>, ) -> Result { let mut joined_room_builder = JoinedRoomBuilder::new(&self.room_id); - for (id, txt, add_to_timeline) in text_messages { - let event_id: OwnedEventId = id.try_into().unwrap(); - let f = EventFactory::new().room(&self.room_id); - let event_builder = f.text_msg(txt).event_id(&event_id).sender(*BOB); - mock_event(&self.server, &self.room_id, &event_id, event_builder.into_timeline()).await; + for (timeline_event, add_to_timeline) in text_messages { + let deserialized_event = timeline_event.event.deserialize()?; + mock_event( + &self.server, + &self.room_id, + deserialized_event.event_id(), + timeline_event.clone(), + ) + .await; if add_to_timeline { - let event_builder = f.text_msg(txt).event_id(&event_id).sender(*BOB); - joined_room_builder = joined_room_builder - .add_timeline_event(event_builder.into_raw_timeline().cast()); + joined_room_builder = + joined_room_builder.add_timeline_event(timeline_event.event.cast()); } } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 0238ffd5e4e..bbedf546fee 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -570,8 +570,9 @@ impl RoomEventCache { /// Save a single event in the event cache, for further retrieval with /// [`Self::event`]. - // This doesn't insert the event into the linked chunk. In the future there'll - // be no distinction between the linked chunk and the separate cache. + // TODO: This doesn't insert the event into the linked chunk. In the future + // there'll be no distinction between the linked chunk and the separate + // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886. pub(crate) async fn save_event(&self, event: SyncTimelineEvent) { if let Some(event_id) = event.event_id() { let mut cache = self.inner.all_events_cache.write().await; @@ -582,6 +583,24 @@ impl RoomEventCache { warn!("couldn't save event without event id in the event cache"); } } + + /// Save some events in the event cache, for further retrieval with + /// [`Self::event`]. This function will save them using a single lock, + /// as opposed to [`Self::save_event`]. + // TODO: This doesn't insert the event into the linked chunk. In the future + // there'll be no distinction between the linked chunk and the separate + // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886. + pub(crate) async fn save_events(&self, events: impl IntoIterator) { + let mut cache = self.inner.all_events_cache.write().await; + for event in events { + if let Some(event_id) = event.event_id() { + self.inner.append_related_event(&mut cache, &event); + cache.events.insert(event_id, (self.inner.room_id.clone(), event)); + } else { + warn!("couldn't save event without event id in the event cache"); + } + } + } } /// The (non-clonable) details of the `RoomEventCache`. diff --git a/crates/matrix-sdk/src/event_cache/paginator.rs b/crates/matrix-sdk/src/event_cache/paginator.rs index 228fd219fb3..010bc327383 100644 --- a/crates/matrix-sdk/src/event_cache/paginator.rs +++ b/crates/matrix-sdk/src/event_cache/paginator.rs @@ -467,25 +467,25 @@ impl PaginableRoom for Room { lazy_load_members: bool, num_events: UInt, ) -> Result { - let response = match self.event_with_context(event_id, lazy_load_members, num_events).await - { - Ok(result) => result, - - Err(err) => { - // If the error was a 404, then the event wasn't found on the server; - // special case this to make it easy to react to - // such an error. - if let Some(error) = err.as_client_api_error() { - if error.status_code == 404 { - // Event not found - return Err(PaginatorError::EventNotFound(event_id.to_owned())); + let response = + match self.event_with_context(event_id, lazy_load_members, num_events, None).await { + Ok(result) => result, + + Err(err) => { + // If the error was a 404, then the event wasn't found on the server; + // special case this to make it easy to react to + // such an error. + if let Some(error) = err.as_client_api_error() { + if error.status_code == 404 { + // Event not found + return Err(PaginatorError::EventNotFound(event_id.to_owned())); + } } - } - // Otherwise, just return a wrapped error. - return Err(PaginatorError::SdkError(Box::new(err))); - } - }; + // Otherwise, just return a wrapped error. + return Err(PaginatorError::SdkError(Box::new(err))); + } + }; Ok(response) } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 6ddab10c864..38e999f0caf 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -22,7 +22,7 @@ use matrix_sdk_base::{ ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey, StateStoreDataValue, }; -use matrix_sdk_common::timeout::timeout; +use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, timeout::timeout}; use mime::Mime; #[cfg(feature = "e2e-encryption")] use ruma::events::{ @@ -417,6 +417,7 @@ impl Room { event_id: &EventId, lazy_load_members: bool, context_size: UInt, + request_config: Option, ) -> Result { let mut request = context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned()); @@ -428,7 +429,7 @@ impl Room { LazyLoadOptions::Enabled { include_redundant_members: false }; } - let response = self.client.send(request, None).await?; + let response = self.client.send(request, request_config).await?; let target_event = if let Some(event) = response.event { Some(self.try_decrypt_event(event).await?) @@ -445,6 +446,24 @@ impl Room { ) .await?; + // Save the loaded events into the event cache, if it's set up. + if let Ok((cache, _handles)) = self.event_cache().await { + let mut events_to_save: Vec = Vec::new(); + if let Some(event) = &target_event { + events_to_save.push(event.clone().into()); + } + + for event in &events_before { + events_to_save.push(event.clone().into()); + } + + for event in &events_after { + events_to_save.push(event.clone().into()); + } + + cache.save_events(events_to_save).await; + } + Ok(EventWithContextResponse { event: target_event, events_before, diff --git a/crates/matrix-sdk/src/test_utils.rs b/crates/matrix-sdk/src/test_utils.rs index 7834d42c674..d99808a3ba1 100644 --- a/crates/matrix-sdk/src/test_utils.rs +++ b/crates/matrix-sdk/src/test_utils.rs @@ -2,7 +2,11 @@ #![allow(dead_code)] +use std::{fmt::Debug, time::Duration}; + use assert_matches2::assert_let; +use futures_core::Stream; +use futures_util::StreamExt; use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, SessionMeta}; use ruma::{ api::MatrixVersion, @@ -10,6 +14,7 @@ use ruma::{ events::{room::message::MessageType, AnySyncMessageLikeEvent, AnySyncTimelineEvent}, user_id, }; +use tokio::time::timeout; use url::Url; pub mod events; @@ -99,3 +104,48 @@ pub async fn logged_in_client_with_server() -> (Client, wiremock::MockServer) { let client = logged_in_client(Some(server.uri().to_string())).await; (client, server) } + +/// Loads the next item in a [`Stream`] with a timeout in milliseconds. +pub async fn next_with_timeout( + stream: &mut (impl Stream + Unpin), + timeout_ms: u64, +) -> I { + timeout(Duration::from_millis(timeout_ms), stream.next()) + .await + .expect("Next event timed out") + .expect("No next event received") +} + +/// Asserts the next item in a [`Stream`] can be loaded in the given timeout in +/// the given timeout in milliseconds. +#[macro_export] +macro_rules! assert_next_with_timeout { + ($stream:expr, $timeout_ms:expr) => { + $crate::test_utils::next_with_timeout($stream, $timeout_ms).await + }; +} + +/// Assert the next item in a [`Stream`] matches the provided pattern in the +/// given timeout in milliseconds. +/// +/// If no timeout is provided, a default `100ms` value will be used. +#[macro_export] +macro_rules! assert_next_matches_with_timeout { + ($stream:expr, $pat:pat) => { + $crate::assert_next_matches_with_timeout!($stream, $pat => {}) + }; + ($stream:expr, $pat:pat => $arm:expr) => { + $crate::assert_next_matches_with_timeout!($stream, 100, $pat => $arm) + }; + ($stream:expr, $timeout_ms:expr, $pat:pat => $arm:expr) => { + match $crate::assert_next_with_timeout!(&mut $stream, $timeout_ms) { + $pat => $arm, + val => { + ::core::panic!( + "assertion failed: `{:?}` does not match `{}`", + val, ::core::stringify!($pat) + ); + } + } + }; +} diff --git a/crates/matrix-sdk/tests/integration/room/common.rs b/crates/matrix-sdk/tests/integration/room/common.rs index 75a083817dd..50403965b66 100644 --- a/crates/matrix-sdk/tests/integration/room/common.rs +++ b/crates/matrix-sdk/tests/integration/room/common.rs @@ -1,7 +1,11 @@ use std::{iter, time::Duration}; use assert_matches2::assert_let; -use matrix_sdk::{config::SyncSettings, room::RoomMember, DisplayName, RoomMemberships}; +use js_int::uint; +use matrix_sdk::{ + config::SyncSettings, room::RoomMember, test_utils::events::EventFactory, DisplayName, + RoomMemberships, +}; use matrix_sdk_test::{ async_test, bulk_room_members, sync_state_event, sync_timeline_event, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, LeftRoomBuilder, StateTestEvent, @@ -10,14 +14,14 @@ use matrix_sdk_test::{ use ruma::{ event_id, events::{ - room::member::MembershipState, AnyStateEvent, AnySyncStateEvent, AnyTimelineEvent, - StateEventType, + room::{member::MembershipState, message::RoomMessageEventContent}, + AnyStateEvent, AnySyncStateEvent, AnyTimelineEvent, StateEventType, }, room_id, }; use serde_json::json; use wiremock::{ - matchers::{body_json, header, method, path_regex}, + matchers::{body_json, header, method, path, path_regex}, Mock, ResponseTemplate, }; @@ -620,6 +624,9 @@ async fn test_event() { let event_id = event_id!("$foun39djjod0f"); let (client, server) = logged_in_client_with_server().await; + let cache = client.event_cache(); + let _ = cache.subscribe(); + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let mut sync_builder = SyncResponseBuilder::new(); @@ -668,6 +675,76 @@ async fn test_event() { let push_actions = timeline_event.push_actions.unwrap(); assert!(push_actions.iter().any(|a| a.is_highlight())); assert!(push_actions.iter().any(|a| a.should_notify())); + + // Requested event was saved to the cache + assert!(cache.event(event_id).await.is_some()); +} + +#[async_test] +async fn test_event_with_context() { + let event_id = event_id!("$cur1234"); + let prev_event_id = event_id!("$prev1234"); + let next_event_id = event_id!("$next_1234"); + + let (client, server) = logged_in_client_with_server().await; + let cache = client.event_cache(); + let _ = cache.subscribe(); + + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut sync_builder = SyncResponseBuilder::new(); + sync_builder + // We need the member event and power levels locally so the push rules processor works. + .add_joined_room( + JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID) + .add_state_event(StateTestEvent::Member) + .add_state_event(StateTestEvent::PowerLevels), + ); + + mock_sync(&server, sync_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); + let room_id = room.room_id(); + + let f = EventFactory::new().room(room_id).sender(*BOB); + let event = + f.event(RoomMessageEventContent::text_plain("The requested message")).event_id(event_id); + let event_before = + f.event(RoomMessageEventContent::text_plain("A previous message")).event_id(prev_event_id); + let event_next = + f.event(RoomMessageEventContent::text_plain("A newer message")).event_id(next_event_id); + Mock::given(method("GET")) + .and(path(format!("/_matrix/client/r0/rooms/{room_id}/context/{event_id}"))) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "events_before": [event_before.into_raw_timeline()], + "event": event.into_raw_timeline(), + "events_after": [event_next.into_raw_timeline()], + "state": [], + }))) + .mount(&server) + .await; + + let context_ret = room.event_with_context(event_id, false, uint!(1), None).await.unwrap(); + + assert_let!(Some(timeline_event) = context_ret.event); + assert_let!(Ok(event) = timeline_event.event.deserialize()); + assert_eq!(event.event_id(), event_id); + + assert_eq!(1, context_ret.events_before.len()); + assert_let!(Ok(event) = context_ret.events_before[0].event.deserialize()); + assert_eq!(event.event_id(), prev_event_id); + + assert_eq!(1, context_ret.events_after.len()); + assert_let!(Ok(event) = context_ret.events_after[0].event.deserialize()); + assert_eq!(event.event_id(), next_event_id); + + // Requested event and their context ones were saved to the cache + assert!(cache.event(event_id).await.is_some()); + assert!(cache.event(prev_event_id).await.is_some()); + assert!(cache.event(next_event_id).await.is_some()); } #[async_test] diff --git a/testing/matrix-sdk-integration-testing/src/tests/room.rs b/testing/matrix-sdk-integration-testing/src/tests/room.rs index 04a7a7026f9..811e1729820 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/room.rs @@ -94,7 +94,7 @@ async fn test_event_with_context() -> Result<()> { { // First /context query: only the target event, no context around it. - let response = room.event_with_context(&event_id, false, uint!(0)).await?; + let response = room.event_with_context(&event_id, false, uint!(0), None).await?; let target = response .event @@ -113,7 +113,8 @@ async fn test_event_with_context() -> Result<()> { { // Next query: an event that doesn't exist (hopefully!). - let response = room.event_with_context(event_id!("$lolololol"), false, uint!(0)).await; + let response = + room.event_with_context(event_id!("$lolololol"), false, uint!(0), None).await; // Servers answers with 404. assert_let!(Err(err) = response); @@ -123,7 +124,7 @@ async fn test_event_with_context() -> Result<()> { { // Next query: target event with a context of 3 events. There // should be some previous and next tokens. - let response = room.event_with_context(&event_id, false, uint!(3)).await?; + let response = room.event_with_context(&event_id, false, uint!(3), None).await?; let target = response .event