Skip to content

Commit

Permalink
Merge branch 'main' into datagram_priority
Browse files Browse the repository at this point in the history
  • Loading branch information
jesup authored Jul 6, 2023
2 parents a0d1fb0 + 98bbe59 commit b6a66b4
Show file tree
Hide file tree
Showing 16 changed files with 1,092 additions and 142 deletions.
47 changes: 42 additions & 5 deletions neqo-http3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn, Decoder, Header, Message
use neqo_qpack::decoder::QPackDecoder;
use neqo_qpack::encoder::QPackEncoder;
use neqo_transport::{
AppError, Connection, ConnectionError, DatagramTracking, State, StreamId, StreamType,
ZeroRttState,
streams::SendOrder, AppError, Connection, ConnectionError, DatagramTracking, State, StreamId,
StreamType, ZeroRttState,
};
use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap};
Expand Down Expand Up @@ -232,7 +232,7 @@ possible if there is no buffered data.
If a stream has buffered data it will be registered in the `streams_with_pending_data` queue and
actual sending will be performed in the `process_sending` function call. (This is done in this way,
i.e. data is buffered first and then sent, for 2 reasons: in this way, sending will happen in a
single function, therefore error handling and clean up is easier and the QUIIC layer may not be
single function, therefore error handling and clean up is easier and the QUIC layer may not be
able to accept all data and being able to buffer data is required in any case.)
The `send` and `send_data` functions may detect that the stream is closed and all outstanding data
Expand Down Expand Up @@ -626,7 +626,7 @@ impl Http3Connection {
}
}

/// This is called when 0RTT has been reseted to clear `send_streams`, `recv_streams` and settings.
/// This is called when 0RTT has been reset to clear `send_streams`, `recv_streams` and settings.
pub fn handle_zero_rtt_rejected(&mut self) -> Res<()> {
if self.state == Http3State::ZeroRtt {
self.state = Http3State::Initializing;
Expand Down Expand Up @@ -735,6 +735,14 @@ impl Http3Connection {
conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?;
return Ok(ReceiveOutput::NoOutput);
}
// set incoming WebTransport streams to be fair (share bandwidth)
conn.stream_fairness(stream_id, true).ok();
qinfo!(
[self],
"A new WebTransport stream {} for session {}.",
stream_id,
session_id
);
}
NewStreamType::Unknown => {
conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?;
Expand Down Expand Up @@ -920,7 +928,7 @@ impl Http3Connection {
);

// Call immediately send so that at least headers get sent. This will make Firefox faster, since
// it can send request body immediatly in most cases and does not need to do a complete process loop.
// it can send request body immediately in most cases and does not need to do a complete process loop.
self.send_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
Expand Down Expand Up @@ -995,6 +1003,32 @@ impl Http3Connection {
Ok(())
}

/// Set the stream `SendOrder`.
/// # Errors
/// Returns `InvalidStreamId` if the stream id doesn't exist
pub fn stream_set_sendorder(
conn: &mut Connection,
stream_id: StreamId,
sendorder: Option<SendOrder>,
) -> Res<()> {
conn.stream_sendorder(stream_id, sendorder)
.map_err(|_| Error::InvalidStreamId)
}

/// Set the stream Fairness. Fair streams will share bandwidth with other
/// streams of the same sendOrder group (or the unordered group). Unfair streams
/// will give bandwidth preferentially to the lowest streamId with data to send.
/// # Errors
/// Returns `InvalidStreamId` if the stream id doesn't exist
pub fn stream_set_fairness(
conn: &mut Connection,
stream_id: StreamId,
fairness: bool,
) -> Res<()> {
conn.stream_fairness(stream_id, fairness)
.map_err(|_| Error::InvalidStreamId)
}

pub fn cancel_fetch(
&mut self,
stream_id: StreamId,
Expand Down Expand Up @@ -1238,6 +1272,9 @@ impl Http3Connection {
let stream_id = conn
.stream_create(stream_type)
.map_err(|e| Error::map_stream_create_errors(&e))?;
// Set outgoing WebTransport streams to be fair (share bandwidth)
// This really can't fail, panics if it does
conn.stream_fairness(stream_id, true).unwrap();

self.webtransport_create_stream_internal(
wt,
Expand Down
39 changes: 36 additions & 3 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use neqo_common::{
use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo};
use neqo_qpack::Stats as QpackStats;
use neqo_transport::{
send_stream::SendStreamStats, AppError, Connection, ConnectionEvent, ConnectionId,
ConnectionIdGenerator, DatagramTracking, Output, Stats as TransportStats, StreamId, StreamType,
Version, ZeroRttState,
streams::SendOrder, AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator,
DatagramTracking, Output, RecvStreamStats, SendStreamStats, Stats as TransportStats, StreamId,
StreamType, Version, ZeroRttState,
};
use std::{
cell::RefCell,
Expand Down Expand Up @@ -755,6 +755,28 @@ impl Http3Client {
- u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap())
}

/// Sets the `SendOrder` for a given stream
/// # Errors
/// It may return `InvalidStreamId` if a stream does not exist anymore.
/// # Panics
/// This cannot panic.
pub fn webtransport_set_sendorder(
&mut self,
stream_id: StreamId,
sendorder: SendOrder,
) -> Res<()> {
Http3Connection::stream_set_sendorder(&mut self.conn, stream_id, Some(sendorder))
}

/// Sets the `Fairness` for a given stream
/// # Errors
/// It may return `InvalidStreamId` if a stream does not exist anymore.
/// # Panics
/// This cannot panic.
pub fn webtransport_set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
Http3Connection::stream_set_fairness(&mut self.conn, stream_id, fairness)
}

/// Returns the current `SendStreamStats` of a `WebTransportSendStream`.
/// # Errors
/// `InvalidStreamId` if the stream does not exist.
Expand All @@ -766,6 +788,17 @@ impl Http3Client {
.stats(&mut self.conn)
}

/// Returns the current `RecvStreamStats` of a `WebTransportRecvStream`.
/// # Errors
/// `InvalidStreamId` if the stream does not exist.
pub fn webtransport_recv_stream_stats(&mut self, stream_id: StreamId) -> Res<RecvStreamStats> {
self.base_handler
.recv_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.stats(&mut self.conn)
}

/// This function combines `process_input` and `process_output` function.
pub fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
qtrace!([self], "Process.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use neqo_common::event::Provider;
use crate::{
features::extended_connect::SessionCloseReason, Error, Header, Http3Client, Http3ClientEvent,
Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, Http3State,
SendStreamStats, WebTransportEvent, WebTransportRequest, WebTransportServerEvent,
WebTransportSessionAcceptAction,
RecvStreamStats, SendStreamStats, WebTransportEvent, WebTransportRequest,
WebTransportServerEvent, WebTransportSessionAcceptAction,
};
use neqo_crypto::AuthenticationStatus;
use neqo_transport::{ConnectionParameters, StreamId, StreamType};
Expand Down Expand Up @@ -315,6 +315,10 @@ impl WtTest {
self.client.webtransport_send_stream_stats(wt_stream_id)
}

fn recv_stream_stats(&mut self, wt_stream_id: StreamId) -> Result<RecvStreamStats, Error> {
self.client.webtransport_recv_stream_stats(wt_stream_id)
}

fn receive_data_client(
&mut self,
expected_stream_id: StreamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,28 @@ fn wt_client_stream_uni() {
let mut wt = WtTest::new();
let wt_session = wt.create_wt_session();
let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
let stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(stats.bytes_written(), 0);
assert_eq!(stats.bytes_sent(), 0);
assert_eq!(stats.bytes_acked(), 0);
let send_stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(send_stats.bytes_written(), 0);
assert_eq!(send_stats.bytes_sent(), 0);
assert_eq!(send_stats.bytes_acked(), 0);

wt.send_data_client(wt_stream, BUF_CLIENT);
wt.receive_data_server(wt_stream, true, BUF_CLIENT, false);
let stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(stats.bytes_written(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_sent(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_acked(), BUF_CLIENT.len() as u64);
let send_stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64);
assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64);
assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64);

// Send data again to test if the stats has the expected values.
wt.send_data_client(wt_stream, BUF_CLIENT);
wt.receive_data_server(wt_stream, false, BUF_CLIENT, false);
let stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(stats.bytes_written(), (BUF_CLIENT.len() * 2) as u64);
assert_eq!(stats.bytes_sent(), (BUF_CLIENT.len() * 2) as u64);
assert_eq!(stats.bytes_acked(), (BUF_CLIENT.len() * 2) as u64);
let send_stats = wt.send_stream_stats(wt_stream).unwrap();
assert_eq!(send_stats.bytes_written(), (BUF_CLIENT.len() * 2) as u64);
assert_eq!(send_stats.bytes_sent(), (BUF_CLIENT.len() * 2) as u64);
assert_eq!(send_stats.bytes_acked(), (BUF_CLIENT.len() * 2) as u64);

let recv_stats = wt.recv_stream_stats(wt_stream);
assert_eq!(recv_stats.unwrap_err(), Error::InvalidStreamId);
}

#[test]
Expand All @@ -49,10 +52,14 @@ fn wt_client_stream_bidi() {
let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false);
wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
wt.receive_data_client(wt_client_stream, false, BUF_SERVER, false);
let stats = wt.send_stream_stats(wt_client_stream).unwrap();
assert_eq!(stats.bytes_written(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_sent(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_acked(), BUF_CLIENT.len() as u64);
let send_stats = wt.send_stream_stats(wt_client_stream).unwrap();
assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64);
assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64);
assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64);

let recv_stats = wt.recv_stream_stats(wt_client_stream).unwrap();
assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64);
assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64);
}

#[test]
Expand All @@ -64,8 +71,12 @@ fn wt_server_stream_uni() {
let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
let stats = wt.send_stream_stats(wt_server_stream.stream_id());
assert_eq!(stats.unwrap_err(), Error::InvalidStreamId);
let send_stats = wt.send_stream_stats(wt_server_stream.stream_id());
assert_eq!(send_stats.unwrap_err(), Error::InvalidStreamId);

let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap();
assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64);
assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64);
}

#[test]
Expand All @@ -84,6 +95,10 @@ fn wt_server_stream_bidi() {
assert_eq!(stats.bytes_written(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_sent(), BUF_CLIENT.len() as u64);
assert_eq!(stats.bytes_acked(), BUF_CLIENT.len() as u64);

let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap();
assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64);
assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64);
}

#[test]
Expand Down
12 changes: 11 additions & 1 deletion neqo-http3/src/features/extended_connect/webtransport_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
};
use neqo_common::{qtrace, Encoder, Header, MessageType, Role};
use neqo_qpack::{QPackDecoder, QPackEncoder};
use neqo_transport::{Connection, DatagramTracking, StreamId};
use neqo_transport::{streams::SendOrder, Connection, DatagramTracking, StreamId};
use std::any::Any;
use std::cell::RefCell;
use std::collections::BTreeSet;
Expand Down Expand Up @@ -486,6 +486,16 @@ impl SendStream for Rc<RefCell<WebTransportSession>> {
self.borrow_mut().has_data_to_send()
}

fn set_sendorder(&mut self, _conn: &mut Connection, _sendorder: Option<SendOrder>) -> Res<()> {
// Not relevant on session
Ok(())
}

fn set_fairness(&mut self, _conn: &mut Connection, _fairness: bool) -> Res<()> {
// Not relevant on session
Ok(())
}

fn stream_writable(&self) {}

fn done(&self) -> bool {
Expand Down
47 changes: 45 additions & 2 deletions neqo-http3/src/features/extended_connect/webtransport_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
SendStream, SendStreamEvents, Stream,
};
use neqo_common::Encoder;
use neqo_transport::{send_stream::SendStreamStats, Connection, StreamId};
use neqo_transport::{Connection, RecvStreamStats, SendStreamStats, StreamId};
use std::cell::RefCell;
use std::rc::Rc;

Expand Down Expand Up @@ -75,6 +75,35 @@ impl RecvStream for WebTransportRecvStream {
}
Ok((amount, fin))
}

fn stats(&mut self, conn: &mut Connection) -> Res<RecvStreamStats> {
const TYPE_LEN_UNI: usize = Encoder::varint_len(WEBTRANSPORT_UNI_STREAM);
const TYPE_LEN_BIDI: usize = Encoder::varint_len(WEBTRANSPORT_STREAM);

let stream_header_size = if self.stream_id.is_server_initiated() {
let id_len = if self.stream_id.is_uni() {
TYPE_LEN_UNI
} else {
TYPE_LEN_BIDI
};
(id_len + Encoder::varint_len(self.session_id.as_u64())) as u64
} else {
0
};

let stats = conn.recv_stream_stats(self.stream_id)?;
if stream_header_size == 0 {
return Ok(stats);
}

let subtract_non_app_bytes =
|count: u64| -> u64 { count.saturating_sub(stream_header_size) };

let bytes_received = subtract_non_app_bytes(stats.bytes_received());
let bytes_read = subtract_non_app_bytes(stats.bytes_read());

Ok(RecvStreamStats::new(bytes_received, bytes_read))
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -185,6 +214,16 @@ impl SendStream for WebTransportSendStream {
}
}

fn set_sendorder(&mut self, conn: &mut Connection, sendorder: Option<i64>) -> Res<()> {
conn.stream_sendorder(self.stream_id, sendorder)
.map_err(|_| crate::Error::InvalidStreamId)
}

fn set_fairness(&mut self, conn: &mut Connection, fairness: bool) -> Res<()> {
conn.stream_fairness(self.stream_id, fairness)
.map_err(|_| crate::Error::InvalidStreamId)
}

fn handle_stop_sending(&mut self, close_type: CloseType) {
self.set_done(close_type);
}
Expand Down Expand Up @@ -215,10 +254,14 @@ impl SendStream for WebTransportSendStream {
0
};

let stats = conn.send_stream_stats(self.stream_id)?;
if stream_header_size == 0 {
return Ok(stats);
}

let subtract_non_app_bytes =
|count: u64| -> u64 { count.saturating_sub(stream_header_size) };

let stats = conn.stream_stats(self.stream_id)?;
let bytes_written = subtract_non_app_bytes(stats.bytes_written());
let bytes_sent = subtract_non_app_bytes(stats.bytes_sent());
let bytes_acked = subtract_non_app_bytes(stats.bytes_acked());
Expand Down
Loading

0 comments on commit b6a66b4

Please sign in to comment.