Skip to content

Commit 659825e

Browse files
committed
base: store timeline to the StateStore
1 parent a46a11a commit 659825e

File tree

7 files changed

+1932
-18
lines changed

7 files changed

+1932
-18
lines changed

matrix_sdk_base/src/client.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::{
2525
use matrix_sdk_common::{
2626
deserialized_responses::{
2727
AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms,
28-
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline,
28+
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice,
2929
},
3030
instant::Instant,
3131
locks::RwLock,
@@ -48,7 +48,11 @@ use ruma::{
4848
DeviceId,
4949
};
5050
use ruma::{
51-
api::client::r0::{self as api, push::get_notifications::Notification},
51+
api::client::r0::{
52+
self as api,
53+
message::get_message_events::{Direction, Response as GetMessageEventsResponse},
54+
push::get_notifications::Notification,
55+
},
5256
events::{
5357
room::member::{MemberEventContent, MembershipState},
5458
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
@@ -836,6 +840,15 @@ impl BaseClient {
836840
let notification_count = new_info.unread_notifications.into();
837841
room_info.update_notification_count(notification_count);
838842

843+
changes.add_timeline(
844+
&room_id,
845+
TimelineSlice::new(
846+
timeline.events.iter().cloned().rev().collect(),
847+
next_batch.clone(),
848+
timeline.prev_batch.clone(),
849+
),
850+
);
851+
839852
new_rooms.join.insert(
840853
room_id,
841854
JoinedRoom::new(
@@ -879,6 +892,14 @@ impl BaseClient {
879892
self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
880893
.await;
881894

895+
changes.add_timeline(
896+
&room_id,
897+
TimelineSlice::new(
898+
timeline.events.iter().cloned().rev().collect(),
899+
next_batch.clone(),
900+
timeline.prev_batch.clone(),
901+
),
902+
);
882903
changes.add_room(room_info);
883904
new_rooms
884905
.leave
@@ -949,6 +970,64 @@ impl BaseClient {
949970
}
950971
}
951972

973+
/// Receive a successful /messages response.
974+
///
975+
/// * `response` - The successful response from /messages.
976+
pub async fn receive_messages(
977+
&self,
978+
room_id: &RoomId,
979+
direction: &Direction,
980+
response: &GetMessageEventsResponse,
981+
) -> Result<Vec<SyncRoomEvent>> {
982+
let mut changes = StateChanges::default();
983+
984+
let mut events: Vec<SyncRoomEvent> = vec![];
985+
for event in &response.chunk {
986+
#[allow(unused_mut)]
987+
let mut event: SyncRoomEvent = event.clone().into();
988+
989+
#[cfg(feature = "encryption")]
990+
match hoist_room_event_prev_content(&event.event) {
991+
Ok(e) => {
992+
if let AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted(
993+
encrypted,
994+
)) = e
995+
{
996+
if let Some(olm) = self.olm_machine().await {
997+
if let Ok(decrypted) = olm.decrypt_room_event(&encrypted, room_id).await
998+
{
999+
event = decrypted;
1000+
}
1001+
}
1002+
}
1003+
}
1004+
Err(error) => {
1005+
warn!("Error deserializing event {:?}", error);
1006+
}
1007+
}
1008+
1009+
events.push(event);
1010+
}
1011+
1012+
let (chunk, start, end) = match direction {
1013+
Direction::Backward => {
1014+
(events.clone(), response.start.clone().unwrap(), response.end.clone())
1015+
}
1016+
Direction::Forward => (
1017+
events.iter().rev().cloned().collect(),
1018+
response.end.clone().unwrap(),
1019+
response.start.clone(),
1020+
),
1021+
};
1022+
1023+
let timeline = TimelineSlice::new(chunk, start, end);
1024+
changes.add_timeline(room_id, timeline);
1025+
1026+
self.store().save_changes(&changes).await?;
1027+
1028+
Ok(events)
1029+
}
1030+
9521031
/// Receive a get member events response and convert it to a deserialized
9531032
/// `MembersResponse`
9541033
///

matrix_sdk_base/src/store/memory_store.rs

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use dashmap::{DashMap, DashSet};
2121
use lru::LruCache;
2222
use matrix_sdk_common::{async_trait, instant::Instant, locks::Mutex};
2323
use ruma::{
24+
api::client::r0::message::get_message_events::Direction,
2425
events::{
2526
presence::PresenceEvent,
2627
receipt::Receipt,
@@ -34,9 +35,9 @@ use ruma::{
3435
};
3536
use tracing::info;
3637

37-
use super::{Result, RoomInfo, StateChanges, StateStore};
38+
use super::{Result, RoomInfo, StateChanges, StateStore, StoredTimelineSlice};
3839
use crate::{
39-
deserialized_responses::{MemberEvent, StrippedMemberEvent},
40+
deserialized_responses::{MemberEvent, StrippedMemberEvent, TimelineSlice},
4041
media::{MediaRequest, UniqueKey},
4142
};
4243

@@ -66,6 +67,8 @@ pub struct MemoryStore {
6667
room_event_receipts:
6768
Arc<DashMap<RoomId, DashMap<String, DashMap<EventId, DashMap<UserId, Receipt>>>>>,
6869
media: Arc<Mutex<LruCache<String, Vec<u8>>>>,
70+
timeline_slices: Arc<DashMap<RoomId, DashMap<String, TimelineSlice>>>,
71+
event_id_to_timeline_slice: Arc<DashMap<RoomId, DashMap<EventId, String>>>,
6972
}
7073

7174
impl MemoryStore {
@@ -90,6 +93,8 @@ impl MemoryStore {
9093
room_user_receipts: DashMap::new().into(),
9194
room_event_receipts: DashMap::new().into(),
9295
media: Arc::new(Mutex::new(LruCache::new(100))),
96+
timeline_slices: DashMap::new().into(),
97+
event_id_to_timeline_slice: DashMap::new().into(),
9398
}
9499
}
95100

@@ -271,6 +276,23 @@ impl MemoryStore {
271276
}
272277
}
273278

279+
for (room, timeline) in &changes.timeline {
280+
// FIXME: make sure that we don't add events already known
281+
for event in &timeline.events {
282+
let event_id = event.event_id();
283+
284+
self.event_id_to_timeline_slice
285+
.entry(room.clone())
286+
.or_insert_with(DashMap::new)
287+
.insert(event_id, timeline.start.clone());
288+
}
289+
290+
self.timeline_slices
291+
.entry(room.clone())
292+
.or_insert_with(DashMap::new)
293+
.insert(timeline.start.clone(), timeline.clone());
294+
}
295+
274296
info!("Saved changes in {:?}", now.elapsed());
275297

276298
Ok(())
@@ -424,6 +446,93 @@ impl MemoryStore {
424446

425447
Ok(())
426448
}
449+
450+
async fn get_timeline(
451+
&self,
452+
room_id: &RoomId,
453+
start: Option<&EventId>,
454+
end: Option<&EventId>,
455+
limit: Option<usize>,
456+
direction: Direction,
457+
) -> Result<Option<StoredTimelineSlice>> {
458+
let event_id_to_timeline_slice = self.event_id_to_timeline_slice.get(room_id).unwrap();
459+
let timeline_slices = self.timeline_slices.get(room_id).unwrap();
460+
461+
let slice = if let Some(start) = start {
462+
event_id_to_timeline_slice.get(start).and_then(|s| timeline_slices.get(&*s))
463+
} else {
464+
self.get_sync_token().await?.and_then(|s| timeline_slices.get(&*s))
465+
};
466+
467+
let mut slice = if let Some(slice) = slice {
468+
slice
469+
} else {
470+
return Ok(None);
471+
};
472+
473+
let mut timeline = StoredTimelineSlice::new(vec![], None);
474+
475+
match direction {
476+
Direction::Forward => {
477+
loop {
478+
timeline.events.append(&mut slice.events.iter().rev().cloned().collect());
479+
480+
timeline.token = Some(slice.start.clone());
481+
482+
let found_end = end.map_or(false, |end| {
483+
slice.events.iter().any(|event| &event.event_id() == end)
484+
});
485+
486+
if found_end {
487+
break;
488+
}
489+
490+
if let Some(next_slice) = timeline_slices.get(&slice.start) {
491+
slice = next_slice;
492+
} else {
493+
// No more timeline slices or a gap in the known timeline
494+
break;
495+
}
496+
497+
if let Some(limit) = limit {
498+
if timeline.events.len() >= limit {
499+
break;
500+
}
501+
}
502+
}
503+
}
504+
Direction::Backward => {
505+
loop {
506+
timeline.events.append(&mut slice.events.clone());
507+
timeline.token = slice.end.clone();
508+
509+
let found_end = end.map_or(false, |end| {
510+
slice.events.iter().any(|event| &event.event_id() == end)
511+
});
512+
513+
if found_end {
514+
break;
515+
}
516+
517+
if let Some(prev_slice) =
518+
slice.end.as_deref().and_then(|end| timeline_slices.get(end))
519+
{
520+
slice = prev_slice;
521+
} else {
522+
// No more timeline slices or we have a gap in the known timeline
523+
break;
524+
}
525+
526+
if let Some(limit) = limit {
527+
if timeline.events.len() >= limit {
528+
break;
529+
}
530+
}
531+
}
532+
}
533+
}
534+
Ok(Some(timeline))
535+
}
427536
}
428537

429538
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -555,6 +664,16 @@ impl StateStore for MemoryStore {
555664
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
556665
self.remove_media_content_for_uri(uri).await
557666
}
667+
async fn get_timeline(
668+
&self,
669+
room_id: &RoomId,
670+
start: Option<&EventId>,
671+
end: Option<&EventId>,
672+
limit: Option<usize>,
673+
direction: Direction,
674+
) -> Result<Option<StoredTimelineSlice>> {
675+
self.get_timeline(room_id, start, end, limit, direction).await
676+
}
558677
}
559678

560679
#[cfg(test)]

matrix_sdk_base/src/store/mod.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use std::{
2323
use dashmap::DashMap;
2424
use matrix_sdk_common::{async_trait, locks::RwLock, AsyncTraitDeps};
2525
use ruma::{
26-
api::client::r0::push::get_notifications::Notification,
26+
api::client::r0::{
27+
message::get_message_events::Direction, push::get_notifications::Notification,
28+
},
2729
events::{
2830
presence::PresenceEvent,
2931
receipt::{Receipt, ReceiptEventContent},
@@ -39,7 +41,7 @@ use ruma::{
3941
use sled::Db;
4042

4143
use crate::{
42-
deserialized_responses::{MemberEvent, StrippedMemberEvent},
44+
deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice},
4345
media::MediaRequest,
4446
rooms::{RoomInfo, RoomType},
4547
Room, Session,
@@ -280,6 +282,29 @@ pub trait StateStore: AsyncTraitDeps {
280282
///
281283
/// * `uri` - The `MxcUri` of the media files.
282284
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>;
285+
286+
/// Get a slice of the timeline of a room.
287+
///
288+
/// # Arguments
289+
///
290+
/// * `room_id` - The id of the room for which the timeline should be
291+
/// fetched.
292+
///
293+
/// * `start` - The start point from which events should be returned.
294+
///
295+
/// * `end` - The end point to which events should be returned.
296+
///
297+
/// * `limit` - The maximum number of events to return.
298+
///
299+
/// * `direction` - The direction events should be returned.
300+
async fn get_timeline(
301+
&self,
302+
room_id: &RoomId,
303+
start: Option<&EventId>,
304+
end: Option<&EventId>,
305+
limit: Option<usize>,
306+
direction: Direction,
307+
) -> Result<Option<StoredTimelineSlice>>;
283308
}
284309

285310
/// A state store wrapper for the SDK.
@@ -453,6 +478,9 @@ pub struct StateChanges {
453478
pub ambiguity_maps: BTreeMap<RoomId, BTreeMap<String, BTreeSet<UserId>>>,
454479
/// A map of `RoomId` to a vector of `Notification`s
455480
pub notifications: BTreeMap<RoomId, Vec<Notification>>,
481+
482+
/// A mapping of `RoomId` to a `TimelineSlice`
483+
pub timeline: BTreeMap<RoomId, TimelineSlice>,
456484
}
457485

458486
impl StateChanges {
@@ -537,4 +565,27 @@ impl StateChanges {
537565
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
538566
self.receipts.insert(room_id.to_owned(), event);
539567
}
568+
569+
/// Update the `StateChanges` struct with the given room with a new
570+
/// `TimelineSlice`.
571+
pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) {
572+
self.timeline.insert(room_id.to_owned(), timeline);
573+
}
574+
}
575+
576+
/// Store state changes and pass them to the StateStore.
577+
#[derive(Debug, Default)]
578+
pub struct StoredTimelineSlice {
579+
/// A start token to fetch more events if the requested slice isn't fully
580+
/// known.
581+
pub token: Option<String>,
582+
583+
/// The requested events
584+
pub events: Vec<SyncRoomEvent>,
585+
}
586+
587+
impl StoredTimelineSlice {
588+
pub fn new(events: Vec<SyncRoomEvent>, token: Option<String>) -> Self {
589+
Self { events, token }
590+
}
540591
}

0 commit comments

Comments
 (0)