80
80
81
81
use futures_core:: stream:: Stream ;
82
82
use futures_core:: task:: { LocalWaker , Waker , Poll } ;
83
+ use futures_core:: task:: __internal:: AtomicWaker ;
83
84
use std:: any:: Any ;
84
85
use std:: error:: Error ;
85
86
use std:: fmt;
@@ -291,7 +292,7 @@ struct Inner<T> {
291
292
num_senders : AtomicUsize ,
292
293
293
294
// Handle to the receiver's task.
294
- recv_task : Mutex < ReceiverTask > ,
295
+ recv_task : AtomicWaker ,
295
296
}
296
297
297
298
// Struct representation of `Inner::state`.
@@ -304,18 +305,6 @@ struct State {
304
305
num_messages : usize ,
305
306
}
306
307
307
- #[ derive( Debug ) ]
308
- struct ReceiverTask {
309
- unparked : bool ,
310
- task : Option < Waker > ,
311
- }
312
-
313
- // Returned from Receiver::try_park()
314
- enum TryPark {
315
- Parked ,
316
- NotEmpty ,
317
- }
318
-
319
308
// The `is_open` flag is stored in the left-most bit of `Inner::state`
320
309
const OPEN_MASK : usize = usize:: MAX - ( usize:: MAX >> 1 ) ;
321
310
@@ -394,10 +383,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
394
383
message_queue : Queue :: new ( ) ,
395
384
parked_queue : Queue :: new ( ) ,
396
385
num_senders : AtomicUsize :: new ( 1 ) ,
397
- recv_task : Mutex :: new ( ReceiverTask {
398
- unparked : false ,
399
- task : None ,
400
- } ) ,
386
+ recv_task : AtomicWaker :: new ( ) ,
401
387
} ) ;
402
388
403
389
let tx = Sender {
@@ -512,7 +498,7 @@ impl<T> Sender<T> {
512
498
513
499
// Signal to the receiver that a message has been enqueued. If the
514
500
// receiver is parked, this will unpark the task.
515
- self . signal ( ) ;
501
+ self . inner . recv_task . wake ( ) ;
516
502
}
517
503
518
504
// Increment the number of queued messages. Returns the resulting number.
@@ -545,35 +531,6 @@ impl<T> Sender<T> {
545
531
}
546
532
}
547
533
548
- // Signal to the receiver task that a message has been enqueued
549
- fn signal ( & self ) {
550
- // TODO
551
- // This logic can probably be improved by guarding the lock with an
552
- // atomic.
553
- //
554
- // Do this step first so that the lock is dropped when
555
- // `unpark` is called
556
- let task = {
557
- let mut recv_task = self . inner . recv_task . lock ( ) . unwrap ( ) ;
558
-
559
- // If the receiver has already been unparked, then there is nothing
560
- // more to do
561
- if recv_task. unparked {
562
- return ;
563
- }
564
-
565
- // Setting this flag enables the receiving end to detect that
566
- // an unpark event happened in order to avoid unnecessarily
567
- // parking.
568
- recv_task. unparked = true ;
569
- recv_task. task . take ( )
570
- } ;
571
-
572
- if let Some ( task) = task {
573
- task. wake ( ) ;
574
- }
575
- }
576
-
577
534
fn park ( & mut self , lw : Option < & LocalWaker > ) {
578
535
// TODO: clean up internal state if the task::current will fail
579
536
@@ -633,7 +590,7 @@ impl<T> Sender<T> {
633
590
// that stuff from `do_send`.
634
591
635
592
self . inner . set_closed ( ) ;
636
- self . signal ( ) ;
593
+ self . inner . recv_task . wake ( ) ;
637
594
}
638
595
639
596
fn poll_unparked ( & mut self , lw : Option < & LocalWaker > ) -> Poll < ( ) > {
@@ -680,7 +637,7 @@ impl<T> UnboundedSender<T> {
680
637
/// Closes this channel from the sender side, preventing any new messages.
681
638
pub fn close_channel ( & self ) {
682
639
self . 0 . inner . set_closed ( ) ;
683
- self . 0 . signal ( ) ;
640
+ self . 0 . inner . recv_task . wake ( ) ;
684
641
}
685
642
686
643
// Do the send without parking current task.
@@ -847,21 +804,6 @@ impl<T> Receiver<T> {
847
804
}
848
805
}
849
806
850
- // Try to park the receiver task
851
- fn try_park ( & self , lw : & LocalWaker ) -> TryPark {
852
- // First, track the task in the `recv_task` slot
853
- let mut recv_task = self . inner . recv_task . lock ( ) . unwrap ( ) ;
854
-
855
- if recv_task. unparked {
856
- // Consume the `unpark` signal without actually parking
857
- recv_task. unparked = false ;
858
- return TryPark :: NotEmpty ;
859
- }
860
-
861
- recv_task. task = Some ( lw. clone ( ) . into_waker ( ) ) ;
862
- TryPark :: Parked
863
- }
864
-
865
807
fn dec_num_messages ( & self ) {
866
808
// OPEN_MASK is highest bit, so it's unaffected by subtraction
867
809
// unless there's underflow, and we know there's no underflow
@@ -880,31 +822,17 @@ impl<T> Stream for Receiver<T> {
880
822
mut self : Pin < & mut Self > ,
881
823
lw : & LocalWaker ,
882
824
) -> Poll < Option < T > > {
883
- loop {
884
825
// Try to read a message off of the message queue.
885
- let msg = match self . next_message ( ) {
886
- Poll :: Ready ( msg) => msg,
887
- Poll :: Pending => {
888
- // There are no messages to read, in this case, attempt to
889
- // park. The act of parking will verify that the channel is
890
- // still empty after the park operation has completed.
891
- match self . try_park ( lw) {
892
- TryPark :: Parked => {
893
- // The task was parked, and the channel is still
894
- // empty, return Pending.
895
- return Poll :: Pending ;
896
- }
897
- TryPark :: NotEmpty => {
898
- // A message has been sent while attempting to
899
- // park. Loop again, the next iteration is
900
- // guaranteed to get the message.
901
- continue ;
902
- }
903
- }
904
- }
905
- } ;
906
- // Return the message
907
- return Poll :: Ready ( msg) ;
826
+ match self . next_message ( ) {
827
+ Poll :: Ready ( msg) => Poll :: Ready ( msg) ,
828
+ Poll :: Pending => {
829
+ // There are no messages to read, in this case, park.
830
+ self . inner . recv_task . register ( lw) ;
831
+ // Check queue again after parking to prevent race condition:
832
+ // a message could be added to the queue after previous `next_message`
833
+ // before `register` call.
834
+ self . next_message ( )
835
+ }
908
836
}
909
837
}
910
838
}
0 commit comments