Skip to content

Commit

Permalink
refactor(s2n-quic-dc): thread event::Subscriber through streams (#2387)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Dec 2, 2024
1 parent 96d2e22 commit b52d7b9
Show file tree
Hide file tree
Showing 21 changed files with 645 additions and 298 deletions.
2 changes: 1 addition & 1 deletion dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3046,7 +3046,7 @@ mod traits {
#[doc = r" }"]
#[doc = r" }"]
#[doc = r" ```"]
type ConnectionContext: 'static + Send;
type ConnectionContext: 'static + Send + Sync;
#[doc = r" Creates a context to be passed to each connection-related event"]
fn create_connection_context(
&self,
Expand Down
110 changes: 67 additions & 43 deletions dc/s2n-quic-dc/src/stream/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
event,
event::{self, EndpointPublisher as _},
stream::{
recv::application::{self as recv, Reader},
send::application::{self as send, Writer},
Expand All @@ -14,21 +14,21 @@ use core::{fmt, time::Duration};
use s2n_quic_core::{buffer, time::Timestamp};
use std::{io, net::SocketAddr};

pub struct Builder {
pub read: recv::Builder,
pub write: send::Builder,
pub shared: ArcShared,
pub struct Builder<Sub: event::Subscriber> {
pub read: recv::Builder<Sub>,
pub write: send::Builder<Sub>,
pub shared: ArcShared<Sub>,
pub sockets: Box<dyn socket::application::Builder>,
pub queue_time: Timestamp,
}

impl Builder {
impl<Sub> Builder<Sub>
where
Sub: event::Subscriber,
{
/// Builds the stream and emits an event indicating that the stream was built
#[inline]
pub(crate) fn build<Pub>(self, publisher: &Pub) -> io::Result<(Stream, Duration)>
where
Pub: event::EndpointPublisher,
{
pub(crate) fn accept(self) -> io::Result<(Stream<Sub>, Duration)> {
let sojourn_time = {
let remote_address = self.shared.read_remote_addr();
let remote_address = &remote_address;
Expand All @@ -37,22 +37,30 @@ impl Builder {
let now = self.shared.common.clock.get_time();
let sojourn_time = now.saturating_duration_since(self.queue_time);

publisher.on_acceptor_stream_dequeued(event::builder::AcceptorStreamDequeued {
remote_address,
credential_id,
stream_id,
sojourn_time,
});
self.shared
.endpoint_publisher(now)
.on_acceptor_stream_dequeued(event::builder::AcceptorStreamDequeued {
remote_address,
credential_id,
stream_id,
sojourn_time,
});

// TODO emit event

sojourn_time
};

self.build_without_event()
.map(|stream| (stream, sojourn_time))
self.build().map(|stream| (stream, sojourn_time))
}

#[inline]
pub(crate) fn connect(self) -> io::Result<Stream<Sub>> {
self.build()
}

#[inline]
pub(crate) fn build_without_event(self) -> io::Result<Stream> {
pub(crate) fn build(self) -> io::Result<Stream<Sub>> {
let Self {
read,
write,
Expand All @@ -61,6 +69,8 @@ impl Builder {
queue_time: _,
} = self;

// TODO emit event

let sockets = sockets.build()?;
let read = read.build(shared.clone(), sockets.clone());
let write = write.build(shared, sockets);
Expand All @@ -69,35 +79,40 @@ impl Builder {

/// Emits an event indicating that the stream was pruned
#[inline]
pub(crate) fn prune<Pub>(
self,
reason: event::builder::AcceptorStreamPruneReason,
publisher: &Pub,
) where
Pub: event::EndpointPublisher,
{
pub(crate) fn prune(self, reason: event::builder::AcceptorStreamPruneReason) {
let now = self.shared.clock.get_time();
let remote_address = self.shared.read_remote_addr();
let remote_address = &remote_address;
let credential_id = &*self.shared.credentials().id;
let stream_id = self.shared.application().stream_id.into_varint().as_u64();
let sojourn_time = now.saturating_duration_since(self.queue_time);
publisher.on_acceptor_stream_pruned(event::builder::AcceptorStreamPruned {
remote_address,
credential_id,
stream_id,
sojourn_time,
reason,
});

self.shared
.endpoint_publisher(now)
.on_acceptor_stream_pruned(event::builder::AcceptorStreamPruned {
remote_address,
credential_id,
stream_id,
sojourn_time,
reason,
});

// TODO emit event
}
}

pub struct Stream {
read: Reader,
write: Writer,
pub struct Stream<Sub>
where
Sub: event::Subscriber,
{
read: Reader<Sub>,
write: Writer<Sub>,
}

impl fmt::Debug for Stream {
impl<Sub> fmt::Debug for Stream<Sub>
where
Sub: event::Subscriber,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Stream")
.field("peer_addr", &self.peer_addr().unwrap())
Expand All @@ -106,7 +121,10 @@ impl fmt::Debug for Stream {
}
}

impl Stream {
impl<Sub> Stream<Sub>
where
Sub: event::Subscriber,
{
#[inline]
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.read.peer_addr()
Expand Down Expand Up @@ -139,26 +157,29 @@ impl Stream {
}

#[inline]
pub fn split(&mut self) -> (&mut Reader, &mut Writer) {
pub fn split(&mut self) -> (&mut Reader<Sub>, &mut Writer<Sub>) {
(&mut self.read, &mut self.write)
}

#[inline]
pub fn into_split(self) -> (Reader, Writer) {
pub fn into_split(self) -> (Reader<Sub>, Writer<Sub>) {
(self.read, self.write)
}
}

#[cfg(feature = "tokio")]
mod tokio_impl {
use super::Stream;
use super::{event, Stream};
use core::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};

impl AsyncRead for Stream {
impl<Sub> AsyncRead for Stream<Sub>
where
Sub: event::Subscriber,
{
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
Expand All @@ -169,7 +190,10 @@ mod tokio_impl {
}
}

impl AsyncWrite for Stream {
impl<Sub> AsyncWrite for Stream<Sub>
where
Sub: event::Subscriber,
{
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
Expand Down
50 changes: 34 additions & 16 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
event,
path::secret,
stream::{
application::Stream,
Expand All @@ -15,21 +16,29 @@ use tokio::net::TcpStream;

/// Connects using the UDP transport layer
#[inline]
pub async fn connect_udp<H>(
pub async fn connect_udp<H, Sub>(
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
) -> io::Result<Stream>
env: &Environment<Sub>,
subscriber: Sub,
) -> io::Result<Stream<Sub>>
where
H: core::future::Future<Output = io::Result<secret::map::Peer>>,
Sub: event::Subscriber,
{
// ensure we have a secret for the peer
let peer = handshake.await?;

let stream = endpoint::open_stream(env, peer, env::UdpUnbound(acceptor_addr.into()), None)?;
let stream = endpoint::open_stream(
env,
peer,
env::UdpUnbound(acceptor_addr.into()),
subscriber,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build_without_event()?;
let mut stream = stream.connect()?;

debug_assert_eq!(stream.protocol(), Protocol::Udp);

Expand All @@ -40,21 +49,23 @@ where

/// Connects using the TCP transport layer
#[inline]
pub async fn connect_tcp<H>(
pub async fn connect_tcp<H, Sub>(
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
) -> io::Result<Stream>
env: &Environment<Sub>,
subscriber: Sub,
) -> io::Result<Stream<Sub>>
where
H: core::future::Future<Output = io::Result<secret::map::Peer>>,
Sub: event::Subscriber,
{
// Race TCP handshake with the TLS handshake
let (socket, peer) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?;

let stream = endpoint::open_stream(env, peer, env::TcpRegistered(socket), None)?;
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(socket), subscriber, None)?;

// build the stream inside the application context
let mut stream = stream.build_without_event()?;
let mut stream = stream.connect()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

Expand All @@ -69,15 +80,19 @@ where
///
/// The provided `map` must contain a shared secret for the `handshake_addr`
#[inline]
pub async fn connect_tcp_with(
pub async fn connect_tcp_with<Sub>(
peer: secret::map::Peer,
stream: TcpStream,
env: &Environment,
) -> io::Result<Stream> {
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(stream), None)?;
env: &Environment<Sub>,
subscriber: Sub,
) -> io::Result<Stream<Sub>>
where
Sub: event::Subscriber,
{
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(stream), subscriber, None)?;

// build the stream inside the application context
let mut stream = stream.build_without_event()?;
let mut stream = stream.connect()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

Expand All @@ -87,7 +102,10 @@ pub async fn connect_tcp_with(
}

#[inline]
async fn write_prelude(stream: &mut Stream) -> io::Result<()> {
async fn write_prelude<Sub>(stream: &mut Stream<Sub>) -> io::Result<()>
where
Sub: event::Subscriber,
{
// TODO should we actually write the prelude here or should we do late sealer binding on
// the first packet to reduce secret reordering on the peer

Expand Down
Loading

0 comments on commit b52d7b9

Please sign in to comment.