Skip to content

Commit

Permalink
Allocate Incoming response buffers as needed
Browse files Browse the repository at this point in the history
Threading buffers through alongside the `Incoming` does not reduce the
number of allocations.
  • Loading branch information
Ralith authored and djc committed Apr 7, 2024
1 parent 8fbcf08 commit cc0d2e9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 59 deletions.
35 changes: 13 additions & 22 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,9 @@ pub(crate) struct EndpointInner {
}

impl EndpointInner {
pub(crate) fn accept(
&self,
incoming: proto::Incoming,
mut response_buffer: BytesMut,
) -> Result<Connecting, ConnectionError> {
pub(crate) fn accept(&self, incoming: proto::Incoming) -> Result<Connecting, ConnectionError> {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
match state
.inner
.accept(incoming, Instant::now(), &mut response_buffer)
Expand All @@ -383,25 +380,19 @@ impl EndpointInner {
}
}

pub(crate) fn refuse(&self, incoming: proto::Incoming, mut response_buffer: BytesMut) {
pub(crate) fn refuse(&self, incoming: proto::Incoming) {
let mut state = self.state.lock().unwrap();
let mut response_buffer = BytesMut::new();
let transmit = state.inner.refuse(incoming, &mut response_buffer);
state.transmit_state.respond(transmit, response_buffer);
}

pub(crate) fn retry(
&self,
incoming: proto::Incoming,
mut response_buffer: BytesMut,
) -> Result<(), (proto::RetryError, BytesMut)> {
pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> {
let mut state = self.state.lock().unwrap();
match state.inner.retry(incoming, &mut response_buffer) {
Ok(transmit) => {
state.transmit_state.respond(transmit, response_buffer);
Ok(())
}
Err(e) => Err((e, response_buffer)),
}
let mut response_buffer = BytesMut::new();
let transmit = state.inner.retry(incoming, &mut response_buffer)?;
state.transmit_state.respond(transmit, response_buffer);
Ok(())
}
}

Expand All @@ -410,7 +401,7 @@ pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
inner: proto::Endpoint,
transmit_state: TransmitState,
incoming: VecDeque<(proto::Incoming, BytesMut)>,
incoming: VecDeque<proto::Incoming>,
driver: Option<Waker>,
ipv6: bool,
connections: ConnectionSet,
Expand Down Expand Up @@ -464,7 +455,7 @@ impl State {
) {
Some(DatagramEvent::NewConnection(incoming)) => {
if self.incoming.len() < MAX_INCOMING_CONNECTIONS {
self.incoming.push_back((incoming, response_buffer));
self.incoming.push_back(incoming);
} else {
let transmit =
self.inner.refuse(incoming, &mut response_buffer);
Expand Down Expand Up @@ -707,10 +698,10 @@ impl<'a> Future for Accept<'a> {
if endpoint.driver_lost {
return Poll::Ready(None);
}
if let Some((incoming, response_buffer)) = endpoint.incoming.pop_front() {
if let Some(incoming) = endpoint.incoming.pop_front() {
// Release the mutex lock on endpoint so cloning it doesn't deadlock
drop(endpoint);
let incoming = Incoming::new(incoming, this.endpoint.inner.clone(), response_buffer);
let incoming = Incoming::new(incoming, this.endpoint.inner.clone());
return Poll::Ready(Some(incoming));
}
if endpoint.connections.close.is_some() {
Expand Down
50 changes: 13 additions & 37 deletions quinn/src/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{
fmt,
future::{Future, IntoFuture},
net::{IpAddr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};

use bytes::BytesMut;
use proto::ConnectionError;
use thiserror::Error;

Expand All @@ -16,48 +14,37 @@ use crate::{
};

/// An incoming connection for which the server has not yet begun its part of the handshake
#[derive(Debug)]
pub struct Incoming(Option<State>);

impl Incoming {
pub(crate) fn new(
inner: proto::Incoming,
endpoint: EndpointRef,
response_buffer: BytesMut,
) -> Self {
Self(Some(State {
inner,
endpoint,
response_buffer,
}))
pub(crate) fn new(inner: proto::Incoming, endpoint: EndpointRef) -> Self {
Self(Some(State { inner, endpoint }))
}

/// Attempt to accept this incoming connection (an error may still occur)
pub fn accept(mut self) -> Result<Connecting, ConnectionError> {
let state = self.0.take().unwrap();
state.endpoint.accept(state.inner, state.response_buffer)
state.endpoint.accept(state.inner)
}

/// Reject this incoming connection attempt
pub fn refuse(mut self) {
let state = self.0.take().unwrap();
state.endpoint.refuse(state.inner, state.response_buffer);
state.endpoint.refuse(state.inner);
}

/// Respond with a retry packet, requiring the client to retry with address validation
///
/// Errors if `remote_address_validated()` is true.
pub fn retry(mut self) -> Result<(), RetryError> {
let state = self.0.take().unwrap();
state
.endpoint
.retry(state.inner, state.response_buffer)
.map_err(|(e, response_buffer)| {
RetryError(Self(Some(State {
inner: e.into_incoming(),
endpoint: state.endpoint,
response_buffer,
})))
})
state.endpoint.retry(state.inner).map_err(|e| {
RetryError(Self(Some(State {
inner: e.into_incoming(),
endpoint: state.endpoint,
})))
})
}

/// Ignore this incoming connection attempt, not sending any packet in response
Expand Down Expand Up @@ -89,26 +76,15 @@ impl Drop for Incoming {
fn drop(&mut self) {
// Implicit reject, similar to Connection's implicit close
if let Some(state) = self.0.take() {
state.endpoint.refuse(state.inner, state.response_buffer);
state.endpoint.refuse(state.inner);
}
}
}

impl fmt::Debug for Incoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let state = self.0.as_ref().unwrap();
f.debug_struct("Incoming")
.field("inner", &state.inner)
.field("endpoint", &state.endpoint)
// response_buffer is too big and not meaningful enough
.finish_non_exhaustive()
}
}

#[derive(Debug)]
struct State {
inner: proto::Incoming,
endpoint: EndpointRef,
response_buffer: BytesMut,
}

/// Error for attempting to retry an [`Incoming`] which already bears an address
Expand Down

0 comments on commit cc0d2e9

Please sign in to comment.