@@ -27,7 +27,7 @@ use matrix_sdk::{
27
27
Room ,
28
28
} ;
29
29
use matrix_sdk_base:: RoomState ;
30
- use ruma:: { events:: AnyMessageLikeEventContent , OwnedTransactionId } ;
30
+ use ruma:: { events:: AnyMessageLikeEventContent , OwnedEventId , OwnedTransactionId } ;
31
31
use tokio:: { select, sync:: mpsc:: Receiver } ;
32
32
use tracing:: { debug, error, info, instrument, trace, warn} ;
33
33
@@ -57,13 +57,13 @@ pub(super) async fn send_queued_messages(
57
57
58
58
loop {
59
59
select ! {
60
- result = & mut send_task => {
60
+ send_result = & mut send_task => {
61
61
trace!( "SendMessageTask finished" ) ;
62
62
63
63
send_task. reset( ) ;
64
64
65
65
handle_send_result(
66
- result ,
66
+ send_result ,
67
67
& mut send_task,
68
68
& mut queue,
69
69
& timeline,
@@ -129,7 +129,7 @@ async fn send_or_queue_msg(
129
129
return ;
130
130
}
131
131
132
- send_task. start ( room, timeline . clone ( ) , msg) ;
132
+ send_task. start ( room, msg) ;
133
133
}
134
134
135
135
async fn handle_send_result (
@@ -139,16 +139,23 @@ async fn handle_send_result(
139
139
timeline : & TimelineInner ,
140
140
) {
141
141
match send_result {
142
- SendMessageResult :: Success { room } => {
142
+ SendMessageResult :: Success { event_id, txn_id } => {
143
+ timeline. update_event_send_state ( & txn_id, EventSendState :: Sent { event_id } ) . await ;
144
+
143
145
// Event was successfully sent, move on to the next queued event.
144
146
if let Some ( msg) = queue. pop_front ( ) {
145
- send_task. start ( room , timeline. clone ( ) , msg) ;
147
+ send_task. start ( timeline. room ( ) . clone ( ) , msg) ;
146
148
}
147
149
}
148
150
149
- SendMessageResult :: SendingFailed => {
150
- // Timeline items are marked as failed / cancelled in this case.
151
- //
151
+ SendMessageResult :: SendingFailed { send_error, txn_id } => {
152
+ timeline
153
+ . update_event_send_state (
154
+ & txn_id,
155
+ EventSendState :: SendingFailed { error : Arc :: new ( send_error) } ,
156
+ )
157
+ . await ;
158
+
152
159
// Clear the queue and wait for the user to explicitly retry (which will
153
160
// re-append the to-be-sent events in the queue).
154
161
queue. clear ( ) ;
@@ -157,36 +164,41 @@ async fn handle_send_result(
157
164
SendMessageResult :: TaskError { join_error, txn_id } => {
158
165
error ! ( "Message-sending task failed: {join_error}" ) ;
159
166
167
+ timeline
168
+ . update_event_send_state (
169
+ & txn_id,
170
+ EventSendState :: SendingFailed {
171
+ // FIXME: Probably not exactly right
172
+ error : Arc :: new ( matrix_sdk:: Error :: InconsistentState ) ,
173
+ } ,
174
+ )
175
+ . await ;
176
+
160
177
// See above comment in the `SendingFailed` arm.
161
178
queue. clear ( ) ;
162
-
163
- let send_state = EventSendState :: SendingFailed {
164
- // FIXME: Probably not exactly right
165
- error : Arc :: new ( matrix_sdk:: Error :: InconsistentState ) ,
166
- } ;
167
-
168
- timeline. update_event_send_state ( & txn_id, send_state) . await ;
169
179
}
170
180
}
171
181
}
172
182
173
183
/// Result of [`SendMessageTask`].
174
184
enum SendMessageResult {
175
- /// The message was sent successfully, and the local echo was updated to
176
- /// indicate this.
185
+ /// The message was sent successfully.
177
186
Success {
178
- /// The joined room object, used to start sending of the next message
179
- /// in the queue, if it isn't empty.
180
- room : Room ,
187
+ /// The event id returned by the server.
188
+ event_id : OwnedEventId ,
189
+ /// The transaction ID of the message that was being sent by the task.
190
+ txn_id : OwnedTransactionId ,
181
191
} ,
182
192
183
- /// Sending failed, and the local echo was updated to indicate this.
184
- SendingFailed ,
193
+ /// Sending failed.
194
+ SendingFailed {
195
+ /// The reason of the sending failure.
196
+ send_error : matrix_sdk:: Error ,
197
+ /// The transaction ID of the message that was being sent by the task.
198
+ txn_id : OwnedTransactionId ,
199
+ } ,
185
200
186
201
/// The [`SendMessageTask`] failed, likely due to a panic.
187
- ///
188
- /// This means that the timeline item was likely not updated yet, which thus
189
- /// becomes the responsibility of the code observing this result.
190
202
TaskError {
191
203
/// The error with which the task failed.
192
204
join_error : JoinError ,
@@ -206,7 +218,7 @@ enum SendMessageTask {
206
218
/// The transaction ID of the message that is being sent.
207
219
txn_id : OwnedTransactionId ,
208
220
/// Handle to the task itself.
209
- task : JoinHandle < Option < Room > > ,
221
+ task : JoinHandle < SendMessageResult > ,
210
222
} ,
211
223
}
212
224
@@ -218,22 +230,20 @@ impl SendMessageTask {
218
230
219
231
/// Spawns a task sending the message to the room, and updating the timeline
220
232
/// once the result has been processed.
221
- fn start ( & mut self , room : Room , timeline : TimelineInner , msg : LocalMessage ) {
233
+ fn start ( & mut self , room : Room , msg : LocalMessage ) {
222
234
debug ! ( "Spawning message-sending task" ) ;
223
235
224
236
let txn_id = msg. txn_id . clone ( ) ;
225
237
226
238
let task = spawn ( async move {
227
- let result = room. send ( msg. content ) . with_transaction_id ( & msg. txn_id ) . await ;
228
-
229
- let ( room, send_state) = match result {
230
- Ok ( response) => ( Some ( room) , EventSendState :: Sent { event_id : response. event_id } ) ,
231
- Err ( error) => ( None , EventSendState :: SendingFailed { error : Arc :: new ( error) } ) ,
232
- } ;
233
-
234
- timeline. update_event_send_state ( & msg. txn_id , send_state) . await ;
235
-
236
- room
239
+ match room. send ( msg. content ) . with_transaction_id ( & msg. txn_id ) . await {
240
+ Ok ( response) => {
241
+ SendMessageResult :: Success { event_id : response. event_id , txn_id : msg. txn_id }
242
+ }
243
+ Err ( error) => {
244
+ SendMessageResult :: SendingFailed { send_error : error, txn_id : msg. txn_id }
245
+ }
246
+ }
237
247
} ) ;
238
248
239
249
* self = Self :: Running { txn_id, task } ;
@@ -251,20 +261,16 @@ impl Future for SendMessageTask {
251
261
match & mut * self {
252
262
SendMessageTask :: Idle => Poll :: Pending ,
253
263
254
- SendMessageTask :: Running { txn_id, task : join_handle } => {
255
- Pin :: new ( join_handle) . poll ( cx) . map ( |result| {
256
- let txn_id = mem:: replace ( txn_id, OwnedTransactionId :: from ( "" ) ) ;
257
- if txn_id. as_str ( ) . is_empty ( ) {
258
- warn ! ( "SendMessageTask polled after returning Poll::Ready!" ) ;
259
- }
260
-
261
- match result {
262
- Ok ( Some ( room) ) => SendMessageResult :: Success { room } ,
263
- Ok ( None ) => SendMessageResult :: SendingFailed ,
264
- Err ( join_error) => SendMessageResult :: TaskError { join_error, txn_id } ,
265
- }
264
+ SendMessageTask :: Running { txn_id, task } => Pin :: new ( task) . poll ( cx) . map ( |result| {
265
+ let txn_id = mem:: replace ( txn_id, OwnedTransactionId :: from ( "" ) ) ;
266
+ if txn_id. as_str ( ) . is_empty ( ) {
267
+ warn ! ( "SendMessageTask polled after returning Poll::Ready!" ) ;
268
+ }
269
+ result. unwrap_or_else ( |error| SendMessageResult :: TaskError {
270
+ join_error : error,
271
+ txn_id,
266
272
} )
267
- }
273
+ } ) ,
268
274
}
269
275
}
270
276
}
0 commit comments