Skip to content

Commit

Permalink
add test (#1450)
Browse files Browse the repository at this point in the history
* Move datagrams below regular streams

This will starve datagrams, which comes with its own risks.

Eventually, we'll need a better strategy.

* OK, make it run

* Or maybe run it this way

* Organize imports better

* add test

* clippy

* address comments

* clippy

* address comments

* clippy

* clippy

* clippy

---------

Co-authored-by: Martin Thomson <[email protected]>
  • Loading branch information
KershawChang and martinthomson authored Jul 19, 2023
1 parent 98bbe59 commit 514cae0
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 58 deletions.
4 changes: 1 addition & 3 deletions neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::any::Any;
use std::cell::RefCell;
use std::cmp::min;
use std::fmt::Debug;
use std::mem;
use std::rc::Rc;

const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2
Expand Down Expand Up @@ -303,7 +302,6 @@ impl SendStream for SendMessage {
Some(self)
}

#[allow(clippy::drop_copy)]
fn send_data_atomic(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<()> {
let data_frame = HFrame::Data {
len: buf.len() as u64,
Expand All @@ -312,7 +310,7 @@ impl SendStream for SendMessage {
data_frame.encode(&mut enc);
self.stream.buffer(enc.as_ref());
self.stream.buffer(buf);
mem::drop(self.stream.send_buffer(conn)?);
let _ = self.stream.send_buffer(conn)?;
Ok(())
}
}
Expand Down
102 changes: 54 additions & 48 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,6 @@

// The class implementing a QUIC connection.

use std::{
cell::RefCell,
cmp::{max, min},
convert::TryFrom,
fmt::{self, Debug},
mem,
net::{IpAddr, SocketAddr},
ops::RangeInclusive,
rc::{Rc, Weak},
time::{Duration, Instant},
};

use smallvec::SmallVec;

use neqo_common::{
event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
};
use neqo_crypto::{
agent::CertificateInfo, random, Agent, AntiReplay, AuthenticationStatus, Cipher, Client,
HandshakeState, PrivateKey, PublicKey, ResumptionToken, SecretAgentInfo, SecretAgentPreInfo,
Server, ZeroRttChecker,
};

use crate::{
addr_valid::{AddressValidation, NewTokenState},
cid::{
Expand Down Expand Up @@ -65,6 +41,27 @@ use crate::{
version::{Version, WireVersion},
AppError, ConnectionError, Error, Res, StreamId,
};
use neqo_common::{
event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
};
use neqo_crypto::{
agent::CertificateInfo, random, Agent, AntiReplay, AuthenticationStatus, Cipher, Client,
HandshakeState, PrivateKey, PublicKey, ResumptionToken, SecretAgentInfo, SecretAgentPreInfo,
Server, ZeroRttChecker,
};
use smallvec::SmallVec;
use std::{
cell::RefCell,
cmp::{max, min},
convert::TryFrom,
fmt::{self, Debug},
mem,
net::{IpAddr, SocketAddr},
ops::RangeInclusive,
rc::{Rc, Weak},
time::{Duration, Instant},
};

mod idle;
pub mod params;
Expand All @@ -73,12 +70,13 @@ mod state;
#[cfg(test)]
pub mod test_internal;

pub use params::{ConnectionParameters, ACK_RATIO_SCALE};
pub use state::{ClosingFrame, State};

use idle::IdleTimeout;
use params::PreferredAddressConfig;
pub use params::{ConnectionParameters, ACK_RATIO_SCALE};
use saved::SavedDatagrams;
use state::StateSignaling;
pub use state::{ClosingFrame, State};

#[derive(Debug, Default)]
struct Packet(Vec<u8>);
Expand Down Expand Up @@ -1905,71 +1903,79 @@ impl Connection {
builder: &mut PacketBuilder,
tokens: &mut Vec<RecoveryToken>,
) -> Res<()> {
let stats = &mut self.stats.borrow_mut();
let frame_stats = &mut stats.frame_tx;
if self.role == Role::Server {
if let Some(t) = self.state_signaling.write_done(builder)? {
tokens.push(t);
self.stats.borrow_mut().frame_tx.handshake_done += 1;
frame_stats.handshake_done += 1;
}
}

// datagrams are best-effort and unreliable. Let streams starve them for now
// Check if there is a Datagram to be written
self.quic_datagrams
.write_frames(builder, tokens, &mut self.stats.borrow_mut());
if builder.is_full() {
return Ok(());
}

let stats = &mut self.stats.borrow_mut().frame_tx;

self.streams
.write_frames(TransmissionPriority::Critical, builder, tokens, stats);
.write_frames(TransmissionPriority::Critical, builder, tokens, frame_stats);
if builder.is_full() {
return Ok(());
}

self.streams
.write_frames(TransmissionPriority::Important, builder, tokens, stats);
self.streams.write_frames(
TransmissionPriority::Important,
builder,
tokens,
frame_stats,
);
if builder.is_full() {
return Ok(());
}

// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, and ACK_FREQUENCY.
self.cid_manager.write_frames(builder, tokens, stats)?;
self.cid_manager
.write_frames(builder, tokens, frame_stats)?;
if builder.is_full() {
return Ok(());
}
self.paths.write_frames(builder, tokens, stats)?;
self.paths.write_frames(builder, tokens, frame_stats)?;
if builder.is_full() {
return Ok(());
}

self.streams
.write_frames(TransmissionPriority::High, builder, tokens, stats);
.write_frames(TransmissionPriority::High, builder, tokens, frame_stats);
if builder.is_full() {
return Ok(());
}

self.streams
.write_frames(TransmissionPriority::Normal, builder, tokens, stats);
.write_frames(TransmissionPriority::Normal, builder, tokens, frame_stats);
if builder.is_full() {
return Ok(());
}

// Datagrams are best-effort and unreliable. Let streams starve them for now.
self.quic_datagrams.write_frames(builder, tokens, stats);
if builder.is_full() {
return Ok(());
}

let frame_stats = &mut stats.frame_tx;
// CRYPTO here only includes NewSessionTicket, plus NEW_TOKEN.
// Both of these are only used for resumption and so can be relatively low priority.
self.crypto
.write_frame(PacketNumberSpace::ApplicationData, builder, tokens, stats)?;
self.crypto.write_frame(
PacketNumberSpace::ApplicationData,
builder,
tokens,
frame_stats,
)?;
if builder.is_full() {
return Ok(());
}
self.new_token.write_frames(builder, tokens, stats)?;
self.new_token.write_frames(builder, tokens, frame_stats)?;
if builder.is_full() {
return Ok(());
}

self.streams
.write_frames(TransmissionPriority::Low, builder, tokens, stats);
.write_frames(TransmissionPriority::Low, builder, tokens, frame_stats);

#[cfg(test)]
{
Expand Down
92 changes: 88 additions & 4 deletions neqo-transport/src/connection/tests/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use crate::events::{ConnectionEvent, OutgoingDatagramOutcome};
use crate::frame::FRAME_TYPE_DATAGRAM;
use crate::packet::PacketBuilder;
use crate::quic_datagrams::MAX_QUIC_DATAGRAM;
use crate::{Connection, ConnectionError, ConnectionParameters, Error};
use crate::{
send_stream::{RetransmissionPriority, TransmissionPriority},
Connection, ConnectionError, ConnectionParameters, Error, StreamType,
};
use neqo_common::event::Provider;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::rc::Rc;
use std::{cell::RefCell, convert::TryFrom, rc::Rc};
use test_fixture::now;

const DATAGRAM_LEN_MTU: u64 = 1310;
Expand Down Expand Up @@ -224,6 +225,89 @@ fn datagram_acked() {
));
}

fn send_packet_and_get_server_event(
client: &mut Connection,
server: &mut Connection,
) -> ConnectionEvent {
let out = client.process_output(now()).dgram();
server.process_input(out.unwrap(), now());
let mut events: Vec<_> = server
.events()
.filter_map(|evt| match evt {
ConnectionEvent::RecvStreamReadable { .. } | ConnectionEvent::Datagram { .. } => {
Some(evt)
}
_ => None,
})
.collect();
// We should only get one event - either RecvStreamReadable or Datagram.
assert_eq!(events.len(), 1);
events.remove(0)
}

/// Write a datagram that is big enough to fill a packet, but then see that
/// normal priority stream data is sent first.
#[test]
fn datagram_after_stream_data() {
let (mut client, mut server) = connect_datagram();

// Write a datagram first.
let dgram_sent = client.stats().frame_tx.datagram;
assert_eq!(client.send_datagram(DATA_MTU, Some(1)), Ok(()));

// Create a stream with normal priority and send some data.
let stream_id = client.stream_create(StreamType::BiDi).unwrap();
client.stream_send(stream_id, &[6; 1200]).unwrap();

assert!(
matches!(send_packet_and_get_server_event(&mut client, &mut server), ConnectionEvent::RecvStreamReadable { stream_id: s } if s == stream_id)
);
assert_eq!(client.stats().frame_tx.datagram, dgram_sent);

if let ConnectionEvent::Datagram(data) =
&send_packet_and_get_server_event(&mut client, &mut server)
{
assert_eq!(data, DATA_MTU);
} else {
panic!();
}
assert_eq!(client.stats().frame_tx.datagram, dgram_sent + 1);
}

#[test]
fn datagram_before_stream_data() {
let (mut client, mut server) = connect_datagram();

// Create a stream with low priority and send some data before datagram.
let stream_id = client.stream_create(StreamType::BiDi).unwrap();
client
.stream_priority(
stream_id,
TransmissionPriority::Low,
RetransmissionPriority::default(),
)
.unwrap();
client.stream_send(stream_id, &[6; 1200]).unwrap();

// Write a datagram.
let dgram_sent = client.stats().frame_tx.datagram;
assert_eq!(client.send_datagram(DATA_MTU, Some(1)), Ok(()));

if let ConnectionEvent::Datagram(data) =
&send_packet_and_get_server_event(&mut client, &mut server)
{
assert_eq!(data, DATA_MTU);
} else {
panic!();
}
assert_eq!(client.stats().frame_tx.datagram, dgram_sent + 1);

assert!(
matches!(send_packet_and_get_server_event(&mut client, &mut server), ConnectionEvent::RecvStreamReadable { stream_id: s } if s == stream_id)
);
assert_eq!(client.stats().frame_tx.datagram, dgram_sent + 1);
}

#[test]
fn datagram_lost() {
let (mut client, _) = connect_datagram();
Expand Down
4 changes: 1 addition & 3 deletions neqo-transport/src/tparams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ where
#[allow(unused_variables)]
mod tests {
use super::*;
use std::mem;

#[test]
fn basic_tps() {
Expand Down Expand Up @@ -937,8 +936,7 @@ mod tests {
#[test]
#[should_panic]
fn preferred_address_neither() {
#[allow(clippy::drop_copy)]
mem::drop(PreferredAddress::new(None, None));
_ = PreferredAddress::new(None, None);
}

#[test]
Expand Down

0 comments on commit 514cae0

Please sign in to comment.