Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disconnected flavor which immediately fails #1047

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ pub fn never<T>() -> Receiver<T> {
}
}

/// Creates a receiver that is disconnected always.
Copy link
Contributor Author

@ryoqun ryoqun Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to expand this doccomment with an example, which will be based on the one in the pr description with polishes, once it's confirmed that this pr has some hope to be merged.

pub fn disconnected<T>() -> Receiver<T> {
Receiver {
flavor: ReceiverFlavor::Disconnected(flavors::disconnected::Channel::new()),
}
}

/// Creates a receiver that delivers messages periodically.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
Expand Down Expand Up @@ -731,6 +738,9 @@ enum ReceiverFlavor<T> {

/// The never flavor.
Never(flavors::never::Channel<T>),

/// The disconnected flavor.
Disconnected(flavors::disconnected::Channel<T>),
}

unsafe impl<T: Send> Send for Receiver<T> {}
Expand Down Expand Up @@ -784,6 +794,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.try_recv(),
ReceiverFlavor::Disconnected(chan) => chan.try_recv(),
}
}

Expand Down Expand Up @@ -839,6 +850,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.recv(None),
ReceiverFlavor::Disconnected(chan) => chan.recv(None),
}
.map_err(|_| RecvError)
}
Expand Down Expand Up @@ -950,6 +962,7 @@ impl<T> Receiver<T> {
}
}
ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::Disconnected(chan) => chan.recv(Some(deadline)),
}
}

Expand All @@ -976,6 +989,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_empty(),
ReceiverFlavor::Tick(chan) => chan.is_empty(),
ReceiverFlavor::Never(chan) => chan.is_empty(),
ReceiverFlavor::Disconnected(chan) => chan.is_empty(),
}
}

Expand All @@ -1002,6 +1016,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_full(),
ReceiverFlavor::Tick(chan) => chan.is_full(),
ReceiverFlavor::Never(chan) => chan.is_full(),
ReceiverFlavor::Disconnected(chan) => chan.is_full(),
}
}

Expand All @@ -1027,6 +1042,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.len(),
ReceiverFlavor::Tick(chan) => chan.len(),
ReceiverFlavor::Never(chan) => chan.len(),
ReceiverFlavor::Disconnected(chan) => chan.len(),
}
}

Expand Down Expand Up @@ -1054,6 +1070,7 @@ impl<T> Receiver<T> {
ReceiverFlavor::At(chan) => chan.capacity(),
ReceiverFlavor::Tick(chan) => chan.capacity(),
ReceiverFlavor::Never(chan) => chan.capacity(),
ReceiverFlavor::Disconnected(chan) => chan.capacity(),
}
}

Expand Down Expand Up @@ -1165,6 +1182,7 @@ impl<T> Drop for Receiver<T> {
ReceiverFlavor::At(_) => {}
ReceiverFlavor::Tick(_) => {}
ReceiverFlavor::Never(_) => {}
ReceiverFlavor::Disconnected(_) => {}
}
}
}
Expand All @@ -1179,6 +1197,9 @@ impl<T> Clone for Receiver<T> {
ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
ReceiverFlavor::Disconnected(_) => {
ReceiverFlavor::Disconnected(flavors::disconnected::Channel::new())
}
};

Self { flavor }
Expand Down Expand Up @@ -1428,6 +1449,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.try_select(token),
ReceiverFlavor::Tick(chan) => chan.try_select(token),
ReceiverFlavor::Never(chan) => chan.try_select(token),
ReceiverFlavor::Disconnected(chan) => chan.try_select(token),
}
}

Expand All @@ -1439,6 +1461,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.deadline(),
ReceiverFlavor::Tick(chan) => chan.deadline(),
ReceiverFlavor::Never(chan) => chan.deadline(),
ReceiverFlavor::Disconnected(chan) => chan.deadline(),
}
}

Expand All @@ -1450,6 +1473,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.register(oper, cx),
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
ReceiverFlavor::Disconnected(chan) => chan.register(oper, cx),
}
}

Expand All @@ -1461,6 +1485,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.unregister(oper),
ReceiverFlavor::Tick(chan) => chan.unregister(oper),
ReceiverFlavor::Never(chan) => chan.unregister(oper),
ReceiverFlavor::Disconnected(chan) => chan.unregister(oper),
}
}

Expand All @@ -1472,6 +1497,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.accept(token, cx),
ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
ReceiverFlavor::Never(chan) => chan.accept(token, cx),
ReceiverFlavor::Disconnected(chan) => chan.accept(token, cx),
}
}

Expand All @@ -1483,6 +1509,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.is_ready(),
ReceiverFlavor::Tick(chan) => chan.is_ready(),
ReceiverFlavor::Never(chan) => chan.is_ready(),
ReceiverFlavor::Disconnected(chan) => chan.is_ready(),
}
}

Expand All @@ -1494,6 +1521,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.watch(oper, cx),
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
ReceiverFlavor::Disconnected(chan) => chan.watch(oper, cx),
}
}

Expand All @@ -1505,6 +1533,7 @@ impl<T> SelectHandle for Receiver<T> {
ReceiverFlavor::At(chan) => chan.unwatch(oper),
ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
ReceiverFlavor::Never(chan) => chan.unwatch(oper),
ReceiverFlavor::Disconnected(chan) => chan.unwatch(oper),
}
}
}
Expand All @@ -1531,5 +1560,6 @@ pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
}
ReceiverFlavor::Never(chan) => chan.read(token),
ReceiverFlavor::Disconnected(chan) => chan.read(token),
}
}
108 changes: 108 additions & 0 deletions crossbeam-channel/src/flavors/disconnected.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//! Channel that is always disconnected.
//!
//! Messages cannot be sent into this kind of channel.

use std::marker::PhantomData;
use std::time::Instant;

use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};

/// This flavor doesn't need a token.
pub(crate) type DisconnectedToken = ();

/// Channel that always delivers messages.
pub(crate) struct Channel<T> {
_marker: PhantomData<T>,
}

impl<T> Channel<T> {
/// Creates a channel that always delivers messages.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops forgot to update..

#[inline]
pub(crate) fn new() -> Self {
Channel {
_marker: PhantomData,
}
}

/// Attempts to receive a message without blocking.
#[inline]
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
Err(TryRecvError::Disconnected)
}

/// Receives a message from the channel.
#[inline]
pub(crate) fn recv(&self, _deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
Err(RecvTimeoutError::Disconnected)
}

/// Reads a message from the channel.
#[inline]
pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
Err(())
}

/// Returns `true` if the channel is empty.
#[inline]
pub(crate) fn is_empty(&self) -> bool {
true
}

/// Returns `true` if the channel is full.
#[inline]
pub(crate) fn is_full(&self) -> bool {
true
}

/// Returns the number of messages in the channel.
#[inline]
pub(crate) fn len(&self) -> usize {
0
}

/// Returns the capacity of the channel.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
}

impl<T> SelectHandle for Channel<T> {
#[inline]
fn try_select(&self, _token: &mut Token) -> bool {
true
}

#[inline]
fn deadline(&self) -> Option<Instant> {
None
}

#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}

#[inline]
fn unregister(&self, _oper: Operation) {}

#[inline]
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}

#[inline]
fn is_ready(&self) -> bool {
true
}

#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}

#[inline]
fn unwatch(&self, _oper: Operation) {}
}
2 changes: 2 additions & 0 deletions crossbeam-channel/src/flavors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
//! 2. `array` - Bounded channel based on a preallocated array.
//! 3. `list` - Unbounded channel implemented as a linked list.
//! 4. `never` - Channel that never delivers messages.
//! 5. `disconnected` - Channel that is always disconnected.
//! 5. `tick` - Channel that delivers messages periodically.
//! 6. `zero` - Zero-capacity channel.

pub(crate) mod array;
pub(crate) mod at;
pub(crate) mod disconnected;
pub(crate) mod list;
pub(crate) mod never;
pub(crate) mod tick;
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ cfg_if! {
pub use crate::select::{select, select_timeout, try_select};
}

pub use crate::channel::{after, at, never, tick};
pub use crate::channel::{after, at, never, disconnected, tick};
pub use crate::channel::{bounded, unbounded};
pub use crate::channel::{IntoIter, Iter, TryIter};
pub use crate::channel::{Receiver, Sender};
Expand Down
2 changes: 2 additions & 0 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct Token {
pub(crate) list: flavors::list::ListToken,
#[allow(dead_code)]
pub(crate) never: flavors::never::NeverToken,
#[allow(dead_code)]
pub(crate) disconnected: flavors::disconnected::DisconnectedToken,
pub(crate) tick: flavors::tick::TickToken,
pub(crate) zero: flavors::zero::ZeroToken,
}
Expand Down