Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-core): add stream state enums #2132

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod iter;
pub mod limits;
#[cfg(feature = "alloc")]
pub mod ops;
pub mod state;
mod type_;

pub use error::*;
Expand Down
63 changes: 63 additions & 0 deletions quic/s2n-quic-core/src/stream/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::ensure;
use core::fmt;

pub type Result<T> = core::result::Result<(), Error<T>>;

macro_rules! transition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be in a non-stream specific module? it could be used for some of the BBR enums that have state transitions

($state:ident, $valid:pat => $target:expr) => {{
ensure!(*$state != $target, Err(Error::NoOp { current: $target }));
ensure!(
matches!($state, $valid),
Err(Error::InvalidTransition {
current: $state.clone(),
target: $target
})
);
#[cfg(feature = "tracing")]
{
tracing::debug!(prev = ?$state, next = ?$target);
}
*$state = $target;
Ok(())
}};
}

macro_rules! is {
($($state:ident)|+, $function:ident) => {
#[inline]
pub fn $function(&self) -> bool {
matches!(self, $(Self::$state)|*)
}
};
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error<T> {
NoOp { current: T },
InvalidTransition { current: T, target: T },
}
Comment on lines +38 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so the state transitions for streams would be influenced by the peer, so you'd need to return an error and close the connection (versus just panicking in a debug assertion). This is different than in BBR for example where if a an invalid state transition it was a software bug and we can just panic. So maybe not directly useable by BBR as is, but maybe somewhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah most likely could be used elsewhere, including BBR where we just have a variant that does debug_assertion instead of return an error.

One nice pattern about returning an error though is you don't have to check the state before transitioning. So something like

fn on_packet_sent(&mut self, packet: &[u8], is_fin: bool) {
    // ignore if we're already in this state, just make sure we transition from Ready to Send
    let _ = self.state.on_send();

    if is_fin && self.state.on_send_fin().is_ok() {
        println!("first transmission of fin bit");
    }
}


impl<T: fmt::Debug> fmt::Display for Error<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoOp { current } => {
write!(f, "state is already set to {current:?}")
}
Self::InvalidTransition { current, target } => {
write!(f, "invalid transition from {current:?} to {target:?}",)
}
}
}
}

#[cfg(feature = "std")]
impl<T: fmt::Debug> std::error::Error for Error<T> {}

mod recv;
mod send;

pub use recv::Receiver;
pub use send::Sender;
124 changes: 124 additions & 0 deletions quic/s2n-quic-core/src/stream/state/recv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

//= https://www.rfc-editor.org/rfc/rfc9000#section-3.2
//# o
//# | Recv STREAM / STREAM_DATA_BLOCKED / RESET_STREAM
//# | Create Bidirectional Stream (Sending)
//# | Recv MAX_STREAM_DATA / STOP_SENDING (Bidirectional)
//# | Create Higher-Numbered Stream
//# v
//# +-------+
//# | Recv | Recv RESET_STREAM
//# | |-----------------------.
//# +-------+ |
//# | |
//# | Recv STREAM + FIN |
//# v |
//# +-------+ |
//# | Size | Recv RESET_STREAM |
//# | Known |---------------------->|
//# +-------+ |
//# | |
//# | Recv All Data |
//# v v
//# +-------+ Recv RESET_STREAM +-------+
//# | Data |--- (optional) --->| Reset |
//# | Recvd | Recv All Data | Recvd |
//# +-------+<-- (optional) ----+-------+
//# | |
//# | App Read All Data | App Read Reset
//# v v
//# +-------+ +-------+
//# | Data | | Reset |
//# | Read | | Read |
//# +-------+ +-------+

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Receiver {
#[default]
Recv,
SizeKnown,
DataRecvd,
DataRead,
ResetRecvd,
ResetRead,
}

impl Receiver {
is!(Recv, is_receiving);
is!(SizeKnown, is_size_known);
is!(DataRecvd, is_data_received);
is!(DataRead, is_data_read);
is!(ResetRecvd, is_reset_received);
is!(ResetRead, is_reset_read);
is!(DataRead | ResetRead, is_terminal);
Comment on lines +51 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like something that would be useful at some point to implement as a derive macro on the Enum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah possibly. I'm sure there's one that already exists. The question is it worth the dependency 😄


#[inline]
pub fn on_receive_fin(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, Recv => SizeKnown)
}

#[inline]
pub fn on_receive_all_data(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, SizeKnown => DataRecvd)
}

#[inline]
pub fn on_app_read_all_data(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, DataRecvd => DataRead)
}

#[inline]
pub fn on_reset(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, Recv | SizeKnown => ResetRecvd)
}

#[inline]
pub fn on_app_read_reset(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, ResetRecvd => ResetRead)
}
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_debug_snapshot;

#[test]
#[cfg_attr(miri, ignore)]
fn snapshots() {
let mut outcomes = vec![];
let states = [
Receiver::Recv,
Receiver::SizeKnown,
Receiver::DataRecvd,
Receiver::DataRead,
Receiver::ResetRecvd,
Receiver::ResetRead,
];
for state in states {
macro_rules! push {
($event:ident) => {
let mut target = state.clone();
let result = target.$event().map(|_| target);
outcomes.push((state.clone(), stringify!($event), result));
};
}
push!(on_receive_fin);
push!(on_receive_all_data);
push!(on_app_read_all_data);
push!(on_reset);
push!(on_app_read_reset);
}

assert_debug_snapshot!(outcomes);
}
}
137 changes: 137 additions & 0 deletions quic/s2n-quic-core/src/stream/state/send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

//= https://www.rfc-editor.org/rfc/rfc9000#section-3.1
//# o
//# | Create Stream (Sending)
//# | Peer Creates Bidirectional Stream
//# v
//# +-------+
//# | Ready | Send RESET_STREAM
//# | |-----------------------.
//# +-------+ |
//# | |
//# | Send STREAM / |
//# | STREAM_DATA_BLOCKED |
//# v |
//# +-------+ |
//# | Send | Send RESET_STREAM |
//# | |---------------------->|
//# +-------+ |
//# | |
//# | Send STREAM + FIN |
//# v v
//# +-------+ +-------+
//# | Data | Send RESET_STREAM | Reset |
//# | Sent |------------------>| Sent |
//# +-------+ +-------+
//# | |
//# | Recv All ACKs | Recv ACK
//# v v
//# +-------+ +-------+
//# | Data | | Reset |
//# | Recvd | | Recvd |
//# +-------+ +-------+

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Sender {
#[default]
Ready,
Send,
DataSent,
DataRecvd,
/// An additional state for implementations to separate queueing a RESET_STREAM from actually
/// sending it
ResetQueued,
ResetSent,
ResetRecvd,
}

impl Sender {
is!(Ready, is_ready);
is!(Send, is_sending);
is!(DataSent, is_data_sent);
is!(DataRecvd, is_data_received);
is!(ResetQueued, is_reset_queued);
is!(ResetSent, is_reset_sent);
is!(ResetRecvd, is_reset_received);
is!(DataRecvd | ResetRecvd, is_terminal);

#[inline]
pub fn on_send_stream(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready => Send)
}

#[inline]
pub fn on_send_fin(&mut self) -> Result<Self> {
use Sender::*;
// we can jump from Ready to DataSent even though the
// diagram doesn't explicitly highlight this transition
transition!(self, Ready | Send => DataSent)
}

#[inline]
pub fn on_queue_reset(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready | Send | DataSent => ResetQueued)
}

#[inline]
pub fn on_send_reset(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready | Send | DataSent | ResetQueued => ResetSent)
}

#[inline]
pub fn on_recv_all_acks(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, DataSent | ResetQueued => DataRecvd)
}

#[inline]
pub fn on_recv_reset_ack(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, ResetSent => ResetRecvd)
}
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_debug_snapshot;

#[test]
#[cfg_attr(miri, ignore)]
fn snapshots() {
let mut outcomes = vec![];
let states = [
Sender::Ready,
Sender::Send,
Sender::DataSent,
Sender::DataRecvd,
Sender::ResetQueued,
Sender::ResetSent,
Sender::ResetRecvd,
];
for state in states {
macro_rules! push {
($event:ident) => {
let mut target = state.clone();
let result = target.$event().map(|_| target);
outcomes.push((state.clone(), stringify!($event), result));
};
}
push!(on_send_stream);
push!(on_send_fin);
push!(on_queue_reset);
push!(on_send_reset);
push!(on_recv_all_acks);
push!(on_recv_reset_ack);
}

assert_debug_snapshot!(outcomes);
}
}
Loading
Loading