@@ -253,6 +253,7 @@ pub struct Sender {
253
253
capacity : usize ,
254
254
min_packet_space : usize ,
255
255
max_packet_space : usize ,
256
+ dropped_datagrams : u64 ,
256
257
smoothed_packet_size : f64 ,
257
258
waker : Option < Waker > ,
258
259
max_datagram_payload : u64 ,
@@ -449,43 +450,64 @@ impl Sender {
449
450
pub fn smoothed_packet_space ( & self ) -> usize {
450
451
self . smoothed_packet_size as usize
451
452
}
453
+
454
+ /// Returns the number of datagrams that have been dropped by the sender
455
+ ///
456
+ /// The cause of drops is due to the datagrams being larger than the current path MTU. If this
457
+ /// number is non-zero, applications should try to send smaller datagrams.
458
+ #[ inline]
459
+ pub fn dropped_datagrams ( & self ) -> u64 {
460
+ self . dropped_datagrams
461
+ }
452
462
}
453
463
454
464
impl super :: Sender for Sender {
465
+ #[ inline]
455
466
fn on_transmit < P : Packet > ( & mut self , packet : & mut P ) {
456
467
// Cede space to stream data when datagrams are not prioritized
457
468
if packet. has_pending_streams ( ) && !packet. datagrams_prioritized ( ) {
458
469
return ;
459
470
}
471
+
460
472
self . record_capacity_stats ( packet. remaining_capacity ( ) ) ;
473
+
461
474
let mut has_written = false ;
475
+
462
476
while packet. remaining_capacity ( ) > 0 {
463
- if let Some ( datagram) = self . queue . pop_front ( ) {
464
- // Ensure there is enough space in the packet to send a datagram
465
- if packet. remaining_capacity ( ) >= datagram. data . len ( ) {
466
- match packet. write_datagram ( & datagram. data ) {
467
- Ok ( ( ) ) => has_written = true ,
468
- Err ( _error) => {
469
- continue ;
470
- }
471
- }
472
- // Since a datagram was popped off the queue, wake the
473
- // stored waker if we have one to let the application know
474
- // that there is space on the queue for more datagrams.
475
- if let Some ( w) = self . waker . take ( ) {
476
- w. wake ( ) ;
477
- }
478
- } else {
479
- // This check keeps us from popping all the datagrams off the
480
- // queue when packet space remaining is smaller than the datagram.
481
- if has_written {
482
- self . queue . push_front ( datagram) ;
483
- return ;
484
- }
477
+ let Some ( datagram) = self . queue . pop_front ( ) else {
478
+ break ;
479
+ } ;
480
+
481
+ // Ensure there is enough space in the packet to send a datagram
482
+ if packet. remaining_capacity ( ) < datagram. data . len ( ) {
483
+ // This check keeps us from popping all the datagrams off the
484
+ // queue when packet space remaining is smaller than the datagram.
485
+ if has_written {
486
+ self . queue . push_front ( datagram) ;
487
+ break ;
488
+ }
489
+
490
+ // the datagram is too large for the current packet and unlikely to ever fit so
491
+ // record a metric and try the next datagram in the queue
492
+ self . dropped_datagrams += 1 ;
493
+ continue ;
494
+ }
495
+
496
+ match packet. write_datagram ( & datagram. data ) {
497
+ Ok ( ( ) ) => has_written = true ,
498
+ Err ( _error) => {
499
+ // TODO log this
500
+ self . dropped_datagrams += 1 ;
501
+ continue ;
485
502
}
486
- } else {
487
- // If there are no datagrams on the queue we return
488
- return ;
503
+ }
504
+ }
505
+
506
+ // If we now have additional capacity wake the stored waker if we have one to
507
+ // let the application know that there is space on the queue for more datagrams.
508
+ if self . capacity > self . queue . len ( ) {
509
+ if let Some ( w) = self . waker . take ( ) {
510
+ w. wake ( ) ;
489
511
}
490
512
}
491
513
}
@@ -540,6 +562,7 @@ impl SenderBuilder {
540
562
queue : VecDeque :: with_capacity ( self . queue_capacity ) ,
541
563
capacity : self . queue_capacity ,
542
564
max_datagram_payload : self . max_datagram_payload ,
565
+ dropped_datagrams : 0 ,
543
566
max_packet_space : 0 ,
544
567
min_packet_space : 0 ,
545
568
smoothed_packet_size : 0.0 ,
@@ -780,6 +803,50 @@ mod tests {
780
803
assert ! ( !default_sender. queue. is_empty( ) ) ;
781
804
}
782
805
806
+ /// Ensures the application waker is called when capacity becomes available
807
+ #[ test]
808
+ fn wake_with_capacity ( ) {
809
+ let ( waker, wake_count) = new_count_waker ( ) ;
810
+ let mut cx = Context :: from_waker ( & waker) ;
811
+
812
+ let conn_info = ConnectionInfo :: new ( 100 , waker. clone ( ) ) ;
813
+
814
+ let mut default_sender = Sender :: builder ( )
815
+ . with_capacity ( 1 )
816
+ . with_connection_info ( & conn_info)
817
+ . build ( )
818
+ . unwrap ( ) ;
819
+
820
+ let datagram = bytes:: Bytes :: from_static ( & [ 1 , 2 , 3 ] ) ;
821
+
822
+ assert ! ( default_sender
823
+ . poll_send_datagram( & mut datagram. clone( ) , & mut cx)
824
+ . is_ready( ) ) ;
825
+ assert ! ( default_sender
826
+ . poll_send_datagram( & mut datagram. clone( ) , & mut cx)
827
+ . is_pending( ) ) ;
828
+
829
+ assert_eq ! ( wake_count. get( ) , 0 ) ;
830
+
831
+ // Packet size is just enough to write the first datagram with some
832
+ // room left over, but not enough to write the second.
833
+ let mut packet = MockPacket {
834
+ remaining_capacity : 2 ,
835
+ has_pending_streams : false ,
836
+ datagrams_prioritized : false ,
837
+ } ;
838
+ crate :: datagram:: Sender :: on_transmit ( & mut default_sender, & mut packet) ;
839
+
840
+ // Packet capacity has not changed
841
+ assert_eq ! ( packet. remaining_capacity, 2 ) ;
842
+ // Send queue is completely depleted
843
+ assert ! ( default_sender. queue. is_empty( ) ) ;
844
+ // The waker was called since we now have capacity
845
+ assert_eq ! ( wake_count. get( ) , 1 ) ;
846
+ // The sender should record the number of dropped datagrams
847
+ assert_eq ! ( default_sender. dropped_datagrams( ) , 1 ) ;
848
+ }
849
+
783
850
fn fake_receive_context ( ) -> crate :: datagram:: ReceiveContext < ' static > {
784
851
crate :: datagram:: ReceiveContext {
785
852
path : crate :: event:: api:: Path {
0 commit comments