Skip to content

Commit

Permalink
Instantiate socket state once
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Jan 30, 2024
1 parent 50afb27 commit 69c0c91
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
12 changes: 4 additions & 8 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ use crate::{Datagram, IpTos};
/// # Panics
///
/// Panics if the datagram is too large to send.
pub fn tx(socket: impl AsFd, d: &Datagram) -> io::Result<usize> {
// TODO: Don't instantiate on each write.
let send_state = UdpSocketState::new((&socket).into()).unwrap();
pub fn tx(socket: impl AsFd, state: &UdpSocketState, d: &Datagram) -> io::Result<usize> {
let transmit = Transmit {
destination: d.destination(),
ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())),
Expand All @@ -44,7 +42,7 @@ pub fn tx(socket: impl AsFd, d: &Datagram) -> io::Result<usize> {
// TODO
src_ip: None,
};
let n = send_state
let n = state
.send((&socket).into(), slice::from_ref(&transmit))
.unwrap();
Ok(n)
Expand Down Expand Up @@ -72,21 +70,19 @@ pub fn tx(socket: impl AsFd, d: &Datagram) -> io::Result<usize> {
/// Panics if the datagram is too large to receive.
pub fn rx(
socket: impl AsFd,
state: &UdpSocketState,
buf: &mut [u8],
// TODO: Can these be return values instead of mutable inputs?
tos: &mut u8,
ttl: &mut u8,
) -> io::Result<(usize, SocketAddr)> {
let mut meta = RecvMeta::default();
// TODO: Don't instantiate on each read.
let recv_state = UdpSocketState::new((&socket).into()).unwrap();

// TODO: needed?
// #[cfg(test)]
// // `UdpSocketState` switches to non-blocking mode, undo that for the tests.
// socket.set_nonblocking(false).unwrap();

match recv_state.recv(
match state.recv(
(&socket).into(),
&mut [IoSliceMut::new(buf)],
slice::from_mut(&mut meta),
Expand Down
1 change: 1 addition & 0 deletions neqo-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ qlog = "0.11.0"
regex = "1.9"
structopt = "0.3"
tokio = { version = "1", features = ["net", "time", "macros", "rt", "rt-multi-thread"] }
quinn-udp = { git = "https://github.com/quinn-rs/quinn/" }

[features]
deny-warnings = []
65 changes: 38 additions & 27 deletions neqo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,19 +580,21 @@ impl HttpServer for SimpleServer {

fn read_dgram(
socket: &mut tokio::net::UdpSocket,
state: &quinn_udp::UdpSocketState,
local_address: &SocketAddr,
) -> Result<Option<Datagram>, io::Error> {
let mut buf = [0; u16::MAX as usize];
let mut tos = 0;
let mut ttl = 0;
let (sz, remote_addr) = match udp::rx(socket, &mut buf[..], &mut tos, &mut ttl) {
let (sz, remote_addr) = match udp::rx(socket, state, &mut buf[..], &mut tos, &mut ttl) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(err) => {
eprintln!("UDP recv error: {err:?}");
return Err(err);
}
Ok(res) => res,
};
qdebug!("read {}, {:?}", sz, &buf[0..32]);

if sz == buf.len() {
eprintln!("Might have received more than {} bytes", buf.len());
Expand All @@ -616,7 +618,7 @@ struct ServersRunner {
args: Args,
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, tokio::net::UdpSocket)>,
sockets: Vec<(SocketAddr, tokio::net::UdpSocket, quinn_udp::UdpSocketState)>,
}

impl ServersRunner {
Expand Down Expand Up @@ -660,14 +662,13 @@ impl ServersRunner {

print!("Server waiting for connection on: {local_addr:?}");

// TODO: needed?
socket
.set_nonblocking(true)
.expect("set_nonblocking to succeed");
let state = quinn_udp::UdpSocketState::new((&socket).into()).unwrap();

self.sockets.push((
host,
tokio::net::UdpSocket::from_std(socket).expect("conversion to Tokio socket to succeed"),
tokio::net::UdpSocket::from_std(socket)
.expect("conversion to Tokio socket to succeed"),
state,
));
}

Expand Down Expand Up @@ -708,27 +709,30 @@ impl ServersRunner {
}

/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut tokio::net::UdpSocket {
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
fn find_socket(
&mut self,
addr: SocketAddr,
) -> (&mut tokio::net::UdpSocket, &mut quinn_udp::UdpSocketState) {
let ((_host, first_socket, first_state), rest) = self.sockets.split_first_mut().unwrap();
rest.iter_mut()
.map(|(_host, socket)| socket)
.find(|socket| {
.map(|(_host, socket, state)| (socket, state))
.find(|(socket, _state)| {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == addr)
})
.unwrap_or(first_socket)
.unwrap_or((first_socket, first_state))
}

async fn process(&mut self, mut dgram: Option<&Datagram>) {
async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
qdebug!("process with {:?}", dgram);
loop {
match self.server.process(dgram.take(), self.args.now()) {
Output::Datagram(dgram) => {
let socket = self.find_socket(dgram.source());
if let Err(e) = udp::tx(socket, &dgram) {
eprintln!("UDP write error: {}", e);
}
qdebug!("writing to {:?}", dgram.source());
let (socket, state) = self.find_socket(dgram.source());
udp::tx(socket, state, &dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand All @@ -741,14 +745,15 @@ impl ServersRunner {
}
}
}
Ok(())
}

// Wait for any of the sockets to be readable or the timeout to fire.
async fn ready(&mut self) -> Result<Ready, io::Error> {
let sockets_ready = select_all(
self.sockets
.iter()
.map(|(_host, socket)| Box::pin(socket.readable())),
.map(|(_host, socket, _state)| Box::pin(socket.readable())),
)
.map(|(res, inx, _)| match res {
Ok(()) => Ok(Ready::Socket(inx)),
Expand All @@ -765,22 +770,28 @@ impl ServersRunner {

async fn run(&mut self) -> Result<(), io::Error> {
loop {
qdebug!("iteration");
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgram = read_dgram(socket, host)?;
if dgram.is_none() {
break;
Ready::Socket(inx) => {
qdebug!("socket {} ready", inx);
loop {
qdebug!("reading from {}", inx);
let (host, socket, state) = self.sockets.get_mut(inx).unwrap();
let dgram = read_dgram(socket, state, host)?;
if dgram.is_none() {
break;
}
self.process(dgram.as_ref()).await?;
}
self.process(dgram.as_ref()).await;
},
}
Ready::Timeout => {
self.process(None).await;
qdebug!("timeout fired");
self.process(None).await?;
}
}

self.server.process_events(&self.args, self.args.now());
self.process(None).await;
self.process(None).await?;
}
}
}
Expand Down

0 comments on commit 69c0c91

Please sign in to comment.