Skip to content

Add two streams to get the timeline of a room and store timeline to SledStore #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/matrix-sdk-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ indexeddb_state_store = ["indexed_db_futures", "wasm-bindgen", "pbkdf2", "hmac",
indexeddb_cryptostore = ["matrix-sdk-crypto/indexeddb_cryptostore"]

[dependencies]
async-stream = "0.3.2"
chacha20poly1305 = { version = "0.9.0", optional = true }
dashmap = "4.0.2"
futures-core = "0.3.15"
futures-util = { version = "0.3.15", default-features = false }
futures-channel = "0.3.15"
hmac = { version = "0.12.0", optional = true }
lru = "0.7.2"
matrix-sdk-common = { version = "0.4.0", path = "../matrix-sdk-common" }
Expand Down
34 changes: 33 additions & 1 deletion crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use matrix_sdk_common::locks::Mutex;
use matrix_sdk_common::{
deserialized_responses::{
AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms,
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline,
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice,
},
instant::Instant,
locks::RwLock,
Expand Down Expand Up @@ -848,6 +848,16 @@ impl BaseClient {
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);

let timeline_slice = TimelineSlice::new(
timeline.events.clone(),
next_batch.clone(),
timeline.prev_batch.clone(),
timeline.limited,
true,
);

changes.add_timeline(&room_id, timeline_slice);

new_rooms.join.insert(
room_id,
JoinedRoom::new(
Expand Down Expand Up @@ -958,11 +968,33 @@ impl BaseClient {
room.update_summary(room_info.clone())
}
}

for (room_id, room_info) in &changes.stripped_room_infos {
if let Some(room) = self.store.get_stripped_room(room_id) {
room.update_summary(room_info.clone())
}
}

for (room_id, timeline_slice) in &changes.timeline {
if let Some(room) = self.store.get_room(room_id) {
room.add_timeline_slice(timeline_slice).await;
}
}
}

/// Receive a timeline slice obtained from a messages request.
///
/// You should pass only slices requested from the store to this function.
///
/// * `timeline` - The `TimelineSlice`
pub async fn receive_messages(&self, room_id: &RoomId, timeline: TimelineSlice) -> Result<()> {
let mut changes = StateChanges::default();

changes.add_timeline(room_id, timeline);

self.store().save_changes(&changes).await?;

Ok(())
}

/// Receive a get member events response and convert it to a deserialized
Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use matrix_sdk_common::*;
pub use crate::{
error::{Error, Result},
session::Session,
timeline_stream::TimelineStreamError,
};

mod client;
Expand All @@ -47,6 +48,7 @@ pub mod media;
mod rooms;
mod session;
mod store;
mod timeline_stream;

pub use client::{BaseClient, BaseClientConfig};
#[cfg(feature = "encryption")]
Expand Down
119 changes: 117 additions & 2 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

use std::sync::{Arc, RwLock as SyncRwLock};

use dashmap::DashSet;
use futures_channel::mpsc;
use futures_core::stream::Stream;
use futures_util::stream::{self, StreamExt};
use matrix_sdk_common::locks::Mutex;
use ruma::{
api::client::r0::sync::sync_events::RoomSummary as RumaSummary,
events::{
Expand All @@ -34,12 +38,13 @@ use ruma::{
EventId, MxcUri, RoomAliasId, RoomId, UserId,
};
use serde::{Deserialize, Serialize};
use tracing::debug;
use tracing::{debug, warn};

use super::{BaseRoomInfo, RoomMember};
use crate::{
deserialized_responses::UnreadNotificationsCount,
deserialized_responses::{SyncRoomEvent, TimelineSlice, UnreadNotificationsCount},
store::{Result as StoreResult, StateStore},
timeline_stream::{TimelineStreamBackward, TimelineStreamError, TimelineStreamForward},
};

/// The underlying room data structure collecting state for joined, left and
Expand All @@ -50,6 +55,8 @@ pub struct Room {
own_user_id: Arc<UserId>,
inner: Arc<SyncRwLock<RoomInfo>>,
store: Arc<dyn StateStore>,
forward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
backward_timeline_streams: Arc<Mutex<Vec<mpsc::Sender<TimelineSlice>>>>,
}

/// The room summary containing member counts and members that should be used to
Expand Down Expand Up @@ -107,6 +114,8 @@ impl Room {
room_id: room_info.room_id.clone(),
store,
inner: Arc::new(SyncRwLock::new(room_info)),
forward_timeline_streams: Default::default(),
backward_timeline_streams: Default::default(),
}
}

Expand Down Expand Up @@ -467,6 +476,112 @@ impl Room {
) -> StoreResult<Vec<(Box<UserId>, Receipt)>> {
self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
}

/// Get two stream into the timeline.
/// First one is forward in time and the second one is backward in time.
pub async fn timeline(
&self,
) -> StoreResult<(
impl Stream<Item = SyncRoomEvent>,
impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>,
)> {
// We need to hold the lock while we create the stream so that we don't lose new
// sync responses
let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
let sync_token = self.store.get_sync_token().await?;
let event_ids = Arc::new(DashSet::new());

let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
self.store.room_timeline(&self.room_id).await?
{
TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
} else {
TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
};

backward_timeline_streams.push(backward_sender);

let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
forward_timeline_streams.push(forward_sender);

Ok((forward_stream, backward_stream))
}

/// Create a stream that returns all events of the room's timeline forward
/// in time.
///
/// If you need also a backward stream you should use
/// [`timeline`][`crate::Room::timeline`]
pub async fn timeline_forward(&self) -> StoreResult<impl Stream<Item = SyncRoomEvent>> {
let mut forward_timeline_streams = self.forward_timeline_streams.lock().await;
let event_ids = Arc::new(DashSet::new());

let (forward_stream, forward_sender) = TimelineStreamForward::new(event_ids);
forward_timeline_streams.push(forward_sender);

Ok(forward_stream)
}

/// Create a stream that returns all events of the room's timeline backward
/// in time.
///
/// If you need also a forward stream you should use
/// [`timeline`][`crate::Room::timeline`]
pub async fn timeline_backward(
&self,
) -> StoreResult<impl Stream<Item = Result<SyncRoomEvent, TimelineStreamError>>> {
let mut backward_timeline_streams = self.backward_timeline_streams.lock().await;
let sync_token = self.store.get_sync_token().await?;
let event_ids = Arc::new(DashSet::new());

let (backward_stream, backward_sender) = if let Some((stored_events, end_token)) =
self.store.room_timeline(&self.room_id).await?
{
TimelineStreamBackward::new(event_ids.clone(), end_token, Some(stored_events))
} else {
TimelineStreamBackward::new(event_ids.clone(), Some(sync_token.clone().unwrap()), None)
};

backward_timeline_streams.push(backward_sender);

Ok(backward_stream)
}

/// Add a new timeline slice to the timeline streams.
pub async fn add_timeline_slice(&self, timeline: &TimelineSlice) {
if timeline.sync {
let mut streams = self.forward_timeline_streams.lock().await;
let mut remaining_streams = Vec::with_capacity(streams.len());
while let Some(mut forward) = streams.pop() {
if !forward.is_closed() {
if let Err(error) = forward.try_send(timeline.clone()) {
if error.is_full() {
warn!("Drop timeline slice because the limit of the buffer for the forward stream is reached");
}
} else {
remaining_streams.push(forward);
}
}
}
*streams = remaining_streams;
} else {
let mut streams = self.backward_timeline_streams.lock().await;
let mut remaining_streams = Vec::with_capacity(streams.len());
while let Some(mut backward) = streams.pop() {
if !backward.is_closed() {
if let Err(error) = backward.try_send(timeline.clone()) {
if error.is_full() {
warn!("Drop timeline slice because the limit of the buffer for the backward stream is reached");
}
} else {
remaining_streams.push(backward);
}
}
}
*streams = remaining_streams;
}
}
}

/// The underlying pure data structure for joined and left rooms.
Expand Down
Loading