@@ -4,9 +4,19 @@ macro_rules! statestore_integration_tests {
4
4
( $( $name: ident) * ) => {
5
5
$(
6
6
mod $name {
7
+
8
+ use futures_util:: StreamExt ;
9
+ use http:: Response ;
7
10
use matrix_sdk_test:: { async_test, test_json} ;
8
11
use ruma:: {
9
- api:: client:: r0:: media:: get_content_thumbnail:: Method ,
12
+ api:: {
13
+ client:: r0:: {
14
+ media:: get_content_thumbnail:: Method ,
15
+ message:: get_message_events:: Response as MessageResponse ,
16
+ sync:: sync_events:: Response as SyncResponse ,
17
+ } ,
18
+ IncomingResponse ,
19
+ } ,
10
20
device_id, event_id,
11
21
events:: {
12
22
presence:: PresenceEvent , EventContent ,
@@ -30,7 +40,7 @@ macro_rules! statestore_integration_tests {
30
40
31
41
use crate :: {
32
42
RoomType , Session ,
33
- deserialized_responses:: { MemberEvent , StrippedMemberEvent } ,
43
+ deserialized_responses:: { MemberEvent , StrippedMemberEvent , RoomEvent , SyncRoomEvent , TimelineSlice } ,
34
44
media:: { MediaFormat , MediaRequest , MediaThumbnailSize , MediaType } ,
35
45
store:: {
36
46
Store ,
@@ -519,7 +529,155 @@ macro_rules! statestore_integration_tests {
519
529
assert_eq!( store. get_stripped_room_infos( ) . await ?. len( ) , 0 ) ;
520
530
Ok ( ( ) )
521
531
}
532
+
533
+ #[ async_test]
534
+ async fn test_room_timeline( ) {
535
+ let store = get_store( ) . await . unwrap( ) ;
536
+ let mut stored_events = Vec :: new( ) ;
537
+ let room_id = room_id!( "!SVkFJHzfwvuaIEawgC:localhost" ) ;
538
+
539
+ // Before the first sync the timeline should be empty
540
+ assert!( store. room_timeline( room_id) . await . unwrap( ) . is_none( ) ) ;
541
+
542
+ // Add sync response
543
+ let sync = SyncResponse :: try_from_http_response(
544
+ Response :: builder( ) . body( serde_json:: to_vec( & * test_json:: MORE_SYNC ) . unwrap( ) ) . unwrap( ) ,
545
+ )
546
+ . unwrap( ) ;
547
+
548
+ let timeline = & sync. rooms. join[ room_id] . timeline;
549
+ let events: Vec <SyncRoomEvent > = timeline. events. iter( ) . cloned( ) . map( Into :: into) . collect( ) ;
550
+
551
+ stored_events. append( & mut events. clone( ) ) ;
552
+
553
+ let timeline_slice = TimelineSlice :: new(
554
+ events,
555
+ sync. next_batch. clone( ) ,
556
+ timeline. prev_batch. clone( ) ,
557
+ false ,
558
+ true ,
559
+ ) ;
560
+ let mut changes = StateChanges :: new( sync. next_batch. clone( ) ) ;
561
+ changes. add_timeline( room_id, timeline_slice) ;
562
+ store. save_changes( & changes) . await . unwrap( ) ;
563
+
564
+ check_timeline_events( room_id, & store, & stored_events, timeline. prev_batch. as_deref( ) )
565
+ . await ;
566
+
567
+ // Add message response
568
+ let messages = MessageResponse :: try_from_http_response(
569
+ Response :: builder( )
570
+ . body( serde_json:: to_vec( & * test_json:: SYNC_ROOM_MESSAGES_BATCH_1 ) . unwrap( ) )
571
+ . unwrap( ) ,
572
+ )
573
+ . unwrap( ) ;
574
+
575
+ let events: Vec <SyncRoomEvent > = messages
576
+ . chunk
577
+ . iter( )
578
+ . cloned( )
579
+ . map( |event| RoomEvent { event, encryption_info: None } . into( ) )
580
+ . collect( ) ;
581
+
582
+ stored_events. append( & mut events. clone( ) ) ;
583
+
584
+ let timeline_slice =
585
+ TimelineSlice :: new( events, messages. start. clone( ) , messages. end. clone( ) , false , false ) ;
586
+ let mut changes = StateChanges :: default ( ) ;
587
+ changes. add_timeline( room_id, timeline_slice) ;
588
+ store. save_changes( & changes) . await . unwrap( ) ;
589
+
590
+ check_timeline_events( room_id, & store, & stored_events, messages. end. as_deref( ) ) . await ;
591
+
592
+ // Add second message response
593
+ let messages = MessageResponse :: try_from_http_response(
594
+ Response :: builder( )
595
+ . body( serde_json:: to_vec( & * test_json:: SYNC_ROOM_MESSAGES_BATCH_2 ) . unwrap( ) )
596
+ . unwrap( ) ,
597
+ )
598
+ . unwrap( ) ;
599
+
600
+ let events: Vec <SyncRoomEvent > = messages
601
+ . chunk
602
+ . iter( )
603
+ . cloned( )
604
+ . map( |event| RoomEvent { event, encryption_info: None } . into( ) )
605
+ . collect( ) ;
606
+
607
+ stored_events. append( & mut events. clone( ) ) ;
608
+
609
+ let timeline_slice =
610
+ TimelineSlice :: new( events, messages. start. clone( ) , messages. end. clone( ) , false , false ) ;
611
+ let mut changes = StateChanges :: default ( ) ;
612
+ changes. add_timeline( room_id, timeline_slice) ;
613
+ store. save_changes( & changes) . await . unwrap( ) ;
614
+
615
+ check_timeline_events( room_id, & store, & stored_events, messages. end. as_deref( ) ) . await ;
616
+
617
+ // Add second sync response
618
+ let sync = SyncResponse :: try_from_http_response(
619
+ Response :: builder( )
620
+ . body( serde_json:: to_vec( & * test_json:: MORE_SYNC_2 ) . unwrap( ) )
621
+ . unwrap( ) ,
622
+ )
623
+ . unwrap( ) ;
624
+
625
+ let timeline = & sync. rooms. join[ room_id] . timeline;
626
+ let events: Vec <SyncRoomEvent > = timeline. events. iter( ) . cloned( ) . map( Into :: into) . collect( ) ;
627
+
628
+ let mut prev_stored_events = stored_events;
629
+ stored_events = events. clone( ) ;
630
+ stored_events. append( & mut prev_stored_events) ;
631
+
632
+ let timeline_slice = TimelineSlice :: new(
633
+ events,
634
+ sync. next_batch. clone( ) ,
635
+ timeline. prev_batch. clone( ) ,
636
+ false ,
637
+ true ,
638
+ ) ;
639
+ let mut changes = StateChanges :: new( sync. next_batch. clone( ) ) ;
640
+ changes. add_timeline( room_id, timeline_slice) ;
641
+ store. save_changes( & changes) . await . unwrap( ) ;
642
+
643
+ check_timeline_events( room_id, & store, & stored_events, messages. end. as_deref( ) ) . await ;
644
+
645
+ // Check if limited sync removes the stored timeline
646
+ let end_token = Some ( "end token" . to_string( ) ) ;
647
+ let timeline_slice = TimelineSlice :: new(
648
+ Vec :: new( ) ,
649
+ "start token" . to_string( ) ,
650
+ end_token. clone( ) ,
651
+ true ,
652
+ true ,
653
+ ) ;
654
+ let mut changes = StateChanges :: default ( ) ;
655
+ changes. add_timeline( room_id, timeline_slice) ;
656
+ store. save_changes( & changes) . await . unwrap( ) ;
657
+
658
+ check_timeline_events( room_id, & store, & Vec :: new( ) , end_token. as_deref( ) ) . await ;
659
+ }
660
+
661
+ async fn check_timeline_events(
662
+ room_id: & RoomId ,
663
+ store: & dyn StateStore ,
664
+ stored_events: & [ SyncRoomEvent ] ,
665
+ expected_end_token: Option <& str >,
666
+ ) {
667
+ let ( timeline_iter, end_token) = store. room_timeline( room_id) . await . unwrap( ) . unwrap( ) ;
668
+
669
+ assert_eq!( end_token. as_deref( ) , expected_end_token) ;
670
+
671
+ let timeline = timeline_iter. collect:: <Vec <Result <SyncRoomEvent >>>( ) . await ;
672
+
673
+ assert!( timeline
674
+ . into_iter( )
675
+ . zip( stored_events. iter( ) )
676
+ . all( |( a, b) | a. unwrap( ) . event_id( ) == b. event_id( ) ) ) ;
677
+ }
678
+
522
679
}
680
+
523
681
) *
524
682
}
525
683
}
0 commit comments