@@ -52,7 +52,7 @@ impl SendingQueue {
52
52
}
53
53
54
54
fn for_room ( & self , room : Room ) -> RoomSendingQueue {
55
- let data = & self . client . inner . sending_queue ;
55
+ let data = & self . client . inner . sending_queue_data ;
56
56
57
57
let mut map = data. rooms . write ( ) . unwrap ( ) ;
58
58
@@ -63,31 +63,31 @@ impl SendingQueue {
63
63
64
64
let owned_room_id = room_id. to_owned ( ) ;
65
65
let room_q = RoomSendingQueue :: new (
66
- data. enabled . clone ( ) ,
67
- data. shutting_down . clone ( ) ,
66
+ data. globally_enabled . clone ( ) ,
67
+ data. is_dropping . clone ( ) ,
68
68
& self . client ,
69
69
owned_room_id. clone ( ) ,
70
70
) ;
71
71
map. insert ( owned_room_id, room_q. clone ( ) ) ;
72
72
room_q
73
73
}
74
74
75
- /// Enables the sending queue for the entire client, i.e. all rooms.
75
+ /// Enable the sending queue for the entire client, i.e. all rooms.
76
76
///
77
- /// This may wake up backgrounds tasks and resume sending of events in the
77
+ /// This may wake up background tasks and resume sending of events in the
78
78
/// background.
79
79
pub fn enable ( & self ) {
80
- if self . client . inner . sending_queue . enabled . set_if_not_eq ( true ) . is_some ( ) {
80
+ if self . client . inner . sending_queue_data . globally_enabled . set_if_not_eq ( true ) . is_some ( ) {
81
81
debug ! ( "globally enabling sending queue" ) ;
82
- let rooms = self . client . inner . sending_queue . rooms . read ( ) . unwrap ( ) ;
82
+ let rooms = self . client . inner . sending_queue_data . rooms . read ( ) . unwrap ( ) ;
83
83
// Wake up the rooms, in case events have been queued in the meanwhile.
84
84
for room in rooms. values ( ) {
85
85
room. inner . notifier . notify_one ( ) ;
86
86
}
87
87
}
88
88
}
89
89
90
- /// Disables the sending queue for the entire client, i.e. all rooms.
90
+ /// Disable the sending queue for the entire client, i.e. all rooms.
91
91
///
92
92
/// If requests were being sent, they're not aborted, and will continue
93
93
/// until a status resolves (error responses will keep the events in the
@@ -101,19 +101,19 @@ impl SendingQueue {
101
101
// - or they were not, and it's not worth it waking them to let them they're
102
102
// disabled, which causes them to go to sleep again.
103
103
debug ! ( "globally disabling sending queue" ) ;
104
- self . client . inner . sending_queue . enabled . set ( false ) ;
104
+ self . client . inner . sending_queue_data . globally_enabled . set ( false ) ;
105
105
}
106
106
107
107
/// Returns whether the sending queue is enabled, at a client-wide
108
108
/// granularity.
109
109
pub fn is_enabled ( & self ) -> bool {
110
- self . client . inner . sending_queue . enabled . get ( )
110
+ self . client . inner . sending_queue_data . globally_enabled . get ( )
111
111
}
112
112
113
113
/// A subscriber to the enablement status (enabled or disabled) of the
114
114
/// sending queue.
115
115
pub fn subscribe_status ( & self ) -> Subscriber < bool > {
116
- self . client . inner . sending_queue . enabled . subscribe ( )
116
+ self . client . inner . sending_queue_data . globally_enabled . subscribe ( )
117
117
}
118
118
}
119
119
@@ -130,19 +130,19 @@ pub(super) struct SendingQueueData {
130
130
rooms : SyncRwLock < BTreeMap < OwnedRoomId , RoomSendingQueue > > ,
131
131
132
132
/// Is the whole mechanism enabled or disabled?
133
- enabled : SharedObservable < bool > ,
133
+ globally_enabled : SharedObservable < bool > ,
134
134
135
- /// Are we shutting down the entire queue ?
136
- shutting_down : Arc < AtomicBool > ,
135
+ /// Are we currently dropping the Client ?
136
+ is_dropping : Arc < AtomicBool > ,
137
137
}
138
138
139
139
impl SendingQueueData {
140
140
/// Create the data for a sending queue, in the given enabled state.
141
- pub fn new ( enabled : bool ) -> Self {
141
+ pub fn new ( globally_enabled : bool ) -> Self {
142
142
Self {
143
143
rooms : Default :: default ( ) ,
144
- enabled : SharedObservable :: new ( enabled ) ,
145
- shutting_down : Arc :: new ( false . into ( ) ) ,
144
+ globally_enabled : SharedObservable :: new ( globally_enabled ) ,
145
+ is_dropping : Arc :: new ( false . into ( ) ) ,
146
146
}
147
147
}
148
148
}
@@ -151,8 +151,8 @@ impl Drop for SendingQueueData {
151
151
fn drop ( & mut self ) {
152
152
// Mark the whole sending queue as shutting down, then wake up all the room
153
153
// queues so they're stopped too.
154
- debug ! ( "globally shutting down the sending queue" ) ;
155
- self . shutting_down . store ( true , Ordering :: SeqCst ) ;
154
+ debug ! ( "globally dropping the sending queue" ) ;
155
+ self . is_dropping . store ( true , Ordering :: SeqCst ) ;
156
156
157
157
let rooms = self . rooms . read ( ) . unwrap ( ) ;
158
158
for room in rooms. values ( ) {
@@ -184,14 +184,14 @@ impl std::fmt::Debug for RoomSendingQueue {
184
184
185
185
impl RoomSendingQueue {
186
186
fn new (
187
- enabled : SharedObservable < bool > ,
188
- shutting_down : Arc < AtomicBool > ,
187
+ globally_enabled : SharedObservable < bool > ,
188
+ is_dropping : Arc < AtomicBool > ,
189
189
client : & Client ,
190
190
room_id : OwnedRoomId ,
191
191
) -> Self {
192
192
let ( updates_sender, _) = broadcast:: channel ( 32 ) ;
193
193
194
- let queue = SharedQueue :: new ( ) ;
194
+ let queue = QueueStorage :: new ( ) ;
195
195
let notifier = Arc :: new ( Notify :: new ( ) ) ;
196
196
197
197
let weak_room = WeakRoom :: new ( WeakClient :: from_client ( client) , room_id) ;
@@ -201,8 +201,8 @@ impl RoomSendingQueue {
201
201
queue. clone ( ) ,
202
202
notifier. clone ( ) ,
203
203
updates_sender. clone ( ) ,
204
- enabled ,
205
- shutting_down ,
204
+ globally_enabled ,
205
+ is_dropping ,
206
206
) ) ;
207
207
208
208
Self {
@@ -242,6 +242,7 @@ impl RoomSendingQueue {
242
242
243
243
let transaction_id = self . inner . queue . push ( content. clone ( ) ) . await ;
244
244
trace ! ( %transaction_id, "manager sends an event to the background task" ) ;
245
+
245
246
self . inner . notifier . notify_one ( ) ;
246
247
247
248
let _ = self . inner . updates . send ( RoomSendingQueueUpdate :: NewLocalEvent ( LocalEcho {
@@ -261,29 +262,29 @@ impl RoomSendingQueue {
261
262
#[ instrument( skip_all, fields( room_id = %room. room_id( ) ) ) ]
262
263
async fn sending_task (
263
264
room : WeakRoom ,
264
- queue : SharedQueue ,
265
+ queue : QueueStorage ,
265
266
notifier : Arc < Notify > ,
266
267
updates : broadcast:: Sender < RoomSendingQueueUpdate > ,
267
- enabled : SharedObservable < bool > ,
268
- shutting_down : Arc < AtomicBool > ,
268
+ globally_enabled : SharedObservable < bool > ,
269
+ is_dropping : Arc < AtomicBool > ,
269
270
) {
270
271
info ! ( "spawned the sending task" ) ;
271
272
272
273
loop {
273
274
// A request to shut down should be preferred above everything else.
274
- if shutting_down . load ( Ordering :: SeqCst ) {
275
+ if is_dropping . load ( Ordering :: SeqCst ) {
275
276
trace ! ( "shutting down!" ) ;
276
277
break ;
277
278
}
278
279
279
- if !enabled . get ( ) {
280
+ if !globally_enabled . get ( ) {
280
281
trace ! ( "not enabled, sleeping" ) ;
281
282
// Wait for an explicit wakeup.
282
283
notifier. notified ( ) . await ;
283
284
continue ;
284
285
}
285
286
286
- let Some ( queued_event) = queue. pop_next_to_send ( ) . await else {
287
+ let Some ( queued_event) = queue. peek_next_to_send ( ) . await else {
287
288
trace ! ( "queue is empty, sleeping" ) ;
288
289
// Wait for an explicit wakeup.
289
290
notifier. notified ( ) . await ;
@@ -293,7 +294,7 @@ impl RoomSendingQueue {
293
294
trace ! ( "received an event to send!" ) ;
294
295
295
296
let Some ( room) = room. get ( ) else {
296
- if shutting_down . load ( Ordering :: SeqCst ) {
297
+ if is_dropping . load ( Ordering :: SeqCst ) {
297
298
break ;
298
299
}
299
300
error ! ( "the weak room couldn't be upgraded but we're not shutting down?" ) ;
@@ -322,7 +323,7 @@ impl RoomSendingQueue {
322
323
323
324
// Disable the queue after an error.
324
325
// See comment in [`SendingQueue::disable()`].
325
- enabled . set ( false ) ;
326
+ globally_enabled . set ( false ) ;
326
327
327
328
// In this case, we intentionally keep the event in the queue, but mark it as
328
329
// not being sent anymore.
@@ -360,10 +361,10 @@ struct RoomSendingQueueInner {
360
361
/// content / deleting entries, all that will be required will be to
361
362
/// manipulate the on-disk storage. In other words, the storage will become
362
363
/// the one source of truth.
363
- queue : SharedQueue ,
364
+ queue : QueueStorage ,
364
365
365
366
/// A notifier that's updated any time common data is touched (stopped or
366
- /// enabled statuses), or the associated room [`SharedQueue `].
367
+ /// enabled statuses), or the associated room [`QueueStorage `].
367
368
notifier : Arc < Notify > ,
368
369
369
370
/// Handle to the actual sending task. Unused, but kept alive along this
@@ -375,13 +376,18 @@ struct RoomSendingQueueInner {
375
376
struct QueuedEvent {
376
377
event : AnyMessageLikeEventContent ,
377
378
transaction_id : OwnedTransactionId ,
379
+
380
+ /// Flag to indicate if an event has been scheduled for sending.
381
+ ///
382
+ /// Useful to indicate if cancelling could happen or if it was too late and
383
+ /// the event had already been sent.
378
384
is_being_sent : bool ,
379
385
}
380
386
381
387
#[ derive( Clone ) ]
382
- struct SharedQueue ( Arc < RwLock < VecDeque < QueuedEvent > > > ) ;
388
+ struct QueueStorage ( Arc < RwLock < VecDeque < QueuedEvent > > > ) ;
383
389
384
- impl SharedQueue {
390
+ impl QueueStorage {
385
391
/// Create a new synchronized queue for queuing events to be sent later.
386
392
fn new ( ) -> Self {
387
393
Self ( Arc :: new ( RwLock :: new ( VecDeque :: with_capacity ( 16 ) ) ) )
@@ -402,21 +408,23 @@ impl SharedQueue {
402
408
transaction_id
403
409
}
404
410
405
- /// Pops the next event to be sent, marking it as being sent.
411
+ /// Peeks the next event to be sent, marking it as being sent.
406
412
///
407
413
/// It is required to call [`Self::mark_as_sent`] after it's been
408
414
/// effectively sent.
409
- async fn pop_next_to_send ( & self ) -> Option < QueuedEvent > {
415
+ async fn peek_next_to_send ( & self ) -> Option < QueuedEvent > {
410
416
let mut q = self . 0 . write ( ) . await ;
411
417
if let Some ( event) = q. front_mut ( ) {
418
+ // TODO: This flag should probably live in memory when we have an actual
419
+ // storage.
412
420
event. is_being_sent = true ;
413
421
Some ( event. clone ( ) )
414
422
} else {
415
423
None
416
424
}
417
425
}
418
426
419
- /// Marks an event popped with [`Self::pop_next_to_send `] and identified
427
+ /// Marks an event popped with [`Self::peek_next_to_send `] and identified
420
428
/// with the given transaction id as not being sent anymore, so it can
421
429
/// be removed from the queue later.
422
430
async fn mark_as_not_being_sent ( & self , transaction_id : & TransactionId ) {
@@ -432,7 +440,11 @@ impl SharedQueue {
432
440
/// transaction id as sent by removing it from the local queue.
433
441
async fn mark_as_sent ( & self , transaction_id : & TransactionId ) {
434
442
let mut q = self . 0 . write ( ) . await ;
435
- q. retain ( |item| item. transaction_id != transaction_id) ;
443
+ if let Some ( index) = q. iter ( ) . position ( |item| item. transaction_id == transaction_id) {
444
+ q. remove ( index) ;
445
+ } else {
446
+ warn ! ( "couldn't find item to mark as sent with transaction id {transaction_id}" ) ;
447
+ }
436
448
}
437
449
438
450
/// Cancel a sending command for an event that has been sent with
0 commit comments