From f0d08c7c5e24413cbe1f7f68f5218c2e24206293 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Wed, 12 May 2021 06:37:53 -0400 Subject: [PATCH 1/2] store: Create Timeline struct to hold event chunks --- matrix_sdk_base/src/store/mod.rs | 408 ++++++++++++++++++++++++++++++- matrix_sdk_crypto/Cargo.toml | 2 +- 2 files changed, 407 insertions(+), 3 deletions(-) diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index a9b20dd0a12..74dc7e87675 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -22,13 +22,18 @@ use std::{ use dashmap::DashMap; use matrix_sdk_common::{ - api::r0::push::get_notifications::Notification, + api::r0::{ + message::get_message_events::{ + Direction, Request as MessagesRequest, Response as MessagesResponse, + }, + push::get_notifications::Notification, + }, async_trait, events::{ presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, }, - identifiers::{RoomId, UserId}, + identifiers::{EventId, RoomId, UserId}, locks::RwLock, AsyncTraitDeps, Raw, }; @@ -463,3 +468,402 @@ impl StateChanges { self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification); } } + +/// A token that represents the last known event before incoming events +/// are added via /sync or /messages. +pub type PrevBatchToken = String; + +/// The ending of a chunk of events from /messages. This will match the +/// next prev_batch token from a /sync. +pub type NextBatchToken = String; + +/// The position of a chunk of events in the event stream. This is +/// based on the ordering of `PrevBatchToken` and `NextBatchToken`. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SliceIdx(u128); + +/// The position of an event within a slice or chunk. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct EventIdx(u128); + +/// Represents a chunk of events from either /sync or /messages. +#[derive(Clone, Debug)] +pub struct TimelineSlice { + start: PrevBatchToken, + end: NextBatchToken, +} + +impl TimelineSlice { + pub fn new(start: PrevBatchToken, end: NextBatchToken) -> Self { + Self { start, end } + } +} + +/// `EventOwnerMap` keeps track of the ordering of all events and allows getting +/// events from a `PrevBatchToken`, `EventId` or `SliceIdx`. +#[derive(Clone, Debug)] +pub struct EventOwnerMap { + slice_map: BTreeMap>, + event_map: BTreeMap, + // Now we can go from an eventId to a sub-slice of it's parent SliceId, I don't think + // we could before? + event_index: BTreeMap, +} + +/// Store the new events from /sync or /messages. +#[derive(Clone, Debug)] +pub struct Timeline { + slices: BTreeMap, + /// Maps a `prev_batch` token to a `SliceIdx` "forwards". + prev_slice_map: BTreeMap, + /// Maps a `next_batch` token to a `SliceIdx` "backwards". + /// + /// This points to the same `SliceIdx` `prev_slice_map` does but with the + /// `next_batch` token. + next_slice_map: BTreeMap, + events: EventOwnerMap, +} + +impl Timeline { + /// Create an empty `Timeline`. + pub fn new() -> Self { + Self { + slices: BTreeMap::new(), + prev_slice_map: BTreeMap::new(), + next_slice_map: BTreeMap::new(), + events: EventOwnerMap { + slice_map: BTreeMap::new(), + event_map: BTreeMap::new(), + event_index: BTreeMap::new(), + }, + } + } + + /// Is this `Timeline` empty it has received no /sync or /message responses. + pub fn is_empty(&self) -> bool { + self.slices.is_empty() && self.prev_slice_map.is_empty() + } + + /// + pub fn handle_sync_timeline( + &mut self, + room_id: &RoomId, + prev_batch: &str, + next_batch: &str, + content: &[AnySyncRoomEvent], + ) { + let timeline_slice = TimelineSlice::new(prev_batch.to_owned(), next_batch.to_owned()); + + // The new `prev_batch` token is the `next_batch` token of previous /sync + if let Some(SliceIdx(idx)) = self.next_slice_map.get(prev_batch) { + let next_idx = SliceIdx(idx + 1); + let last_event_index = self + .events + .slice_map + .values() + .last() + .and_then(|map| map.keys().last()) + .map(|e| { + let EventIdx(idx) = e; + *idx + }) + .unwrap_or_default(); + + self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); + self.next_slice_map.insert(next_batch.to_owned(), next_idx); + self.slices.insert(next_idx, timeline_slice); + + self.update_events_map_forward(content, last_event_index, next_idx); + } else if self.is_empty() { + let next_idx = SliceIdx(u128::MAX / 2); + self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); + self.next_slice_map.insert(next_batch.to_owned(), next_idx); + self.slices.insert(next_idx, timeline_slice); + + self.update_events_map_forward(content, u128::MAX / 2, next_idx); + } else { + todo!("hmmm we got a problem or a gap") + } + } + + /// + pub fn handle_messages_response( + &mut self, + room_id: &RoomId, + resp: &MessagesResponse, + dir: Direction, + ) { + match dir { + // the end token is how to request older events + // events are in reverse-chronological order + Direction::Backward => { + match (&resp.end, &resp.start) { + (Some(prev_batch), Some(end)) => { + let timeline_slice = + TimelineSlice::new(prev_batch.to_owned(), end.to_owned()); + + let old = self.prev_slice_map.get(prev_batch); + let recent = self.prev_slice_map.get(end); + match (old, recent) { + // We have the full chunk already + (Some(_), Some(_)) => {} + (Some(_), None) => { + // We have a gap + // A -> B -> gap -> F + // we know B but we must have gotten an + // incomplete chunk or + // something + } + // we have the recent token but not the older so fill + // backwards + (None, Some(SliceIdx(idx))) => { + let prev_idx = SliceIdx(idx - 1); + let prev_event_index = self + .events + .slice_map + .get(&SliceIdx(*idx)) + .and_then(|map| map.keys().next()) + .map(|e| { + let EventIdx(idx) = e; + *idx + }) + .unwrap_or_default(); + + self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx); + self.next_slice_map.insert(end.to_owned(), prev_idx); + self.slices.insert(prev_idx, timeline_slice); + + // We reverse so our slice is oldest -> most recent + self.update_events_map_backward( + &resp.chunk, + prev_event_index, + prev_idx, + ) + } + (None, None) => {} + } + } + (Some(prev), None) => { + // TODO: is this an incomplete chunk do these have + // meanings like if there is no + // prev/next_batch token? + } + (None, Some(end)) => {} + (None, None) => todo!("problems"), + } + } + // the start token is the oldest events + // events are in chronological order + Direction::Forward => { + match (&resp.start, &resp.end) { + (Some(prev_batch), Some(end)) => { + let timeline_slice = + TimelineSlice::new(prev_batch.to_owned(), end.to_owned()); + + let old = self.next_slice_map.get(prev_batch); + let recent = self.next_slice_map.get(end); + match (old, recent) { + // We have the full chunk already + (Some(_), Some(_)) => {} + (Some(SliceIdx(idx)), None) => { + let next_idx = SliceIdx(idx + 1); + let last_event_index = self + .events + .slice_map + .get(&SliceIdx(*idx)) + .and_then(|map| map.keys().next()) + .map(|e| { + let EventIdx(idx) = e; + *idx + }) + .unwrap_or_default(); + + self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); + self.next_slice_map.insert(end.to_owned(), next_idx); + self.slices.insert(next_idx, timeline_slice); + + self.update_events_map_forward( + // TODO: + // make the methods more general or specific or + // is this hacky conversion ok? + &resp + .chunk + .iter() + .filter_map(|e| { + // Not ideal but all we need is the eventId + serde_json::from_str(e.json().get()).ok() + }) + .collect::>(), + last_event_index, + next_idx, + ); + } + // We have the recent token but not the older so fill + // backwards + (None, Some(SliceIdx(idx))) => { + let prev_idx = SliceIdx(idx - 1); + let prev_event_index = self + .events + .slice_map + .get(&SliceIdx(*idx)) + .and_then(|map| map.keys().next()) + .map(|e| { + let EventIdx(idx) = e; + *idx + }) + .unwrap_or_default(); + + self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx); + self.next_slice_map.insert(end.to_owned(), prev_idx); + self.slices.insert(prev_idx, timeline_slice); + // We reverse so our slice is oldest -> most recent + self.update_events_map_backward( + &resp.chunk, + prev_event_index, + prev_idx, + ) + } + (None, None) => {} + } + } + (Some(prev), None) => {} + (None, Some(end)) => {} + (None, None) => todo!("problems"), + } + } + } + } + + fn update_events_map_forward( + &mut self, + content: &[AnySyncRoomEvent], + last_event_index: u128, + slice_idx: SliceIdx, + ) { + let mut index_event = BTreeMap::new(); + for (i, event) in content.iter().enumerate() { + let e_idx = EventIdx(last_event_index + ((i + 1) as u128)); + let e_id = event.event_id(); + + index_event.insert(e_idx, e_id.clone()); + self.events.event_index.insert(e_id.clone(), e_idx); + self.events.event_map.insert(e_id.clone(), slice_idx); + } + self.events.slice_map.insert(slice_idx, index_event); + } + + fn update_events_map_backward( + &mut self, + content: &[Raw], + last_event_index: u128, + slice_idx: SliceIdx, + ) { + let mut index_event = BTreeMap::new(); + // Reverse so that newer events have a smaller index from enumerate + for (i, event) in content + .iter() + // TODO: don't eat events or is this ok? + .filter_map(|e| e.deserialize().ok()) + .rev() + .enumerate() + { + let e_idx = EventIdx(last_event_index - ((i + 1) as u128)); + let e_id = event.event_id(); + + index_event.insert(e_idx, e_id.clone()); + self.events.event_index.insert(e_id.clone(), e_idx); + self.events.event_map.insert(e_id.clone(), slice_idx); + } + self.events.slice_map.insert(slice_idx, index_event); + } +} + +// TODO: do this in ruma? +trait EventIdExt { + fn event_id(&self) -> &EventId; +} + +impl EventIdExt for AnySyncRoomEvent { + fn event_id(&self) -> &EventId { + match self { + AnySyncRoomEvent::Message(ev) => ev.event_id(), + AnySyncRoomEvent::State(ev) => ev.event_id(), + AnySyncRoomEvent::RedactedMessage(ev) => ev.event_id(), + AnySyncRoomEvent::RedactedState(ev) => ev.event_id(), + } + } +} + +impl EventIdExt for AnyRoomEvent { + fn event_id(&self) -> &EventId { + match self { + AnyRoomEvent::Message(ev) => ev.event_id(), + AnyRoomEvent::State(ev) => ev.event_id(), + AnyRoomEvent::RedactedMessage(ev) => ev.event_id(), + AnyRoomEvent::RedactedState(ev) => ev.event_id(), + } + } +} + +#[cfg(test)] +mod test { + use matrix_sdk_common::{ + api::r0::{ + account::register::Request as RegistrationRequest, + directory::get_public_rooms_filtered::Request as PublicRoomsFilterRequest, + message::get_message_events::{Direction, Response as MessagesResponse}, + sync, + typing::create_typing_event::Typing, + uiaa::AuthData, + }, + assign, + directory::Filter, + events::{ + room::message::MessageEventContent, AnyMessageEventContent, AnyRoomEvent, + AnySyncRoomEvent, + }, + identifiers::{event_id, room_id, user_id}, + thirdparty, Raw, + }; + use matrix_sdk_test::{test_json, EventBuilder, EventsJson}; + use serde_json::json; + + use super::Timeline; + + #[test] + fn messages() { + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + + let messages = serde_json::from_value::>>( + test_json::ROOM_MESSAGES["chunk"].clone(), + ) + .unwrap(); + let sync = serde_json::from_value::>( + test_json::SYNC["rooms"]["join"][room_id.as_str()]["timeline"]["events"].clone(), + ) + .unwrap(); + + let mut timeline = Timeline::new(); + + timeline.handle_sync_timeline( + &room_id, + "t392-516_47314_0_7_1_1_1_11444_1", + "s526_47314_0_7_1_1_1_11444_1", + &sync, + ); + + let mut resp = MessagesResponse::new(); + resp.chunk = messages; + resp.start = Some("s526_47314_0_7_1_1_1_11444_1".to_owned()); + resp.end = Some("s_end__end".to_owned()); + + timeline.handle_messages_response(&room_id, &resp, Direction::Forward); + + println!("{:#?}", timeline); + // end: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049 + // start: t3356-1714663804_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049 + + // end: t3316-1714212736_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049 + // start: t3336-1714379051_757284961_10998365_725145800_588037087_1999191_200821144_689020759_166049 + } +} diff --git a/matrix_sdk_crypto/Cargo.toml b/matrix_sdk_crypto/Cargo.toml index 5767be0956a..4077fe7da5f 100644 --- a/matrix_sdk_crypto/Cargo.toml +++ b/matrix_sdk_crypto/Cargo.toml @@ -22,7 +22,7 @@ docs = ["sled_cryptostore"] [dependencies] matrix-sdk-common = { version = "0.2.0", path = "../matrix_sdk_common" } -olm-rs = { version = "1.0.0", features = ["serde"] } +olm-rs = { version = "1.0.1", features = ["serde"] } getrandom = "0.2.2" serde = { version = "1.0.122", features = ["derive", "rc"] } serde_json = "1.0.61" From 172227ff7c57e28b170be5fa3f78d60a34e74d33 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Tue, 18 May 2021 07:46:15 -0400 Subject: [PATCH 2/2] fixup! store: Create Timeline struct to hold event chunks --- matrix_sdk_base/src/store/mod.rs | 100 +++++++++++++++++++------------ 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index 74dc7e87675..35902b799ee 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -31,7 +31,8 @@ use matrix_sdk_common::{ async_trait, events::{ presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent, - AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, + AnyRoomAccountDataEvent, AnyRoomEvent, AnyStrippedStateEvent, AnySyncRoomEvent, + AnySyncStateEvent, EventContent, EventType, }, identifiers::{EventId, RoomId, UserId}, locks::RwLock, @@ -482,10 +483,38 @@ pub type NextBatchToken = String; #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SliceIdx(u128); +impl SliceIdx { + pub fn first() -> Self { + Self(u128::MAX / 2) + } + + pub fn from_prev(prev: u128) -> Self { + Self(prev + 1) + } + + pub fn from_post(prev: u128) -> Self { + Self(prev - 1) + } +} + /// The position of an event within a slice or chunk. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct EventIdx(u128); +impl EventIdx { + pub fn first() -> Self { + Self(u128::MAX / 2) + } + + pub fn from_prev(prev: u128) -> Self { + Self(prev + 1) + } + + pub fn from_post(prev: u128) -> Self { + Self(prev - 1) + } +} + /// Represents a chunk of events from either /sync or /messages. #[derive(Clone, Debug)] pub struct TimelineSlice { @@ -550,11 +579,12 @@ impl Timeline { room_id: &RoomId, prev_batch: &str, next_batch: &str, - content: &[AnySyncRoomEvent], + limited: bool, + events: &[AnySyncRoomEvent], ) { let timeline_slice = TimelineSlice::new(prev_batch.to_owned(), next_batch.to_owned()); - // The new `prev_batch` token is the `next_batch` token of previous /sync + // The new `prev_batch` token is the `next_batch` token of a previous /sync if let Some(SliceIdx(idx)) = self.next_slice_map.get(prev_batch) { let next_idx = SliceIdx(idx + 1); let last_event_index = self @@ -567,20 +597,20 @@ impl Timeline { let EventIdx(idx) = e; *idx }) - .unwrap_or_default(); + .expect("event map is in sync with timeline"); self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); self.next_slice_map.insert(next_batch.to_owned(), next_idx); self.slices.insert(next_idx, timeline_slice); - self.update_events_map_forward(content, last_event_index, next_idx); + self.update_events_map_forward(events, last_event_index, next_idx); } else if self.is_empty() { - let next_idx = SliceIdx(u128::MAX / 2); + let next_idx = SliceIdx::first(); self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); self.next_slice_map.insert(next_batch.to_owned(), next_idx); self.slices.insert(next_idx, timeline_slice); - self.update_events_map_forward(content, u128::MAX / 2, next_idx); + self.update_events_map_forward(events, u128::MAX / 2, next_idx); } else { todo!("hmmm we got a problem or a gap") } @@ -627,7 +657,7 @@ impl Timeline { let EventIdx(idx) = e; *idx }) - .unwrap_or_default(); + .expect("event map is in sync with timeline"); self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx); self.next_slice_map.insert(end.to_owned(), prev_idx); @@ -676,7 +706,7 @@ impl Timeline { let EventIdx(idx) = e; *idx }) - .unwrap_or_default(); + .expect("event map is in sync with timeline"); self.prev_slice_map.insert(prev_batch.to_owned(), next_idx); self.next_slice_map.insert(end.to_owned(), next_idx); @@ -711,7 +741,7 @@ impl Timeline { let EventIdx(idx) = e; *idx }) - .unwrap_or_default(); + .expect("event map is in sync with timeline"); self.prev_slice_map.insert(prev_batch.to_owned(), prev_idx); self.next_slice_map.insert(end.to_owned(), prev_idx); @@ -728,7 +758,7 @@ impl Timeline { } (Some(prev), None) => {} (None, Some(end)) => {} - (None, None) => todo!("problems"), + (None, None) => todo!("can't move forward nor backward, timeline full"), } } } @@ -736,12 +766,12 @@ impl Timeline { fn update_events_map_forward( &mut self, - content: &[AnySyncRoomEvent], + events: &[AnySyncRoomEvent], last_event_index: u128, slice_idx: SliceIdx, ) { let mut index_event = BTreeMap::new(); - for (i, event) in content.iter().enumerate() { + for (i, event) in events.iter().enumerate() { let e_idx = EventIdx(last_event_index + ((i + 1) as u128)); let e_id = event.event_id(); @@ -754,13 +784,13 @@ impl Timeline { fn update_events_map_backward( &mut self, - content: &[Raw], + events: &[Raw], last_event_index: u128, slice_idx: SliceIdx, ) { let mut index_event = BTreeMap::new(); // Reverse so that newer events have a smaller index from enumerate - for (i, event) in content + for (i, event) in events .iter() // TODO: don't eat events or is this ok? .filter_map(|e| e.deserialize().ok()) @@ -808,25 +838,12 @@ impl EventIdExt for AnyRoomEvent { #[cfg(test)] mod test { use matrix_sdk_common::{ - api::r0::{ - account::register::Request as RegistrationRequest, - directory::get_public_rooms_filtered::Request as PublicRoomsFilterRequest, - message::get_message_events::{Direction, Response as MessagesResponse}, - sync, - typing::create_typing_event::Typing, - uiaa::AuthData, - }, - assign, - directory::Filter, - events::{ - room::message::MessageEventContent, AnyMessageEventContent, AnyRoomEvent, - AnySyncRoomEvent, - }, - identifiers::{event_id, room_id, user_id}, - thirdparty, Raw, + api::r0::message::get_message_events::{Direction, Response as MessagesResponse}, + events::{AnyRoomEvent, AnySyncRoomEvent}, + identifiers::room_id, + Raw, }; - use matrix_sdk_test::{test_json, EventBuilder, EventsJson}; - use serde_json::json; + use matrix_sdk_test::test_json; use super::Timeline; @@ -838,10 +855,11 @@ mod test { test_json::ROOM_MESSAGES["chunk"].clone(), ) .unwrap(); - let sync = serde_json::from_value::>( - test_json::SYNC["rooms"]["join"][room_id.as_str()]["timeline"]["events"].clone(), - ) - .unwrap(); + let sync = + serde_json::from_value::( + test_json::SYNC["rooms"]["join"][room_id.as_str()]["timeline"].clone(), + ) + .unwrap(); let mut timeline = Timeline::new(); @@ -849,7 +867,13 @@ mod test { &room_id, "t392-516_47314_0_7_1_1_1_11444_1", "s526_47314_0_7_1_1_1_11444_1", - &sync, + sync.limited, + &sync + .events + .into_iter() + .map(|e| e.deserialize()) + .collect::, _>>() + .unwrap(), ); let mut resp = MessagesResponse::new();