Skip to content

Commit

Permalink
address some comments from Ben
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Jul 6, 2023
1 parent e0eeaf1 commit e6f8e9d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
33 changes: 17 additions & 16 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
net::{SocketAddr, SocketAddrV6},
pin::Pin,
str,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
time::Instant,
};
Expand Down Expand Up @@ -119,13 +116,11 @@ impl Endpoint {
) -> io::Result<Self> {
let addr = socket.local_addr()?;
let allow_mtud = !socket.may_fragment();
let transmit_queue_size = Arc::new(AtomicUsize::default());
let rc = EndpointRef::new(
socket,
proto::Endpoint::new(Arc::new(config), server_config.map(Arc::new), allow_mtud),
addr.is_ipv6(),
runtime.clone(),
transmit_queue_size,
);
let driver = EndpointDriver(rc.clone());
runtime.spawn(Box::pin(async {
Expand Down Expand Up @@ -386,7 +381,7 @@ pub(crate) struct State {
send_limiter: WorkLimiter,
runtime: Arc<dyn Runtime>,
/// The aggregateed contents length of the packets in the transmit queue
transmit_queue_contents_len: Arc<AtomicUsize>,
transmit_queue_contents_len: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -444,13 +439,18 @@ impl State {
.send(ConnectionEvent::Proto(event));
}
Some(DatagramEvent::Response(t)) => {
if self.transmit_queue_contents_len.load(Ordering::Relaxed)
// Limiting the memory usage for items queued in the outgoing queue from endpoint
// generated packets. Otherwise, we may see a build-up of the queue under test with
// flood of initial packets against the endpoint. The sender with the sender-limiter
// may not keep up the pace of these packets queued into the queue.
if self.transmit_queue_contents_len
< MAX_TRANSMIT_QUEUE_CONTENTS_LEN
{
let contents_len = t.contents.len();
self.outgoing.push_back(udp_transmit(t));
self.transmit_queue_contents_len
.fetch_add(contents_len, Ordering::Relaxed);
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}
}
None => {}
Expand Down Expand Up @@ -499,8 +499,9 @@ impl State {
Poll::Ready(Ok(n)) => {
let contents_len: usize =
self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
self.transmit_queue_contents_len
.fetch_sub(contents_len, Ordering::Relaxed);
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_sub(contents_len);
// We count transmits instead of `poll_send` calls since the cost
// of a `sendmmsg` still linearly increases with number of packets.
self.send_limiter.record_work(n);
Expand Down Expand Up @@ -544,8 +545,9 @@ impl State {
Transmit(t) => {
let contents_len = t.contents.len();
self.outgoing.push_back(udp_transmit(t));
self.transmit_queue_contents_len
.fetch_add(contents_len, Ordering::Relaxed);
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}
},
Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),
Expand Down Expand Up @@ -675,7 +677,6 @@ impl EndpointRef {
inner: proto::Endpoint,
ipv6: bool,
runtime: Arc<dyn Runtime>,
transmit_queue_size: Arc<AtomicUsize>,
) -> Self {
let udp_state = Arc::new(UdpState::new());
let recv_buf = vec![
Expand Down Expand Up @@ -710,7 +711,7 @@ impl EndpointRef {
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
runtime,
transmit_queue_contents_len: transmit_queue_size,
transmit_queue_contents_len: 0,
}),
}))
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ const SEND_TIME_BOUND: Duration = Duration::from_micros(50);
/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
/// generated from the endpoint (retry or initial close) can be dropped when this limit is being execeeded.
/// Chose to represent 100 MB of data.
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100000000;
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 10_0000_000;

0 comments on commit e6f8e9d

Please sign in to comment.