Skip to content

Commit 3c49480

Browse files
committed
Use StateStore to fulfill messages requests
1 parent 659825e commit 3c49480

File tree

2 files changed

+286
-19
lines changed

2 files changed

+286
-19
lines changed

matrix_sdk/src/client.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2731,6 +2731,7 @@ mod test {
27312731
},
27322732
media::get_content_thumbnail::Method,
27332733
membership::Invite3pidInit,
2734+
message::get_message_events::Direction,
27342735
session::get_login_types::LoginType,
27352736
uiaa::{AuthData, UiaaResponse},
27362737
},
@@ -3895,4 +3896,150 @@ mod test {
38953896

38963897
assert_eq!(client.whoami().await.unwrap().user_id, user_id);
38973898
}
3899+
3900+
#[tokio::test]
3901+
async fn messages() {
3902+
let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
3903+
let client = logged_in_client().await;
3904+
3905+
let _m = mock("GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()))
3906+
.with_status(200)
3907+
.with_body(test_json::MORE_SYNC.to_string())
3908+
.match_header("authorization", "Bearer 1234")
3909+
.create();
3910+
3911+
let mocked_messages = mock(
3912+
"GET",
3913+
Matcher::Regex(
3914+
r"^/_matrix/client/r0/rooms/.*/messages.*from=t392-516_47314_0_7_1_1_1_11444_1.*"
3915+
.to_string(),
3916+
),
3917+
)
3918+
.with_status(200)
3919+
.with_body(test_json::SYNC_ROOM_MESSAGES_BATCH_1.to_string())
3920+
.match_header("authorization", "Bearer 1234")
3921+
.create();
3922+
3923+
let mocked_context =
3924+
mock("GET", Matcher::Regex(r"^/_matrix/client/r0/rooms/.*/context/.*".to_string()))
3925+
.with_status(200)
3926+
.with_body(test_json::CONTEXT_MESSAGE.to_string())
3927+
.match_header("authorization", "Bearer 1234")
3928+
.create();
3929+
3930+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
3931+
3932+
let _ = client.sync_once(sync_settings).await.unwrap();
3933+
3934+
let room = client.get_joined_room(&room_id).unwrap();
3935+
3936+
// Try to get the timeline starting at an event not known to the store.
3937+
let events = room
3938+
.messages(&event_id!("$f3h4d129462ha:example.com"), None, 3, Direction::Backward)
3939+
.await
3940+
.unwrap()
3941+
.unwrap();
3942+
3943+
let expected_events = [
3944+
event_id!("$f3h4d129462ha:example.com"),
3945+
event_id!("$143273582443PhrSnbefore1:example.org"),
3946+
event_id!("$143273582443PhrSnbefore2:example.org"),
3947+
];
3948+
3949+
assert_eq!(events.len(), expected_events.len());
3950+
assert!(!events
3951+
.iter()
3952+
.map(|event| event.event_id())
3953+
.zip(&expected_events)
3954+
.any(|(a, b)| &a != b));
3955+
3956+
mocked_context.assert();
3957+
3958+
let expected_events = [
3959+
event_id!("$152037280074GZeOm:localhost"),
3960+
event_id!("$1444812213350496Caaaf:example.com"),
3961+
event_id!("$1444812213350496Cbbbf:example.com"),
3962+
event_id!("$1444812213350496Ccccf:example.com"),
3963+
];
3964+
3965+
let events = room
3966+
.messages(
3967+
&event_id!("$152037280074GZeOm:localhost"),
3968+
None,
3969+
expected_events.len(),
3970+
Direction::Backward,
3971+
)
3972+
.await
3973+
.unwrap()
3974+
.unwrap();
3975+
3976+
assert_eq!(events.len(), expected_events.len());
3977+
assert!(!events
3978+
.iter()
3979+
.map(|event| event.event_id())
3980+
.zip(&expected_events)
3981+
.any(|(a, b)| &a != b));
3982+
3983+
let events = room
3984+
.messages(
3985+
&event_id!("$1444812213350496Ccccf:example.com"),
3986+
None,
3987+
expected_events.len(),
3988+
Direction::Forward,
3989+
)
3990+
.await
3991+
.unwrap()
3992+
.unwrap();
3993+
3994+
assert_eq!(events.len(), expected_events.len());
3995+
assert!(!events
3996+
.iter()
3997+
.rev()
3998+
.map(|event| event.event_id())
3999+
.zip(&expected_events)
4000+
.any(|(a, b)| &a != b));
4001+
4002+
let end_event = event_id!("$1444812213350496Cbbbf:example.com");
4003+
let events = room
4004+
.messages(
4005+
&event_id!("$152037280074GZeOm:localhost"),
4006+
Some(&end_event),
4007+
expected_events.len(),
4008+
Direction::Backward,
4009+
)
4010+
.await
4011+
.unwrap()
4012+
.unwrap();
4013+
4014+
assert_eq!(events.len(), 3);
4015+
assert_eq!(events.last().unwrap().event_id(), end_event);
4016+
assert!(!events
4017+
.iter()
4018+
.map(|event| event.event_id())
4019+
.zip(&expected_events[0..4])
4020+
.any(|(a, b)| &a != b));
4021+
4022+
let end_event = event_id!("$1444812213350496Cbbbf:example.com");
4023+
let events = room
4024+
.messages(
4025+
&event_id!("$1444812213350496Ccccf:example.com"),
4026+
Some(&end_event),
4027+
expected_events.len(),
4028+
Direction::Forward,
4029+
)
4030+
.await
4031+
.unwrap()
4032+
.unwrap();
4033+
4034+
assert_eq!(events.len(), 2);
4035+
assert_eq!(events.last().unwrap().event_id(), end_event);
4036+
assert!(!events
4037+
.iter()
4038+
.rev()
4039+
.map(|event| event.event_id())
4040+
.zip(&expected_events[2..])
4041+
.any(|(a, b)| &a != b));
4042+
4043+
mocked_messages.assert();
4044+
}
38984045
}

matrix_sdk/src/room/common.rs

Lines changed: 139 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1-
use std::{ops::Deref, sync::Arc};
1+
use std::{cmp::min, convert::TryFrom, ops::Deref, sync::Arc};
22

3-
use matrix_sdk_base::deserialized_responses::MembersResponse;
3+
use matrix_sdk_base::deserialized_responses::{MembersResponse, SyncRoomEvent};
44
use matrix_sdk_common::locks::Mutex;
55
use ruma::{
66
api::client::r0::{
7+
context::get_context,
78
media::{get_content, get_content_thumbnail},
89
membership::{get_member_events, join_room_by_id, leave_room},
9-
message::get_message_events,
10+
message::{get_message_events, get_message_events::Direction},
1011
},
11-
UserId,
12+
events::AnyRoomEvent,
13+
serde::Raw,
14+
EventId, UserId,
1215
};
1316

14-
use crate::{BaseRoom, Client, Result, RoomMember};
17+
use crate::{BaseRoom, Client, Result, RoomMember, UInt};
1518

1619
/// A struct containing methods that are common for Joined, Invited and Left
1720
/// Rooms
@@ -110,43 +113,160 @@ impl Common {
110113
}
111114
}
112115

113-
/// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
114-
/// returns a `get_message_events::Response` that contains a chunk of
115-
/// room and state events (`AnyRoomEvent` and `AnyStateEvent`).
116+
/// Gets a slice of the timeline of this room
117+
///
118+
/// Returns a slice of the timeline between `start` and `end`, no longer
119+
/// then `limit`. If the number of events is fewer then `limit` it means
120+
/// that in the given direction no more events exist.
121+
/// If the timeline doesn't contain an event with the given `start` `None`
122+
/// is returned.
116123
///
117124
/// # Arguments
118125
///
119-
/// * `request` - The easiest way to create this request is using the
120-
/// `get_message_events::Request` itself.
126+
/// * `start` - An `EventId` that indicates the start of the slice.
127+
///
128+
/// * `end` - An `EventId` that indicates the end of the slice.
129+
///
130+
/// * `limit` - The maximum number of events that should be returned.
131+
///
132+
/// * `direction` - The direction of the search and returned events.
121133
///
122134
/// # Examples
123135
/// ```no_run
124136
/// # use std::convert::TryFrom;
125137
/// use matrix_sdk::Client;
126-
/// # use matrix_sdk::identifiers::room_id;
127-
/// # use matrix_sdk::api::r0::filter::RoomEventFilter;
128-
/// # use matrix_sdk::api::r0::message::get_message_events::Request as MessagesRequest;
138+
/// # use matrix_sdk::identifiers::{event_id, room_id};
139+
/// # use matrix_sdk::api::r0::message::get_message_events::Direction;
129140
/// # use url::Url;
130141
///
131142
/// # let homeserver = Url::parse("http://example.com").unwrap();
132143
/// let room_id = room_id!("!roomid:example.com");
133-
/// let request = MessagesRequest::backward(&room_id, "t47429-4392820_219380_26003_2265");
134144
///
135145
/// let mut client = Client::new(homeserver).unwrap();
136146
/// # let room = client
137147
/// # .get_joined_room(&room_id)
138148
/// # .unwrap();
139149
/// # use futures::executor::block_on;
140150
/// # block_on(async {
141-
/// assert!(room.messages(request).await.is_ok());
151+
/// assert!(room.messages(&event_id!("$xxxxxx:example.org"), None, 10, Direction::Backward).await.is_ok());
142152
/// # });
143153
/// ```
144154
pub async fn messages(
145155
&self,
146-
request: impl Into<get_message_events::Request<'_>>,
147-
) -> Result<get_message_events::Response> {
148-
let request = request.into();
149-
self.client.send(request, None).await
156+
start: &EventId,
157+
end: Option<&EventId>,
158+
limit: usize,
159+
direction: Direction,
160+
) -> Result<Option<Vec<SyncRoomEvent>>> {
161+
let room_id = self.inner.room_id();
162+
let events = if let Some(mut stored) = self
163+
.client
164+
.store()
165+
.get_timeline(room_id, Some(start), end, Some(limit), direction.clone())
166+
.await?
167+
{
168+
// We found a gab or the end of the stored timeline.
169+
if let Some(token) = stored.token {
170+
let mut request = get_message_events::Request::new(
171+
self.inner.room_id(),
172+
&token,
173+
direction.clone(),
174+
);
175+
request.limit =
176+
UInt::try_from((limit - stored.events.len()) as u64).unwrap_or(UInt::MAX);
177+
178+
let response = self.client.send(request, None).await?;
179+
180+
// FIXME: we may recevied an invalied server response that ruma considers valid
181+
// See https://github.com/ruma/ruma/issues/644
182+
if response.end.is_none() && response.start.is_none() {
183+
return Ok(Some(stored.events));
184+
}
185+
186+
let response_events = self
187+
.client
188+
.base_client
189+
.receive_messages(room_id, &direction, &response)
190+
.await?;
191+
192+
let mut response_events = if let Some(end) = end {
193+
if let Some(position) =
194+
response_events.iter().position(|event| &event.event_id() == end)
195+
{
196+
response_events.into_iter().take(position + 1).collect()
197+
} else {
198+
response_events
199+
}
200+
} else {
201+
response_events
202+
};
203+
204+
match direction {
205+
Direction::Forward => {
206+
response_events.append(&mut stored.events);
207+
stored.events = response_events;
208+
}
209+
Direction::Backward => stored.events.append(&mut response_events),
210+
}
211+
}
212+
stored.events
213+
} else {
214+
// Fallback to context API because we don't know the start event
215+
let mut request = get_context::Request::new(room_id, start);
216+
217+
// We need to take limit twice because the context api returns events before
218+
// and after the given event
219+
request.limit = UInt::try_from((limit * 2) as u64).unwrap_or(UInt::MAX);
220+
221+
let mut context = self.client.send(request, None).await?;
222+
223+
let event = if let Some(event) = context.event {
224+
event
225+
} else {
226+
return Ok(None);
227+
};
228+
229+
let mut response = get_message_events::Response::new();
230+
response.start = context.start;
231+
response.end = context.end;
232+
let before_length = context.events_before.len();
233+
let after_length = context.events_after.len();
234+
let mut events: Vec<Raw<AnyRoomEvent>> =
235+
context.events_after.into_iter().rev().collect();
236+
events.push(event);
237+
events.append(&mut context.events_before);
238+
response.chunk = events;
239+
response.state = context.state;
240+
let response_events = self
241+
.client
242+
.base_client
243+
.receive_messages(room_id, &Direction::Backward, &response)
244+
.await?;
245+
246+
let response_events: Vec<SyncRoomEvent> = match direction {
247+
Direction::Forward => {
248+
let lower_bound = if before_length > limit { before_length - limit } else { 0 };
249+
response_events[lower_bound..=before_length].to_vec()
250+
}
251+
Direction::Backward => response_events
252+
[after_length..min(response_events.len(), after_length + limit)]
253+
.to_vec(),
254+
};
255+
256+
if let Some(end) = end {
257+
if let Some(position) =
258+
response_events.iter().position(|event| &event.event_id() == end)
259+
{
260+
response_events.into_iter().take(position + 1).collect()
261+
} else {
262+
response_events
263+
}
264+
} else {
265+
response_events
266+
}
267+
};
268+
269+
Ok(Some(events))
150270
}
151271

152272
pub(crate) async fn request_members(&self) -> Result<Option<MembersResponse>> {

0 commit comments

Comments
 (0)