Skip to content

feat(sdk): Live updates for pinned events timeline with replacements and redactions #3840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 23, 2024
16 changes: 10 additions & 6 deletions benchmarks/benches/room_bench.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use matrix_sdk::{
Expand Down Expand Up @@ -157,7 +157,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
.respond_with(move |r: &Request| {
let segments: Vec<&str> = r.url.path_segments().expect("Invalid path").collect();
let event_id_str = segments[6];
// let f = EventFactory::new().room(&room_id)
let event_id = EventId::parse(event_id_str).expect("Invalid event id in response");
let event = f
.text_msg(format!("Message {event_id_str}"))
Expand All @@ -170,9 +169,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
})
.mount(&server),
);
// runtime.block_on(server.reset());

client.event_cache().subscribe().unwrap();

let room = client.get_room(&room_id).expect("Room not found");
assert!(!room.pinned_event_ids().is_empty());
Expand All @@ -184,6 +180,15 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(count as u64));
group.sample_size(10);

let client = Arc::new(client);

{
let client = client.clone();
runtime.spawn_blocking(move || {
client.event_cache().subscribe().unwrap();
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really a need to do it for each iteration of the benchmark? I don't think so, it should be subscribed to once and for all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like client.event_cache().subscribe().unwrap(); does some async operations, and the benchmark panics with:

there is no reactor running, must be called from the context of a Tokio 1.x runtime

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can wrap Client in an Arc and use runtime.spawn_blocking to run that code? Is there a better solution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried fixing this in 807c875, please let me know if there's a better solution.

}

group.bench_function(BenchmarkId::new("load_pinned_events", name), |b| {
b.to_async(&runtime).iter(|| async {
assert!(!room.pinned_event_ids().is_empty());
Expand All @@ -207,7 +212,6 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
{
let _guard = runtime.enter();
runtime.block_on(server.reset());
drop(client);
drop(server);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/notification_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl NotificationClient {
return Err(Error::UnknownRoom);
};

let response = room.event_with_context(event_id, true, uint!(0)).await?;
let response = room.event_with_context(event_id, true, uint!(0), None).await?;

let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
let state_events = response.state;
Expand Down
18 changes: 7 additions & 11 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,13 @@ impl TimelineBuilder {
RoomEventCacheUpdate::AddTimelineEvents { events, origin } => {
trace!("Received new timeline events.");

// Note: we deliberately choose to not handle
// updates/reactions/redactions for pinned events.
if !is_pinned_events {
inner.add_events_at(
events,
TimelineEnd::Back,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
}
).await;
}
inner.add_events_at(
events,
TimelineEnd::Back,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
}
).await;
}

RoomEventCacheUpdate::AddEphemeralEvents { events } => {
Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
let can_add_to_live = match self.live_timeline_updates_type {
LiveTimelineUpdatesAllowed::PinnedEvents => {
room_data_provider.is_pinned_event(event_id)
|| room_data_provider
.is_replacement_for_pinned_event(&event_kind)
}
LiveTimelineUpdatesAllowed::All => true,
LiveTimelineUpdatesAllowed::None => false,
Expand Down
53 changes: 41 additions & 12 deletions crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
use thiserror::Error;
use tracing::{debug, warn};

use crate::timeline::event_handler::TimelineEventKind;

const MAX_CONCURRENT_REQUESTS: usize = 10;

/// Utility to load the pinned events in a room.
Expand Down Expand Up @@ -73,8 +75,12 @@ impl PinnedEventsLoader {
let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| {
let provider = self.room.clone();
async move {
match provider.load_event(&event_id, request_config).await {
Ok(event) => Some(event),
match provider.load_event_with_relations(&event_id, request_config).await {
Ok((event, related_events)) => {
let mut events = vec![event];
events.extend(related_events);
Some(events)
}
Err(err) => {
warn!("error when loading pinned event: {err}");
None
Expand All @@ -84,7 +90,13 @@ impl PinnedEventsLoader {
}))
.await;

let mut loaded_events = new_events.into_iter().flatten().collect::<Vec<_>>();
let mut loaded_events = new_events
.into_iter()
// Get only the `Some<Vec<_>>` results
.flatten()
// Flatten the `Vec`s into a single one containing all their items
.flatten()
.collect::<Vec<SyncTimelineEvent>>();
if loaded_events.is_empty() {
return Err(PinnedEventsLoaderError::TimelineReloadFailed);
}
Expand All @@ -102,12 +114,13 @@ impl PinnedEventsLoader {
}

pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
/// Load a single room event using the cache or network.
fn load_event<'a>(
/// Load a single room event using the cache or network and any events
/// related to it, if they are cached.
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<SyncTimelineEvent, PaginatorError>>;
) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec<SyncTimelineEvent>), PaginatorError>>;

/// Get the pinned event ids for a room.
fn pinned_event_ids(&self) -> Vec<OwnedEventId>;
Expand All @@ -117,26 +130,42 @@ pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
/// It avoids having to clone the whole list of event ids to check a single
/// value.
fn is_pinned_event(&self, event_id: &EventId) -> bool;

/// Returns whether the passed `event_kind` is either an edit or a redaction
/// of a pinned event.
fn is_replacement_for_pinned_event(&self, event_kind: &TimelineEventKind) -> bool {
match event_kind {
TimelineEventKind::Message { relations, .. } => {
if let Some(orig_ev) = &relations.replace {
self.is_pinned_event(orig_ev.event_id())
} else {
false
}
}
TimelineEventKind::Redaction { redacts } => self.is_pinned_event(redacts),
_ => false,
}
}
}

impl PinnedEventsRoom for Room {
fn load_event<'a>(
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<SyncTimelineEvent, PaginatorError>> {
) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec<SyncTimelineEvent>), PaginatorError>> {
async move {
if let Ok((cache, _handles)) = self.event_cache().await {
if let Some(event) = cache.event(event_id).await {
debug!("Loaded pinned event {event_id} from cache");
return Ok(event);
if let Some(ret) = cache.event_with_relations(event_id).await {
debug!("Loaded pinned event {event_id} and related events from cache");
return Ok(ret);
}
}

debug!("Loading pinned event {event_id} from HS");
self.event(event_id, request_config)
.await
.map(|e| e.into())
.map(|e| (e.into(), Vec::new()))
.map_err(|err| PaginatorError::SdkError(Box::new(err)))
}
.boxed()
Expand Down
6 changes: 3 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ impl PaginableRoom for TestRoomDataProvider {
}

impl PinnedEventsRoom for TestRoomDataProvider {
fn load_event<'a>(
fn load_event_with_relations<'a>(
&'a self,
_event_id: &'a EventId,
_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<SyncTimelineEvent, PaginatorError>> {
_request_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec<SyncTimelineEvent>), PaginatorError>> {
unimplemented!();
}

Expand Down
Loading
Loading