Skip to content

Commit

Permalink
feat(s2n-quic-dc): add mpsc channel (#2503)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Mar 4, 2025
1 parent d88faa4 commit 4d60027
Show file tree
Hide file tree
Showing 8 changed files with 594 additions and 23 deletions.
21 changes: 14 additions & 7 deletions dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ exclude = ["corpus.tar.gz"]

[features]
default = ["tokio"]
testing = ["bolero-generator", "s2n-quic-core/testing", "s2n-quic-platform/testing", "tracing-subscriber"]
testing = [
"bolero-generator",
"s2n-quic-core/testing",
"s2n-quic-platform/testing",
"tracing-subscriber",
]
tokio = ["tokio/io-util", "tokio/net", "tokio/rt-multi-thread", "tokio/time"]

[dependencies]
arrayvec = "0.7"
atomic-waker = "1"
aws-lc-rs = "1"
bach = "0.0.10"
bitflags = "2"
bolero-generator = { version = "0.13", default-features = false, optional = true }
bytes = "1"
Expand All @@ -41,7 +47,9 @@ hashbrown = "0.15"
thiserror = "2"
tokio = { version = "1", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
], optional = true }
zerocopy = { version = "0.7", features = ["derive"] }
zeroize = "1"
parking_lot = "0.12"
Expand All @@ -53,13 +61,12 @@ bolero-generator = "0.13"
insta = "1"
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
s2n-quic-platform = { path = "../../quic/s2n-quic-platform", features = ["testing"] }
s2n-quic-platform = { path = "../../quic/s2n-quic-platform", features = [
"testing",
] }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints.rust.unexpected_cfgs]
level = "warn"
check-cfg = [
'cfg(kani)',
'cfg(todo)',
]
check-cfg = ['cfg(kani)', 'cfg(todo)']
2 changes: 1 addition & 1 deletion dc/s2n-quic-dc/src/stream/server/tokio/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
application::{Builder as StreamBuilder, Stream},
environment::{tokio::Environment, Environment as _},
},
sync::channel,
sync::mpmc as channel,
};
use core::time::Duration;
use s2n_quic_core::time::Clock;
Expand Down
2 changes: 1 addition & 1 deletion dc/s2n-quic-dc/src/stream/server/tokio/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{event::Subscriber, sync::channel as chan};
use crate::{event::Subscriber, sync::mpmc as chan};
use core::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
Expand Down
3 changes: 2 additions & 1 deletion dc/s2n-quic-dc/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod channel;
pub mod mpmc;
pub mod mpsc;
pub mod ring_deque;
149 changes: 145 additions & 4 deletions dc/s2n-quic-dc/src/sync/channel.rs → dc/s2n-quic-dc/src/sync/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl<T> Channel<T> {
}
}

/// A message sender
///
/// Note that this channel implementation does not allow for backpressure on the
/// sending rate. Instead, the queue is rotated to make room for new items and
/// returned to the sender.
pub struct Sender<T> {
channel: Arc<Channel<T>>,
}
Expand Down Expand Up @@ -122,8 +127,6 @@ pin_project! {
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,
Expand All @@ -148,7 +151,6 @@ pin_project! {
}
}

#[allow(dead_code)] // TODO remove this once the module is public
impl<T> Receiver<T> {
/// Attempts to receive a message from the front of the channel.
///
Expand Down Expand Up @@ -204,6 +206,12 @@ impl<T> Receiver<T> {
channel: Arc::downgrade(&self.channel),
}
}

/// Closes the channel for receiving
#[inline]
pub fn close(&self) -> Result<(), Closed> {
self.channel.close()
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down Expand Up @@ -232,7 +240,6 @@ pub struct WeakReceiver<T> {
channel: Weak<Channel<T>>,
}

#[allow(dead_code)] // TODO remove this once the module is public
impl<T> WeakReceiver<T> {
#[inline]
pub fn pop_front_if<F>(&self, priority: Priority, f: F) -> Result<Option<T>, Closed>
Expand Down Expand Up @@ -316,3 +323,137 @@ impl<T> EventListenerFuture for RecvInner<'_, T> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::testing::{ext::*, sim, task};
use std::time::Duration;

#[test]
fn test_unlimited() {
sim(|| {
let (tx, rx) = new(2);

async move {
for v in 0u64.. {
if tx.send_back(v).is_err() {
return;
};
// let the receiver read from the task
task::yield_now().await;
}
}
.primary()
.spawn();

async move {
for expected in 0u64..10 {
let actual = rx.recv_front().await.unwrap();
assert_eq!(actual, expected);
}
}
.primary()
.spawn();
});
}

#[test]
fn test_send_limited() {
sim(|| {
let (tx, rx) = new(2);

async move {
for v in 0u64.. {
if tx.send_back(v).is_err() {
return;
};
Duration::from_millis(1).sleep().await;
}
}
.primary()
.spawn();

async move {
for expected in 0u64..10 {
let actual = rx.recv_front().await.unwrap();
assert_eq!(actual, expected);
}
}
.primary()
.spawn();
});
}

#[test]
fn test_recv_limited() {
sim(|| {
let (tx, rx) = new(2);

async move {
for v in 0u64.. {
match tx.send_back(v) {
Ok(Some(_old)) => {
// the channel doesn't provide backpressure so we'll need to sleep
Duration::from_millis(1).sleep().await;
}
Ok(None) => {
continue;
}
Err(_) => {
// the receiver is done
return;
}
}
}
}
.primary()
.spawn();

async move {
let mut min = 0;
for _ in 0u64..10 {
let actual = rx.recv_front().await.unwrap();
assert!(actual > min || actual == 0);
min = actual;
Duration::from_millis(1).sleep().await;
}
}
.primary()
.spawn();
});
}

#[test]
fn test_multi_recv() {
sim(|| {
let (tx, rx) = new(2);

async move {
for v in 0u64.. {
if tx.send_back(v).is_err() {
return;
};
// let the receiver read from the task
task::yield_now().await;
}
}
.primary()
.spawn();

for _ in 0..2 {
let rx = rx.clone();
async move {
let mut min = 0;
for _ in 0u64..10 {
let actual = rx.recv_front().await.unwrap();
assert!(actual > min || actual == 0, "{actual} > {min}");
min = actual;
}
}
.primary()
.spawn();
}
});
}
}
Loading

0 comments on commit 4d60027

Please sign in to comment.