Skip to content

Commit

Permalink
docs!: Improving docs, visibility, and constraints.
Browse files Browse the repository at this point in the history
Improves #7
  • Loading branch information
halzy committed Feb 25, 2020
1 parent 6d621b1 commit a5ce16b
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stream_multiplexer"
version = "0.2.0"
version = "0.3.0"
authors = ["Benjamin Halsted <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand Down
5 changes: 1 addition & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ pub enum MultiplexerError {

// #[error("Sending to full stream {0}")]
// StreamClosed(StreamId),

/// Wrapper around std::io::Error
#[error("IoError")]
IoError(#[from] std::io::Error),

/// Nothing to see here
#[error("Should never happen")]
UnitError,
}
8 changes: 4 additions & 4 deletions src/halt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ struct Inner {
}

#[derive(Clone, Debug)]
pub struct HaltRead {
pub(crate) struct HaltRead {
inner: Arc<Inner>,
}

impl HaltRead {
#[tracing::instrument(level = "trace", skip(self))]
pub fn signal(&self) {
pub(crate) fn signal(&self) {
tracing::trace!("setting atomic bool, triggering waker");
self.inner.set.store(true, Relaxed);
self.inner.waker.wake();
}

#[tracing::instrument(level = "trace", skip(read))]
pub fn wrap<St>(read: St) -> (Self, HaltAsyncRead<St>)
pub(crate) fn wrap<St>(read: St) -> (Self, HaltAsyncRead<St>)
where
St: Stream,
{
Expand All @@ -47,7 +47,7 @@ impl HaltRead {
}

#[derive(Debug)]
pub struct HaltAsyncRead<St> {
pub(crate) struct HaltAsyncRead<St> {
inner: Arc<Inner>,
read: Option<St>,
}
Expand Down
3 changes: 2 additions & 1 deletion src/id_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ pub trait IdGen: Default {
fn seed(&mut self, _seed: usize) {}
}

/// The default IdGen for MultiplexerSenders
/// Generates IDs for incoming streams. Is the default `IdGen` for `MultiplexerSenders`.
/// This implementation simply increments and wraps at the usize boundary.
#[derive(Default, Copy, Clone, PartialEq, Debug)]
pub struct IncrementIdGen {
id: StreamId,
Expand Down
63 changes: 49 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* FIXME
#![warn(
missing_docs,
missing_debug_implementations,
Expand All @@ -11,9 +10,17 @@
unused_import_braces,
unused_qualifications
)]
/* FIXME
#![cfg_attr(debug_assertions, allow(dead_code))]
#![cfg_attr(test, allow(dead_code))]
*/
/*!
This crate provides stream multiplexing with channels.
Channels have their own backpressure that does not affect other channels.
Incoming streams are by default set to channel 0 and can be moved to other channels via `ControlMessage`s.
*/
mod error;
mod halt;
mod id_gen;
Expand All @@ -22,40 +29,53 @@ mod multiplexer_senders;
mod send_all_own;
mod sender;
mod stream_mover;
mod stream_producer;

pub use error::*;
use halt::*;
pub use id_gen::*;
pub use multiplexer::*;
pub use multiplexer_senders::*;
use multiplexer_senders::*;
use send_all_own::*;
use sender::*;
use stream_mover::*;
pub use stream_producer::*;

type StreamId = usize;

/// Produced by the incoming stream
#[derive(Clone, PartialEq, Debug)]
pub enum IncomingMessage<V> {
/// Value received from a stream
Value(V),
/// Sent when the stream has gone linkdead
Linkdead,
}
/// A packet representing a message for a stream
#[derive(Clone, PartialEq, Debug)]
impl<V> std::fmt::Debug for IncomingMessage<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IncomingMessage::Value(_) => write!(f, "IncomingMessage::Value(_)"),
IncomingMessage::Linkdead => write!(f, "IncomingMessage::Linkdead"),
}
}
}
/// A packet representing a message from a stream.
pub struct IncomingPacket<V> {
id: StreamId,
message: IncomingMessage<V>,
}
impl<V> std::fmt::Debug for IncomingPacket<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IncomingPacket")
.field("id", &self.id)
.field("message", &self.message)
.finish()
}
}
impl<V> IncomingPacket<V> {
/// Wraps a message that will be sent to the stream with the given `id`.
pub fn new(id: StreamId, message: IncomingMessage<V>) -> Self {
Self { id, message }
}

/// The id the message is from.
/// The id of the stream that the message is from.
pub fn id(&self) -> StreamId {
self.id
}
Expand All @@ -75,7 +95,7 @@ impl<V> IncomingPacket<V> {
}

/// The payload of an OutgoingPacket
#[derive(Copy, Clone, PartialEq, Debug)]
#[derive(Clone)]
pub enum OutgoingMessage<V> {
/// Value to send to the stream
Value(V),
Expand All @@ -85,19 +105,34 @@ pub enum OutgoingMessage<V> {
Shutdown,
}
impl<V> Unpin for OutgoingMessage<V> where V: Unpin {}
impl<V> std::fmt::Debug for OutgoingMessage<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingMessage::Value(_) => write!(f, "OutgoingMessage::Value(_)"),
OutgoingMessage::ChangeChannel(channel) => {
write!(f, "OutgoingMessage::ChangeChannel({})", channel)
}
OutgoingMessage::Shutdown => write!(f, "OutgoingMessage::Shutdown"),
}
}
}

/// For sending Value or causing the stream to change to a different channel
#[derive(Clone, PartialEq, Debug)]
pub struct OutgoingPacket<V> {
/// List of streams this packet is for.
ids: Vec<StreamId>,
/// The packet payload
message: OutgoingMessage<V>,
}
impl<V> OutgoingPacket<V>
where
V: std::fmt::Debug + PartialEq + Clone,
{
impl<V> std::fmt::Debug for OutgoingPacket<V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OutgoingPacket")
.field("ids", &self.ids)
.field("message", &self.message)
.finish()
}
}
impl<V> OutgoingPacket<V> {
/// Creates an OutoingPacket message for a list of streams.
pub fn new(ids: Vec<StreamId>, message: OutgoingMessage<V>) -> Self {
Self { ids, message }
Expand Down
40 changes: 22 additions & 18 deletions src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type StreamMovers<InSt> = HashMap<
>;

/// Manages incoming streams of data and the enqueueing of outgoing data.
///
/// Outgoing streams have their own buffer of messages and do not affect other streams.
/// Incoming streams have their messages routed into channels that have their own backpressure.
pub struct Multiplexer<InSt, Out, OutItem, OutSi, Id>
where
InSt: Stream,
Expand Down Expand Up @@ -48,9 +51,7 @@ where
OutSi: Sink<OutItem> + Unpin,
{
// FIXME: Consider taking a function that can determine which channel a packet should be in
/// Initializes with a stream that provides the outgoing packets which will be enqueued to the
/// corresponding streams, and a vector of sinks that represent different "channels" or
/// "categories" of data.
/// Calls `with_id_gen`, giving it an IncrementIdGen as well as the rest of the arguments.
pub fn new(
sender_buffer_size: usize,
outgoing: Out,
Expand All @@ -67,8 +68,9 @@ where
InSt: Stream + Unpin,
OutSi: Sink<OutItem> + Unpin,
{
/// like `run` and gives the ability to overrid MultiplexerSenders, which is useful when
/// overridding the default IdGen in MultiplexerSenders for testing.
/// Initializes with a stream that provides the outgoing packets which will be enqueued to the
/// corresponding streams, and a vector of sinks that represent different "channels" or
/// "categories" of data.
pub fn with_id_gen(
sender_buffer_size: usize,
id_gen: Id,
Expand Down Expand Up @@ -178,21 +180,21 @@ where

#[tracing::instrument(
level = "trace",
skip(self, framed_write_half, framed_read_half, incoming_packet_reader_tx)
skip(self, write_half, read_half, incoming_packet_reader_tx)
)]
async fn handle_incoming_connection(
&mut self,
framed_write_half: OutSi,
framed_read_half: InSt,
write_half: OutSi,
read_half: InSt,
incoming_packet_reader_tx: &mut IncomingPacketReaderTx<InSt>,
) {
tracing::trace!("new connection");

// used to re-join the two halves so that we can shut down the reader
let (halt, async_read_halt) = HaltRead::wrap(framed_read_half);
let (halt, async_read_halt) = HaltRead::wrap(read_half);

// Keep track of the write_half and generate a stream_id
let sender: Sender<OutSi> = Sender::new(framed_write_half, halt);
let sender: Sender<OutSi> = Sender::new(write_half, halt);

let (stream_id_tx, stream_id_rx) = oneshot::channel();
self.senders_channel
Expand Down Expand Up @@ -224,6 +226,9 @@ where
InSt: Stream + Send + Unpin + 'static,
InSt::Item: Send,
{
/// Awaits incoming channel joins and messages from those streams in the channel.
///
/// If there is backpressure, joining the channel also slows.
fn run_channel(
channel: usize,
mut reader: SelectAll<StreamMover<IncomingPacketReader<InSt>>>,
Expand All @@ -235,9 +240,7 @@ where
tokio::task::spawn(async move {
loop {
tracing::trace!(channel, "incoming loop start");
// We do not have an incoming packet
tokio::select! {
// FIXME: This block is duplicated, down below
packet_reader = incoming_packet_reader_rx.recv() => {
tracing::trace!("incoming socket (none)");
match packet_reader {
Expand Down Expand Up @@ -281,7 +284,6 @@ where
Out: Stream<Item = OutgoingPacket<OutItem>>,
OutItem: Clone,
OutSi: Sink<OutItem>,
OutSi::Error: std::fmt::Debug,

Id: Send + Unpin + 'static,
InSt: Send + Unpin + 'static,
Expand All @@ -290,10 +292,12 @@ where
OutItem: Send + Sync + 'static,
OutSi: Send + Unpin + 'static,
{
#[tracing::instrument(level = "debug", skip(incoming_write_halves, control))]
/// Start the multiplexer. Giving it a stream of incoming connection halves and a stream for
/// ControlMessages.
#[tracing::instrument(level = "debug", skip(incoming_halves, control))]
pub async fn run<V, U>(
mut self,
mut incoming_write_halves: V,
mut incoming_halves: V,
mut control: U,
) -> JoinHandle<IoResult<()>>
where
Expand Down Expand Up @@ -351,10 +355,10 @@ where
tokio::task::spawn(async move {
loop {
tokio::select!(
incoming_opt = incoming_write_halves.next() => {
incoming_opt = incoming_halves.next() => {
match incoming_opt {
Some(Ok((framed_write_half, framed_read_half))) => {
self.handle_incoming_connection(framed_write_half, framed_read_half, &mut incoming_packet_reader_tx).await;
Some(Ok((write_half, read_half))) => {
self.handle_incoming_connection(write_half, read_half, &mut incoming_packet_reader_tx).await;
}
Some(Err(error)) => {
tracing::error!("ERROR: {}", error);
Expand Down
12 changes: 5 additions & 7 deletions src/multiplexer_senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<Si, Item> SenderPair<Si, Item> {
}

/// Stores Sender and provides a generated ID
pub struct MultiplexerSenders<Item, Si, Id> {
pub(crate) struct MultiplexerSenders<Item, Si, Id> {
sender_buffer_size: usize,
id_gen: Id,

Expand All @@ -59,7 +59,7 @@ impl<Item, Si, Id> MultiplexerSenders<Item, Si, Id>
where
Si: Sink<Item> + Unpin,
{
pub fn new(
pub(crate) fn new(
sender_buffer_size: usize,
id_gen: Id,
senders_channel: mpsc::UnboundedReceiver<(Sender<Si>, oneshot::Sender<StreamId>)>,
Expand All @@ -85,7 +85,6 @@ impl<Item, Si, Id> std::fmt::Debug for MultiplexerSenders<Item, Si, Id> {
impl<Item, Si, Id> MultiplexerSenders<Item, Si, Id>
where
Si: Sink<Item> + Unpin,
Si::Error: std::fmt::Debug,
Id: IdGen,
{
#[tracing::instrument(level = "trace", skip(self, sender, stream_id_channel))]
Expand Down Expand Up @@ -116,7 +115,7 @@ where
}

#[cfg(test)]
pub fn test_lengths(&self) -> (usize, usize) {
pub(crate) fn test_lengths(&self) -> (usize, usize) {
let futures_len = self.senders_stream.len();
let sender_pairs_len = self.sender_pairs.len();
(futures_len, sender_pairs_len)
Expand Down Expand Up @@ -192,8 +191,8 @@ where
}
}
}
(Some(Err(err)), _sender) => {
tracing::error!(?err, "senders produced an error");
(Some(Err(_err)), _sender) => {
tracing::error!("senders produced an error");
todo!();
}
(None, _sender) => todo!(),
Expand Down Expand Up @@ -245,7 +244,6 @@ where
}

// FIXME: TODO:
// - Figure out how to remove closed senders (and send_tx) from the hashmaps
// - Check to see if the sender should be re-inserted when they come out of the FuturesUnordered
// - If the reader is closed, what do we do?

Expand Down
12 changes: 10 additions & 2 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::*;

pub struct Sender<Si> {
/// Holds the write-half, it's stream id, and a control structure to shutdown the read-half when it is
/// dropped.
pub(crate) struct Sender<Si> {
// Optional, because it is set late.
stream_id: Option<StreamId>,

// The write half of the stream
sink: Option<Si>,

// Control structure that is used to stop and drop the read half
read_halt: HaltRead,
}

impl<Si> Unpin for Sender<Si> where Si: Unpin {}

impl<Si> std::fmt::Debug for Sender<Si> {
Expand All @@ -15,7 +23,7 @@ impl<Si> std::fmt::Debug for Sender<Si> {

impl<Si> Sender<Si> {
#[tracing::instrument(level = "trace", skip(sink, read_halt))]
pub fn new(sink: Si, read_halt: HaltRead) -> Self {
pub(crate) fn new(sink: Si, read_halt: HaltRead) -> Self {
Self {
stream_id: None,
sink: Some(sink),
Expand Down
Loading

0 comments on commit a5ce16b

Please sign in to comment.