From a5ce16b2bc97df6a9a90bba04668ba8a7b9dd23d Mon Sep 17 00:00:00 2001 From: Benjamin Halsted Date: Tue, 25 Feb 2020 08:49:15 -0800 Subject: [PATCH] docs!: Improving docs, visibility, and constraints. Improves #7 --- Cargo.toml | 2 +- src/error.rs | 5 +- src/halt.rs | 8 +-- src/id_gen.rs | 3 +- src/lib.rs | 63 ++++++++++++++----- src/multiplexer.rs | 40 ++++++------ src/multiplexer_senders.rs | 12 ++-- src/sender.rs | 12 +++- tests/integration.rs | 3 + .../stream_producer/mod.rs | 0 10 files changed, 97 insertions(+), 51 deletions(-) rename src/stream_producer.rs => tests/stream_producer/mod.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index ad841b7..fe27c20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stream_multiplexer" -version = "0.2.0" +version = "0.3.0" authors = ["Benjamin Halsted "] edition = "2018" license = "MIT OR Apache-2.0" diff --git a/src/error.rs b/src/error.rs index 0e98deb..e565010 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,11 +15,8 @@ pub enum MultiplexerError { // #[error("Sending to full stream {0}")] // StreamClosed(StreamId), + /// Wrapper around std::io::Error #[error("IoError")] IoError(#[from] std::io::Error), - - /// Nothing to see here - #[error("Should never happen")] - UnitError, } diff --git a/src/halt.rs b/src/halt.rs index a4dde93..34f2d54 100644 --- a/src/halt.rs +++ b/src/halt.rs @@ -13,20 +13,20 @@ struct Inner { } #[derive(Clone, Debug)] -pub struct HaltRead { +pub(crate) struct HaltRead { inner: Arc, } impl HaltRead { #[tracing::instrument(level = "trace", skip(self))] - pub fn signal(&self) { + pub(crate) fn signal(&self) { tracing::trace!("setting atomic bool, triggering waker"); self.inner.set.store(true, Relaxed); self.inner.waker.wake(); } #[tracing::instrument(level = "trace", skip(read))] - pub fn wrap(read: St) -> (Self, HaltAsyncRead) + pub(crate) fn wrap(read: St) -> (Self, HaltAsyncRead) where St: Stream, { @@ -47,7 +47,7 @@ impl HaltRead { } #[derive(Debug)] -pub struct HaltAsyncRead { +pub(crate) struct HaltAsyncRead { inner: Arc, read: Option, } diff --git a/src/id_gen.rs b/src/id_gen.rs index 8cb3ee3..fb51084 100644 --- a/src/id_gen.rs +++ b/src/id_gen.rs @@ -12,7 +12,8 @@ pub trait IdGen: Default { fn seed(&mut self, _seed: usize) {} } -/// The default IdGen for MultiplexerSenders +/// Generates IDs for incoming streams. Is the default `IdGen` for `MultiplexerSenders`. +/// This implementation simply increments and wraps at the usize boundary. #[derive(Default, Copy, Clone, PartialEq, Debug)] pub struct IncrementIdGen { id: StreamId, diff --git a/src/lib.rs b/src/lib.rs index 98676f3..da7ab6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -/* FIXME #![warn( missing_docs, missing_debug_implementations, @@ -11,9 +10,17 @@ unused_import_braces, unused_qualifications )] +/* FIXME #![cfg_attr(debug_assertions, allow(dead_code))] #![cfg_attr(test, allow(dead_code))] */ +/*! +This crate provides stream multiplexing with channels. + +Channels have their own backpressure that does not affect other channels. + +Incoming streams are by default set to channel 0 and can be moved to other channels via `ControlMessage`s. +*/ mod error; mod halt; mod id_gen; @@ -22,40 +29,53 @@ mod multiplexer_senders; mod send_all_own; mod sender; mod stream_mover; -mod stream_producer; pub use error::*; use halt::*; pub use id_gen::*; pub use multiplexer::*; -pub use multiplexer_senders::*; +use multiplexer_senders::*; use send_all_own::*; use sender::*; use stream_mover::*; -pub use stream_producer::*; type StreamId = usize; /// Produced by the incoming stream -#[derive(Clone, PartialEq, Debug)] pub enum IncomingMessage { /// Value received from a stream Value(V), /// Sent when the stream has gone linkdead Linkdead, } -/// A packet representing a message for a stream -#[derive(Clone, PartialEq, Debug)] +impl std::fmt::Debug for IncomingMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IncomingMessage::Value(_) => write!(f, "IncomingMessage::Value(_)"), + IncomingMessage::Linkdead => write!(f, "IncomingMessage::Linkdead"), + } + } +} +/// A packet representing a message from a stream. pub struct IncomingPacket { id: StreamId, message: IncomingMessage, } +impl std::fmt::Debug for IncomingPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IncomingPacket") + .field("id", &self.id) + .field("message", &self.message) + .finish() + } +} impl IncomingPacket { + /// Wraps a message that will be sent to the stream with the given `id`. pub fn new(id: StreamId, message: IncomingMessage) -> Self { Self { id, message } } - /// The id the message is from. + /// The id of the stream that the message is from. pub fn id(&self) -> StreamId { self.id } @@ -75,7 +95,7 @@ impl IncomingPacket { } /// The payload of an OutgoingPacket -#[derive(Copy, Clone, PartialEq, Debug)] +#[derive(Clone)] pub enum OutgoingMessage { /// Value to send to the stream Value(V), @@ -85,19 +105,34 @@ pub enum OutgoingMessage { Shutdown, } impl Unpin for OutgoingMessage where V: Unpin {} +impl std::fmt::Debug for OutgoingMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutgoingMessage::Value(_) => write!(f, "OutgoingMessage::Value(_)"), + OutgoingMessage::ChangeChannel(channel) => { + write!(f, "OutgoingMessage::ChangeChannel({})", channel) + } + OutgoingMessage::Shutdown => write!(f, "OutgoingMessage::Shutdown"), + } + } +} /// For sending Value or causing the stream to change to a different channel -#[derive(Clone, PartialEq, Debug)] pub struct OutgoingPacket { /// List of streams this packet is for. ids: Vec, /// The packet payload message: OutgoingMessage, } -impl OutgoingPacket -where - V: std::fmt::Debug + PartialEq + Clone, -{ +impl std::fmt::Debug for OutgoingPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OutgoingPacket") + .field("ids", &self.ids) + .field("message", &self.message) + .finish() + } +} +impl OutgoingPacket { /// Creates an OutoingPacket message for a list of streams. pub fn new(ids: Vec, message: OutgoingMessage) -> Self { Self { ids, message } diff --git a/src/multiplexer.rs b/src/multiplexer.rs index bd35a0e..ba7943f 100644 --- a/src/multiplexer.rs +++ b/src/multiplexer.rs @@ -20,6 +20,9 @@ type StreamMovers = HashMap< >; /// Manages incoming streams of data and the enqueueing of outgoing data. +/// +/// Outgoing streams have their own buffer of messages and do not affect other streams. +/// Incoming streams have their messages routed into channels that have their own backpressure. pub struct Multiplexer where InSt: Stream, @@ -48,9 +51,7 @@ where OutSi: Sink + Unpin, { // FIXME: Consider taking a function that can determine which channel a packet should be in - /// Initializes with a stream that provides the outgoing packets which will be enqueued to the - /// corresponding streams, and a vector of sinks that represent different "channels" or - /// "categories" of data. + /// Calls `with_id_gen`, giving it an IncrementIdGen as well as the rest of the arguments. pub fn new( sender_buffer_size: usize, outgoing: Out, @@ -67,8 +68,9 @@ where InSt: Stream + Unpin, OutSi: Sink + Unpin, { - /// like `run` and gives the ability to overrid MultiplexerSenders, which is useful when - /// overridding the default IdGen in MultiplexerSenders for testing. + /// Initializes with a stream that provides the outgoing packets which will be enqueued to the + /// corresponding streams, and a vector of sinks that represent different "channels" or + /// "categories" of data. pub fn with_id_gen( sender_buffer_size: usize, id_gen: Id, @@ -178,21 +180,21 @@ where #[tracing::instrument( level = "trace", - skip(self, framed_write_half, framed_read_half, incoming_packet_reader_tx) + skip(self, write_half, read_half, incoming_packet_reader_tx) )] async fn handle_incoming_connection( &mut self, - framed_write_half: OutSi, - framed_read_half: InSt, + write_half: OutSi, + read_half: InSt, incoming_packet_reader_tx: &mut IncomingPacketReaderTx, ) { tracing::trace!("new connection"); // used to re-join the two halves so that we can shut down the reader - let (halt, async_read_halt) = HaltRead::wrap(framed_read_half); + let (halt, async_read_halt) = HaltRead::wrap(read_half); // Keep track of the write_half and generate a stream_id - let sender: Sender = Sender::new(framed_write_half, halt); + let sender: Sender = Sender::new(write_half, halt); let (stream_id_tx, stream_id_rx) = oneshot::channel(); self.senders_channel @@ -224,6 +226,9 @@ where InSt: Stream + Send + Unpin + 'static, InSt::Item: Send, { + /// Awaits incoming channel joins and messages from those streams in the channel. + /// + /// If there is backpressure, joining the channel also slows. fn run_channel( channel: usize, mut reader: SelectAll>>, @@ -235,9 +240,7 @@ where tokio::task::spawn(async move { loop { tracing::trace!(channel, "incoming loop start"); - // We do not have an incoming packet tokio::select! { - // FIXME: This block is duplicated, down below packet_reader = incoming_packet_reader_rx.recv() => { tracing::trace!("incoming socket (none)"); match packet_reader { @@ -281,7 +284,6 @@ where Out: Stream>, OutItem: Clone, OutSi: Sink, - OutSi::Error: std::fmt::Debug, Id: Send + Unpin + 'static, InSt: Send + Unpin + 'static, @@ -290,10 +292,12 @@ where OutItem: Send + Sync + 'static, OutSi: Send + Unpin + 'static, { - #[tracing::instrument(level = "debug", skip(incoming_write_halves, control))] + /// Start the multiplexer. Giving it a stream of incoming connection halves and a stream for + /// ControlMessages. + #[tracing::instrument(level = "debug", skip(incoming_halves, control))] pub async fn run( mut self, - mut incoming_write_halves: V, + mut incoming_halves: V, mut control: U, ) -> JoinHandle> where @@ -351,10 +355,10 @@ where tokio::task::spawn(async move { loop { tokio::select!( - incoming_opt = incoming_write_halves.next() => { + incoming_opt = incoming_halves.next() => { match incoming_opt { - Some(Ok((framed_write_half, framed_read_half))) => { - self.handle_incoming_connection(framed_write_half, framed_read_half, &mut incoming_packet_reader_tx).await; + Some(Ok((write_half, read_half))) => { + self.handle_incoming_connection(write_half, read_half, &mut incoming_packet_reader_tx).await; } Some(Err(error)) => { tracing::error!("ERROR: {}", error); diff --git a/src/multiplexer_senders.rs b/src/multiplexer_senders.rs index e451917..89a3a71 100644 --- a/src/multiplexer_senders.rs +++ b/src/multiplexer_senders.rs @@ -42,7 +42,7 @@ impl SenderPair { } /// Stores Sender and provides a generated ID -pub struct MultiplexerSenders { +pub(crate) struct MultiplexerSenders { sender_buffer_size: usize, id_gen: Id, @@ -59,7 +59,7 @@ impl MultiplexerSenders where Si: Sink + Unpin, { - pub fn new( + pub(crate) fn new( sender_buffer_size: usize, id_gen: Id, senders_channel: mpsc::UnboundedReceiver<(Sender, oneshot::Sender)>, @@ -85,7 +85,6 @@ impl std::fmt::Debug for MultiplexerSenders { impl MultiplexerSenders where Si: Sink + Unpin, - Si::Error: std::fmt::Debug, Id: IdGen, { #[tracing::instrument(level = "trace", skip(self, sender, stream_id_channel))] @@ -116,7 +115,7 @@ where } #[cfg(test)] - pub fn test_lengths(&self) -> (usize, usize) { + pub(crate) fn test_lengths(&self) -> (usize, usize) { let futures_len = self.senders_stream.len(); let sender_pairs_len = self.sender_pairs.len(); (futures_len, sender_pairs_len) @@ -192,8 +191,8 @@ where } } } - (Some(Err(err)), _sender) => { - tracing::error!(?err, "senders produced an error"); + (Some(Err(_err)), _sender) => { + tracing::error!("senders produced an error"); todo!(); } (None, _sender) => todo!(), @@ -245,7 +244,6 @@ where } // FIXME: TODO: -// - Figure out how to remove closed senders (and send_tx) from the hashmaps // - Check to see if the sender should be re-inserted when they come out of the FuturesUnordered // - If the reader is closed, what do we do? diff --git a/src/sender.rs b/src/sender.rs index 49ce3a4..7dd4553 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -1,10 +1,18 @@ use crate::*; -pub struct Sender { +/// Holds the write-half, it's stream id, and a control structure to shutdown the read-half when it is +/// dropped. +pub(crate) struct Sender { + // Optional, because it is set late. stream_id: Option, + + // The write half of the stream sink: Option, + + // Control structure that is used to stop and drop the read half read_halt: HaltRead, } + impl Unpin for Sender where Si: Unpin {} impl std::fmt::Debug for Sender { @@ -15,7 +23,7 @@ impl std::fmt::Debug for Sender { impl Sender { #[tracing::instrument(level = "trace", skip(sink, read_halt))] - pub fn new(sink: Si, read_halt: HaltRead) -> Self { + pub(crate) fn new(sink: Si, read_halt: HaltRead) -> Self { Self { stream_id: None, sink: Some(sink), diff --git a/tests/integration.rs b/tests/integration.rs index de1efb1..b4e92e4 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -3,6 +3,9 @@ * linkdeadsupport * server id **/ +mod stream_producer; +use stream_producer::*; + use stream_multiplexer::*; use bytes::{Bytes, BytesMut}; diff --git a/src/stream_producer.rs b/tests/stream_producer/mod.rs similarity index 100% rename from src/stream_producer.rs rename to tests/stream_producer/mod.rs