From 47588ac0dff57344943f6fc2d9c4bf7f3586a01d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 07:24:02 +0200 Subject: [PATCH 01/14] refactor: remove unused test-fixture/src/sim/net.rs (#1873) The `net.rs` file is not declared as a module in `src/sim/mod.rs` and thus unused. --- test-fixture/src/sim/net.rs | 111 ------------------------------------ 1 file changed, 111 deletions(-) delete mode 100644 test-fixture/src/sim/net.rs diff --git a/test-fixture/src/sim/net.rs b/test-fixture/src/sim/net.rs deleted file mode 100644 index 754426f895..0000000000 --- a/test-fixture/src/sim/net.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use super::rng::RandomDuration; -use super::{Node, Rng}; -use neqo_common::Datagram; -use neqo_transport::Output; -use std::collections::BTreeMap; -use std::fmt::{self, Debug}; -use std::iter; -use std::ops::Range; -use std::time::{Duration, Instant}; - -/// -pub struct RandomDrop { - threshold: u64, - max: u64, - rng: Rng, -} - -impl RandomDuration { - /// Make a new random `Duration` generator. This asserts if the range provided - /// is inverted (i.e., `bounds.start > bounds.end`), or spans 2^64 - /// or more nanoseconds. - /// A zero-length range means that random values won't be taken from the Rng - pub fn new(bounds: Range, rng: Rng) -> Self { - let max = u64::try_from((bounds.end - bounds.start).as_nanos()).unwrap(); - Self { - start: bounds.start, - max, - rng, - } - } - - fn next(&mut self) -> Duration { - let r = if self.max == 0 { - Duration::new(0, 0) - } else { - self.rng.borrow_mut().random_from(0..self.max) - } - self.start + Duration::from_nanos(r) - } -} - -enum DelayState { - New(Range), - Ready(RandomDuration), -} - -pub struct Delay { - state: DelayState, - queue: BTreeMap, -} - -impl Delay -{ - pub fn new(bounds: Range) -> Self - { - Self { - State: DelayState::New(bounds), - queue: BTreeMap::default(), - } - } - - fn insert(&mut self, d: Datagram, now: Instant) { - let mut t = if let State::Ready(r) = self.state { - now + self.source.next() - } else { - unreachable!(); - } - while self.queue.contains_key(&t) { - // This is a little inefficient, but it avoids drops on collisions, - // which are super-common for a fixed delay. - t += Duration::from_nanos(1); - } - self.queue.insert(t, d); - } -} - -impl Node for Delay -{ - fn init(&mut self, rng: Rng, now: Instant) { - if let DelayState::New(bounds) = self.state { - self.state = RandomDuration::new(bounds); - } else { - unreachable!(); - } - } - - fn process(&mut self, d: Option, now: Instant) -> Output { - if let Some(dgram) = d { - self.insert(dgram, now); - } - if let Some((&k, _)) = self.queue.range(..now).nth(0) { - Output::Datagram(self.queue.remove(&k).unwrap()) - } else if let Some(&t) = self.queue.keys().nth(0) { - Output::Callback(t - now) - } else { - Output::None - } - } -} - -impl Debug for Delay { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("delay") - } -} From e467273e1a08a355c3681a7470a7233853e04cac Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 07:25:53 +0200 Subject: [PATCH 02/14] fix(http3): always qlog on send_buffer() (#1876) * fix(http3): always qlog on send_buffer() `neqo_http3::SendMessage` calls `qlog::h3_data_moved_down()` whenever it moves data down to the QUIC layer. 
`SendMessage` moves data down to the QUIC layer either directly via `self.stream.send_atomic` or indirectly buffered through `self.stream.send_buffer`. Previously only one of the 3 calls to `self.stream.send_buffer` would thereafter call `qlog::h3_data_moved_down()`. This commit moves the `h3_data_moved_down` call into `self.stream.send_buffer`, thus ensuring the function is always called when data is moved. In addition, `self.stream.send_atomic` now as well does the qlog call, thus containing all qlog logic in `buffered_send_stream.rs` instead of `send_message.rs`. * Trigger benchmark run * Don't qlog if buffer is empty * Fix typo * Use early return to keep indentation short * Don't qlog if sent is 0 --- neqo-http3/src/buffered_send_stream.rs | 51 ++++++++++++++------------ neqo-http3/src/send_message.rs | 4 +- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/neqo-http3/src/buffered_send_stream.rs b/neqo-http3/src/buffered_send_stream.rs index 4f6761fa80..60da0512b5 100644 --- a/neqo-http3/src/buffered_send_stream.rs +++ b/neqo-http3/src/buffered_send_stream.rs @@ -7,7 +7,7 @@ use neqo_common::qtrace; use neqo_transport::{Connection, StreamId}; -use crate::Res; +use crate::{qlog, Res}; #[derive(Debug, PartialEq, Eq)] pub enum BufferedStream { @@ -38,7 +38,7 @@ impl BufferedStream { /// # Panics /// - /// If the `BufferedStream` is initialized more than one it will panic. + /// If the `BufferedStream` is initialized more than once, it will panic. pub fn init(&mut self, stream_id: StreamId) { debug_assert!(&Self::Uninitialized == self); *self = Self::Initialized { @@ -63,19 +63,23 @@ impl BufferedStream { /// Returns `neqo_transport` errors. pub fn send_buffer(&mut self, conn: &mut Connection) -> Res { let label = ::neqo_common::log_subject!(::log::Level::Debug, self); - let mut sent = 0; - if let Self::Initialized { stream_id, buf } = self { - if !buf.is_empty() { - qtrace!([label], "sending data."); - sent = conn.stream_send(*stream_id, &buf[..])?; - if sent == buf.len() { - buf.clear(); - } else { - let b = buf.split_off(sent); - *buf = b; - } - } + let Self::Initialized { stream_id, buf } = self else { + return Ok(0); + }; + if buf.is_empty() { + return Ok(0); + } + qtrace!([label], "sending data."); + let sent = conn.stream_send(*stream_id, &buf[..])?; + if sent == 0 { + return Ok(0); + } else if sent == buf.len() { + buf.clear(); + } else { + let b = buf.split_off(sent); + *buf = b; } + qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, sent); Ok(sent) } @@ -85,16 +89,17 @@ impl BufferedStream { pub fn send_atomic(&mut self, conn: &mut Connection, to_send: &[u8]) -> Res { // First try to send anything that is in the buffer. 
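+        // Note that any bytes `send_buffer` manages to flush are qlogged
+        // inside `send_buffer` itself; this function then only qlogs the
+        // atomic write it performs directly below.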
self.send_buffer(conn)?; - if let Self::Initialized { stream_id, buf } = self { - if buf.is_empty() { - let res = conn.stream_send_atomic(*stream_id, to_send)?; - Ok(res) - } else { - Ok(false) - } - } else { - Ok(false) + let Self::Initialized { stream_id, buf } = self else { + return Ok(false); + }; + if !buf.is_empty() { + return Ok(false); + } + let res = conn.stream_send_atomic(*stream_id, to_send)?; + if res { + qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, to_send.len()); } + Ok(res) } #[must_use] diff --git a/neqo-http3/src/send_message.rs b/neqo-http3/src/send_message.rs index 15965c44f6..7fb37beb70 100644 --- a/neqo-http3/src/send_message.rs +++ b/neqo-http3/src/send_message.rs @@ -13,7 +13,7 @@ use neqo_transport::{Connection, StreamId}; use crate::{ frames::HFrame, headers_checks::{headers_valid, is_interim, trailers_valid}, - qlog, BufferedStream, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpSendStream, Res, + BufferedStream, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpSendStream, Res, SendStream, SendStreamEvents, Stream, }; @@ -216,7 +216,6 @@ impl SendStream for SendMessage { .send_atomic(conn, &buf[..to_send]) .map_err(|e| Error::map_stream_send_errors(&e))?; debug_assert!(sent); - qlog::h3_data_moved_down(conn.qlog_mut(), self.stream_id(), to_send); Ok(to_send) } @@ -243,7 +242,6 @@ impl SendStream for SendMessage { /// info that the stream has been closed.) fn send(&mut self, conn: &mut Connection) -> Res<()> { let sent = Error::map_error(self.stream.send_buffer(conn), Error::HttpInternal(5))?; - qlog::h3_data_moved_down(conn.qlog_mut(), self.stream_id(), sent); qtrace!([self], "{} bytes sent", sent); if !self.stream.has_buffered_data() { From c42597d07cae31d5b5d5680fb04cd1dfdc1a78da Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 6 May 2024 08:44:19 +0000 Subject: [PATCH 03/14] build(deps): bump codecov/codecov-action from 4.3.0 to 4.3.1 (#1881) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4.3.0 to 4.3.1. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/84508663e988701840491b86de86b666e8a86bed...5ecb98a3c6b747ed38dc09f787459979aebb39be) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 99ba122d2d..4da09d9f4f 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -152,7 +152,7 @@ jobs: if: success() || failure() - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@84508663e988701840491b86de86b666e8a86bed # v4.3.0 + uses: codecov/codecov-action@5ecb98a3c6b747ed38dc09f787459979aebb39be # v4.3.1 with: file: lcov.info fail_ci_if_error: false From adb650785b4693275303be5ecd53a344b768fdeb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 6 May 2024 08:54:38 +0000 Subject: [PATCH 04/14] build(deps): bump actions/download-artifact from 4.1.6 to 4.1.7 (#1879) * build(deps): bump actions/download-artifact from 4.1.6 to 4.1.7 Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4.1.6 to 4.1.7. - [Release notes](https://github.com/actions/download-artifact/releases) - [Commits](https://github.com/actions/download-artifact/compare/9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395...65a9edc5881444af0b9093a5e628f2fe47ea3b2e) --- updated-dependencies: - dependency-name: actions/download-artifact dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Addition --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Lars Eggert --- .github/actions/pr-comment/action.yml | 2 +- .github/workflows/qns.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/actions/pr-comment/action.yml b/.github/actions/pr-comment/action.yml index 6ee1e7d813..fce0bd3dd7 100644 --- a/.github/actions/pr-comment/action.yml +++ b/.github/actions/pr-comment/action.yml @@ -15,7 +15,7 @@ inputs: runs: using: composite steps: - - uses: actions/download-artifact@9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395 # v4.1.6 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: run-id: ${{ github.event.workflow_run.id }} name: ${{ inputs.name }} diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index 904eab03c3..275f53bb17 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -129,7 +129,7 @@ jobs: pair: ${{ fromJson(needs.implementations.outputs.pairs) }} runs-on: ubuntu-latest steps: - - uses: actions/download-artifact@9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395 # v4.1.6 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: name: '${{ env.LATEST }} Docker image' path: /tmp @@ -164,7 +164,7 @@ jobs: restore-keys: qns-${{ runner.os }}- - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 - - uses: actions/download-artifact@9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395 # v4.1.6 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: pattern: '*results' path: results From 9d17e2ad33afb2ef2f8ab92cda3d39664f53c3e9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 6 May 2024 09:46:46 +0000 Subject: [PATCH 05/14] build(deps): bump actions/checkout from 4.1.3 to 4.1.4 (#1880) * build(deps): bump actions/checkout from 4.1.3 to 4.1.4 Bumps 
[actions/checkout](https://github.com/actions/checkout) from 4.1.3 to 4.1.4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/1d96c772d19495a3b5c517cd2bc0cb401ea0529f...0ad4b8fadaa221de15dcec353f45205ec38ea70b) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Additions --------- Signed-off-by: dependabot[bot] Signed-off-by: Lars Eggert Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Lars Eggert --- .github/actions/nss/action.yml | 4 ++-- .github/actions/quic-interop-runner/action.yml | 2 +- .github/workflows/actionlint.yml | 2 +- .github/workflows/bench-comment.yml | 2 +- .github/workflows/bench.yml | 4 ++-- .github/workflows/check.yml | 2 +- .github/workflows/mutants.yml | 2 +- .github/workflows/qns-comment.yml | 2 +- .github/workflows/qns.yml | 4 ++-- 9 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.github/actions/nss/action.yml b/.github/actions/nss/action.yml index 2fa61528a7..34ccae1d7d 100644 --- a/.github/actions/nss/action.yml +++ b/.github/actions/nss/action.yml @@ -49,14 +49,14 @@ runs: # # - name: Checkout NSPR # if: env.BUILD_NSS == '1' - # uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + # uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # with: # repository: "nss-dev/nspr" # path: ${{ github.workspace }}/nspr # - name: Checkout NSS # if: env.BUILD_NSS == '1' - # uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + # uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # with: # repository: "nss-dev/nss" # path: ${{ github.workspace }}/nss diff --git a/.github/actions/quic-interop-runner/action.yml b/.github/actions/quic-interop-runner/action.yml index 3f5547d3c6..d0874ef924 100644 --- a/.github/actions/quic-interop-runner/action.yml +++ b/.github/actions/quic-interop-runner/action.yml @@ -24,7 +24,7 @@ runs: using: "composite" steps: - name: Checkout quic-interop/quic-interop-runner repository - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: repository: 'quic-interop/quic-interop-runner' path: 'quic-interop-runner' diff --git a/.github/workflows/actionlint.yml b/.github/workflows/actionlint.yml index b258454518..107e5f0726 100644 --- a/.github/workflows/actionlint.yml +++ b/.github/workflows/actionlint.yml @@ -21,7 +21,7 @@ jobs: run: shell: bash steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Download actionlint id: get_actionlint run: bash <(curl https://raw.githubusercontent.com/rhysd/actionlint/main/scripts/download-actionlint.bash) diff --git a/.github/workflows/bench-comment.yml b/.github/workflows/bench-comment.yml index dce7e25f5f..508a5fe5dd 100644 --- a/.github/workflows/bench-comment.yml +++ b/.github/workflows/bench-comment.yml @@ -23,7 +23,7 @@ jobs: github.event.workflow_run.event == 'pull_request' && github.event.workflow_run.conclusion == 'success' steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: 
./.github/actions/pr-comment with: name: bench diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 232ad4c6f1..092985ed89 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -26,10 +26,10 @@ jobs: steps: - name: Checkout neqo - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Checkout msquic - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: repository: microsoft/msquic ref: main diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 4da09d9f4f..4d4b5898c2 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -42,7 +42,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Install dependencies (Linux) if: runner.os == 'Linux' diff --git a/.github/workflows/mutants.yml b/.github/workflows/mutants.yml index 1837749da8..e7ca9f55f0 100644 --- a/.github/workflows/mutants.yml +++ b/.github/workflows/mutants.yml @@ -17,7 +17,7 @@ jobs: mutants: runs-on: ubuntu-latest steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: fetch-depth: 0 diff --git a/.github/workflows/qns-comment.yml b/.github/workflows/qns-comment.yml index f27d4904a1..37a2a38a45 100644 --- a/.github/workflows/qns-comment.yml +++ b/.github/workflows/qns-comment.yml @@ -22,7 +22,7 @@ jobs: if: | github.event.workflow_run.event == 'pull_request' steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: ./.github/actions/pr-comment with: name: qns diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index 275f53bb17..27ac3200bd 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -142,7 +142,7 @@ jobs: echo "client=$(echo "$PAIR" | cut -d% -f1)" >> "$GITHUB_OUTPUT" echo "server=$(echo "$PAIR" | cut -d% -f2)" >> "$GITHUB_OUTPUT" - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # TODO: Replace once https://github.com/quic-interop/quic-interop-runner/pull/356 is merged. 
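      # Until then, run the copy of the action vendored into this repository: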
- uses: ./.github/actions/quic-interop-runner @@ -163,7 +163,7 @@ jobs: key: qns-${{ runner.os }}-${{ github.sha }} restore-keys: qns-${{ runner.os }}- - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: pattern: '*results' From d589ea0d57979adf769ac079df3c66d79563e90f Mon Sep 17 00:00:00 2001 From: Kershaw Date: Mon, 6 May 2024 12:26:32 +0200 Subject: [PATCH 06/14] remove the assertion that can be triggered when a HANDSHAKE_DONE frame is lost (#1882) --- neqo-transport/src/connection/state.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/neqo-transport/src/connection/state.rs b/neqo-transport/src/connection/state.rs index 9f8f2d4f5c..e76f937938 100644 --- a/neqo-transport/src/connection/state.rs +++ b/neqo-transport/src/connection/state.rs @@ -212,10 +212,6 @@ pub enum StateSignaling { impl StateSignaling { pub fn handshake_done(&mut self) { if !matches!(self, Self::Idle) { - debug_assert!( - false, - "StateSignaling must be in Idle state but is in {self:?} state.", - ); return; } *self = Self::HandshakeDone; From 343df5cc0d02e0b0953de4a0a390ae8980d89081 Mon Sep 17 00:00:00 2001 From: Kershaw Date: Mon, 6 May 2024 14:02:45 +0200 Subject: [PATCH 07/14] neqo v0.7.7 (#1885) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9eb8a19244..9f9fd0d98b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ resolver = "2" homepage = "https://github.com/mozilla/neqo/" repository = "https://github.com/mozilla/neqo/" authors = ["The Neqo Authors "] -version = "0.7.6" +version = "0.7.7" # Keep in sync with `.rustfmt.toml` `edition`. edition = "2021" license = "MIT OR Apache-2.0" From c8540004cbce006c97527ac64d3af389de531753 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 6 May 2024 16:15:32 +0300 Subject: [PATCH 08/14] ci: Remove CI code to show differential QNS results (#1884) It's too complex, and we should instead just fix the interop issues. --- .github/workflows/qns.yml | 39 +-------------------------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index 27ac3200bd..a29f6bbd5e 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -8,7 +8,7 @@ on: paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] merge_group: schedule: - # Run at 1 AM each day, so there is a `main`-branch baseline in the cache. + # Run at 1 AM each day - cron: '0 1 * * *' workflow_dispatch: @@ -156,13 +156,6 @@ jobs: needs: run-qns runs-on: ubuntu-latest steps: - - name: Download cached main-branch results - uses: actions/cache/restore@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 - with: - path: results-main - key: qns-${{ runner.os }}-${{ github.sha }} - restore-keys: qns-${{ runner.os }}- - - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: @@ -170,9 +163,6 @@ jobs: path: results - run: | - ls -l results-main || true - cat results-main/result.json || true - echo "[]" > result.json for RUN in results/*; do [ -e "$RUN/result.json" ] || continue CLIENT=$(jq -r < "$RUN/result.json" '.clients[0]') @@ -202,23 +192,9 @@ jobs: echo "**$RESULT**" } >> "$GROUP.md" done - jq < "$RUN/grouped.json" ". 
+= { client: \"$CLIENT\", server: \"$SERVER\" }" > new.json && \ - jq < result.json --argjson new "$(cat new.json)" '. += [$new]' > result.json.tmp && \ - rm new.json && \ - mv result.json.tmp result.json done - DIFFER='def post_recurse(f): def r: (f | select(. != null) | r), .; r; def post_recurse: post_recurse(.[]?); (. | (post_recurse | arrays) |= sort)' - diff <(jq -S "$DIFFER" results-main/result.json) <(jq -S "$DIFFER" result.json) || true - diff -Baur results-main/result.json result.json || true { echo "### Failed Interop Tests" - SHA=$(cat results-main/baseline-sha.txt || true) - if [ -n "$SHA" ]; then - { - echo "Interop failures relative to $SHA." - echo - } >> results.md - fi if [ -e failed.md ]; then echo "[QUIC Interop Runner](https://github.com/quic-interop/quic-interop-runner), *client* vs. *server*" cat failed.md @@ -240,19 +216,6 @@ jobs: echo "" } >> comment.md - - name: Remember main-branch push URL - if: github.ref == 'refs/heads/main' - run: echo "${{ github.sha }}" > baseline-sha.txt - - - name: Cache main-branch results - if: github.ref == 'refs/heads/main' - uses: actions/cache/save@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2 - with: - path: | - result.json - baseline-sha.txt - key: qns-${{ runner.os }}-${{ github.sha }} - - uses: ./.github/actions/pr-comment-data-export with: name: qns From 20a4058f24f68cd47064743f5562824118a6acbc Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 6 May 2024 16:46:46 +0300 Subject: [PATCH 09/14] ci: Build Firefox with current neqo (#1834) * ci: Build Firefox with current neqo * --application-choice browser * Allow qlog dupes * Typo * Get Firefox source from GitHub * Again * Again * Again * Again * Again * Debug * Again * Again * Again * Again * Again * Again * qlog * Fix * Again * Again * Again * Fingers crossed... 
* Try and prevent running out of disk space * Again * Again * Set CARGO_HOME and use sccache * actionlint * Use matrix * Again * Again * Upload binaries * Identify what to upload * Push * Naming * Again * Again * Again * Again * Again * Don't disable tests for debug builds * Work around https://bugzilla.mozilla.org/show_bug.cgi?id=1894031 * Prepare to comment on PR * Again * Again * Undo * Again * Again * Try and comment * cargo update * Again * Again * Again * Again * Again * Consolidate PR reporting * Fix actionlint --------- Signed-off-by: Lars Eggert --- .../actions/maximize-build-space/action.yml | 206 ++++++++++++++++++ .github/workflows/bench.yml | 2 +- .github/workflows/firefox.yml | 169 ++++++++++++++ .../{bench-comment.yml => pr-comment.yml} | 8 +- .github/workflows/qns-comment.yml | 29 --- .github/workflows/qns.yml | 5 +- 6 files changed, 383 insertions(+), 36 deletions(-) create mode 100644 .github/actions/maximize-build-space/action.yml create mode 100644 .github/workflows/firefox.yml rename .github/workflows/{bench-comment.yml => pr-comment.yml} (76%) delete mode 100644 .github/workflows/qns-comment.yml diff --git a/.github/actions/maximize-build-space/action.yml b/.github/actions/maximize-build-space/action.yml new file mode 100644 index 0000000000..c9ada92815 --- /dev/null +++ b/.github/actions/maximize-build-space/action.yml @@ -0,0 +1,206 @@ +# https://github.com/easimon/maximize-build-space/blob/fadc013e293a3453768b4ddb9db8c85104752807/action.yml + +name: 'Maximize build disk space' +description: 'Maximize the available disk space for your build job' +branding: + icon: 'crop' + color: 'orange' +inputs: + root-reserve-mb: + description: 'Space to be left free on the root filesystem, in Megabytes.' + required: false + default: '1024' + temp-reserve-mb: + description: 'Space to be left free on the temp filesystem (/mnt), in Megabytes.' + required: false + default: '100' + swap-size-mb: + description: 'Swap space to create, in Megabytes.' + required: false + default: '4096' + overprovision-lvm: + description: | + Create the LVM disk images as sparse files, making the space required for the LVM image files *appear* unused on the + hosting volumes until actually allocated. Use with care, this can lead to surprising out-of-disk-space situations. + You should prefer adjusting root-reserve-mb/temp-reserve-mb over using this option. + required: false + default: 'false' + build-mount-path: + description: 'Absolute path to the mount point where the build space will be available, defaults to $GITHUB_WORKSPACE if unset.' + required: false + build-mount-path-ownership: + description: 'Ownership of the mount point path, defaults to standard "runner" user and group.' + required: false + default: 'runner:runner' + pv-loop-path: + description: 'Absolute file path for the LVM image created on the root filesystem, the default is usually fine.' + required: false + default: '/pv.img' + tmp-pv-loop-path: + description: 'Absolute file path for the LVM image created on the temp filesystem, the default is usually fine. Must reside on /mnt' + required: false + default: '/mnt/tmp-pv.img' + remove-dotnet: + description: 'Removes .NET runtime and libraries. (frees ~17 GB)' + required: false + default: 'false' + remove-android: + description: 'Removes Android SDKs and Tools. (frees ~11 GB)' + required: false + default: 'false' + remove-haskell: + description: 'Removes GHC (Haskell) artifacts. 
(frees ~2.7 GB)' + required: false + default: 'false' + remove-codeql: + description: 'Removes CodeQL Action Bundles. (frees ~5.4 GB)' + required: false + default: 'false' + remove-docker-images: + description: 'Removes cached Docker images. (frees ~3 GB)' + required: false + default: 'false' +runs: + using: "composite" + steps: + - name: Disk space report before modification + shell: bash + run: | + echo "Memory and swap:" + sudo free + echo + sudo swapon --show + echo + + echo "Available storage:" + sudo df -h + echo + + - name: Maximize build disk space + shell: bash + run: | + set -euo pipefail + + BUILD_MOUNT_PATH="${{ inputs.build-mount-path }}" + if [[ -z "${BUILD_MOUNT_PATH}" ]]; then + BUILD_MOUNT_PATH="${GITHUB_WORKSPACE}" + fi + + echo "Arguments:" + echo + echo " Root reserve: ${{ inputs.root-reserve-mb }} MiB" + echo " Temp reserve: ${{ inputs.temp-reserve-mb }} MiB" + echo " Swap space: ${{ inputs.swap-size-mb }} MiB" + echo " Overprovision LVM: ${{ inputs.overprovision-lvm }}" + echo " Mount path: ${BUILD_MOUNT_PATH}" + echo " Root PV loop path: ${{ inputs.pv-loop-path }}" + echo " Temp PV loop path: ${{ inputs.tmp-pv-loop-path }}" + echo -n " Removing: " + if [[ ${{ inputs.remove-dotnet }} == 'true' ]]; then + echo -n "dotnet " + fi + if [[ ${{ inputs.remove-android }} == 'true' ]]; then + echo -n "android " + fi + if [[ ${{ inputs.remove-haskell }} == 'true' ]]; then + echo -n "haskell " + fi + if [[ ${{ inputs.remove-codeql }} == 'true' ]]; then + echo -n "codeql " + fi + if [[ ${{ inputs.remove-docker-images }} == 'true' ]]; then + echo -n "docker " + fi + echo + + # store owner of $GITHUB_WORKSPACE in case the action deletes it + WORKSPACE_OWNER="$(stat -c '%U:%G' "${GITHUB_WORKSPACE}")" + + # ensure mount path exists before the action + sudo mkdir -p "${BUILD_MOUNT_PATH}" + sudo find "${BUILD_MOUNT_PATH}" -maxdepth 0 ! -empty -exec echo 'WARNING: directory [{}] is not empty, data loss might occur. Content:' \; -exec ls -al "{}" \; + + echo "Removing unwanted software... " + if [[ ${{ inputs.remove-dotnet }} == 'true' ]]; then + sudo rm -rf /usr/share/dotnet + fi + if [[ ${{ inputs.remove-android }} == 'true' ]]; then + sudo rm -rf /usr/local/lib/android + fi + if [[ ${{ inputs.remove-haskell }} == 'true' ]]; then + sudo rm -rf /opt/ghc + fi + if [[ ${{ inputs.remove-codeql }} == 'true' ]]; then + sudo rm -rf /opt/hostedtoolcache/CodeQL + fi + if [[ ${{ inputs.remove-docker-images }} == 'true' ]]; then + sudo docker image prune --all --force + fi + echo "... done" + + VG_NAME=buildvg + + # github runners have an active swap file in /mnt/swapfile + # we want to reuse the temp disk, so first unmount swap and clean the temp disk + echo "Unmounting and removing swap file." + sudo swapoff -a + sudo rm -f /mnt/swapfile + + echo "Creating LVM Volume." + echo " Creating LVM PV on root fs." + # create loop pv image on root fs + ROOT_RESERVE_KB=$(expr ${{ inputs.root-reserve-mb }} \* 1024) + ROOT_FREE_KB=$(df --block-size=1024 --output=avail / | tail -1) + ROOT_LVM_SIZE_KB=$(expr $ROOT_FREE_KB - $ROOT_RESERVE_KB) + ROOT_LVM_SIZE_BYTES=$(expr $ROOT_LVM_SIZE_KB \* 1024) + sudo touch "${{ inputs.pv-loop-path }}" && sudo fallocate -z -l "${ROOT_LVM_SIZE_BYTES}" "${{ inputs.pv-loop-path }}" + export ROOT_LOOP_DEV=$(sudo losetup --find --show "${{ inputs.pv-loop-path }}") + sudo pvcreate -f "${ROOT_LOOP_DEV}" + + # create pv on temp disk + echo " Creating LVM PV on temp fs." 
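+        # Mirror the root-fs computation above: leave temp-reserve-mb free
+        # on /mnt and turn the rest into a loop-backed LVM physical volume.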
+ TMP_RESERVE_KB=$(expr ${{ inputs.temp-reserve-mb }} \* 1024) + TMP_FREE_KB=$(df --block-size=1024 --output=avail /mnt | tail -1) + TMP_LVM_SIZE_KB=$(expr $TMP_FREE_KB - $TMP_RESERVE_KB) + TMP_LVM_SIZE_BYTES=$(expr $TMP_LVM_SIZE_KB \* 1024) + sudo touch "${{ inputs.tmp-pv-loop-path }}" && sudo fallocate -z -l "${TMP_LVM_SIZE_BYTES}" "${{ inputs.tmp-pv-loop-path }}" + export TMP_LOOP_DEV=$(sudo losetup --find --show "${{ inputs.tmp-pv-loop-path }}") + sudo pvcreate -f "${TMP_LOOP_DEV}" + + # create volume group from these pvs + sudo vgcreate "${VG_NAME}" "${TMP_LOOP_DEV}" "${ROOT_LOOP_DEV}" + + echo "Recreating swap" + # create and activate swap + sudo lvcreate -L "${{ inputs.swap-size-mb }}M" -n swap "${VG_NAME}" + sudo mkswap "/dev/mapper/${VG_NAME}-swap" + sudo swapon "/dev/mapper/${VG_NAME}-swap" + + echo "Creating build volume" + # create and mount build volume + sudo lvcreate -l 100%FREE -n buildlv "${VG_NAME}" + if [[ ${{ inputs.overprovision-lvm }} == 'true' ]]; then + sudo mkfs.ext4 -m0 "/dev/mapper/${VG_NAME}-buildlv" + else + sudo mkfs.ext4 -Enodiscard -m0 "/dev/mapper/${VG_NAME}-buildlv" + fi + sudo mount "/dev/mapper/${VG_NAME}-buildlv" "${BUILD_MOUNT_PATH}" + sudo chown -R "${{ inputs.build-mount-path-ownership }}" "${BUILD_MOUNT_PATH}" + + # if build mount path is a parent of $GITHUB_WORKSPACE, and has been deleted, recreate it + if [[ ! -d "${GITHUB_WORKSPACE}" ]]; then + sudo mkdir -p "${GITHUB_WORKSPACE}" + sudo chown -R "${WORKSPACE_OWNER}" "${GITHUB_WORKSPACE}" + fi + + - name: Disk space report after modification + shell: bash + run: | + echo "Memory and swap:" + sudo free + echo + sudo swapon --show + echo + + echo "Available storage:" + sudo df -h diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 092985ed89..e27be7daf9 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -244,6 +244,6 @@ jobs: - name: Export PR comment data uses: ./.github/actions/pr-comment-data-export with: - name: bench + name: ${{ github.workflow }} contents: results.md log-url: ${{ steps.export.outputs.artifact-url }} diff --git a/.github/workflows/firefox.yml b/.github/workflows/firefox.yml new file mode 100644 index 0000000000..adf65a85e1 --- /dev/null +++ b/.github/workflows/firefox.yml @@ -0,0 +1,169 @@ +name: Firefox +on: + push: + branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] + pull_request: + branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] + merge_group: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref_name }} + cancel-in-progress: true + +env: + FIREFOX: Firefox + +jobs: + firefox: + name: Build Firefox + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-14, windows-latest] + type: [debug, release] + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash + env: + MOZBUILD_STATE_PATH: ${{ github.workspace }}/mozbuild + CARGO_HOME: ${{ github.workspace }}/cargo + + steps: + # We need to check out Neqo first, because the maximize-build-space action + # is vendored in. + - name: Check out Neqo + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - name: Maximize build space + if: runner.os == 'Linux' + uses: ./.github/actions/maximize-build-space + with: + root-reserve-mb: 2048 + temp-reserve-mb: 2048 + swap-size-mb: 4096 + remove-dotnet: true + remove-android: true + remove-haskell: true + remove-docker-images: true + + # The previous step blew it away, so we need to check it out again. 
+ - name: Check out Neqo again + if: runner.os == 'Linux' + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - name: Check out Firefox + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + with: + repository: mozilla/gecko-dev + path: mozilla-unified + + - name: Install deps (Windows) + if: runner.os == 'Windows' + run: choco install -y mozillabuild + + - name: Bootstrap Firefox + run: | + cd mozilla-unified + { + echo "mk_add_options MOZ_OBJDIR=../$FIREFOX" + echo "ac_add_options --with-ccache=sccache" + echo "ac_add_options --enable-application=browser" + # Work around https://bugzilla.mozilla.org/show_bug.cgi?id=1894031 + if [ "${{ runner.os }}" != "Windows" ] || [ "${{ matrix.type}}" != "debug" ]; then + echo "ac_add_options --disable-tests" + fi + echo "ac_add_options --enable-${{ matrix.type }}" + } >> mozconfig + ./mach bootstrap --application-choice browser + + - name: Plumb in Neqo + run: | + # Get qlog version used by neqo + cargo generate-lockfile + QLOG_VERSION=$(cargo pkgid qlog | cut -d@ -f2) + rm Cargo.lock + cd mozilla-unified + { + echo '[[audits.qlog]]' + echo 'who = "CI"' + echo 'criteria = "safe-to-deploy"' + echo "version = \"$QLOG_VERSION\"" + } >> supply-chain/audits.toml + sed -i'' -e "s/qlog =.*/qlog = \"$QLOG_VERSION\"/" netwerk/socket/neqo_glue/Cargo.toml + { + echo '[patch."https://github.com/mozilla/neqo"]' + echo 'neqo-http3 = { path = "../neqo-http3" }' + echo 'neqo-transport = { path = "../neqo-transport" }' + echo 'neqo-common = { path = "../neqo-common" }' + echo 'neqo-qpack = { path = "../neqo-qpack" }' + echo 'neqo-crypto = { path = "../neqo-crypto" }' + } >> Cargo.toml + cargo update neqo-http3 neqo-transport neqo-common neqo-qpack neqo-crypto + ./mach vendor rust --ignore-modified + + - name: Build Firefox + env: + NAME: ${{ runner.os == 'macOS' && 'Nightly' || 'bin' }} + TYPE: ${{ runner.os == 'macOS' && matrix.type == 'debug' && 'Debug' || '' }} + EXT: ${{ runner.os == 'macOS' && '.app' || '' }} + run: | + cd mozilla-unified + ./mach build && tar -cf "../$FIREFOX.tar" -C "../$FIREFOX/dist" "$NAME$TYPE$EXT" + exit 0 + + - name: Export binary + id: upload + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + with: + name: ${{ runner.os }}-${{ env.FIREFOX }}-${{ matrix.type }}.tgz + path: ${{ env.FIREFOX }}.tar + compression-level: 9 + + - run: echo "${{ steps.upload.outputs.artifact-url }}" >> artifact + + - name: Export artifact URL + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + with: + name: artifact-${{ runner.os }}-${{ env.FIREFOX }}-${{ matrix.type }} + path: artifact + retention-days: 1 + + comment: + name: Comment on PR + if: always() + needs: firefox + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - uses: actions/download-artifact@9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395 # v4.1.6 + with: + pattern: 'artifact-*' + path: artifacts + + - run: | + { + echo "### Firefox builds for this PR" + echo "The following builds are available for testing. Crossed-out builds did not succeed." 
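+          # One bullet per OS: link each build type that produced an
+          # artifact URL and strike through the ones that did not.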
+ for os in Linux macOS Windows; do + echo -n "* **$os**:" + for type in debug release; do + artifact="artifacts/artifact-$os-${{ env.FIREFOX }}-$type/artifact" + if [ -e "$artifact" ]; then + echo -n " [${type^}]($(cat $artifact))" + else + echo -n " ~~${type^}~~" + fi + done + echo + done + } > comment.md + cat comment.md > "$GITHUB_STEP_SUMMARY" + + - uses: ./.github/actions/pr-comment-data-export + with: + name: ${{ github.workflow }} + contents: comment.md diff --git a/.github/workflows/bench-comment.yml b/.github/workflows/pr-comment.yml similarity index 76% rename from .github/workflows/bench-comment.yml rename to .github/workflows/pr-comment.yml index 508a5fe5dd..ac500f3831 100644 --- a/.github/workflows/bench-comment.yml +++ b/.github/workflows/pr-comment.yml @@ -4,11 +4,11 @@ # tests itself might run off of a fork, i.e., an untrusted environment and should # thus not be granted write permissions. -name: Benchmark Comment +name: PR Comment on: workflow_run: - workflows: ["CI"] + workflows: ["QNS", "CI", "Firefox"] types: - completed @@ -21,10 +21,10 @@ jobs: runs-on: ubuntu-latest if: | github.event.workflow_run.event == 'pull_request' && - github.event.workflow_run.conclusion == 'success' + (github.event.workflow_run.name != 'CI' || github.event.workflow_run.conclusion == 'success' ) steps: - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: ./.github/actions/pr-comment with: - name: bench + name: ${{ github.event.workflow_run.name }} token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/qns-comment.yml b/.github/workflows/qns-comment.yml deleted file mode 100644 index 37a2a38a45..0000000000 --- a/.github/workflows/qns-comment.yml +++ /dev/null @@ -1,29 +0,0 @@ -# Post test results as pull request comment. -# -# This is done as a separate workflow as it requires write permissions. The -# tests itself might run off of a fork, i.e., an untrusted environment and should -# thus not be granted write permissions. - -name: QUIC Network Simulator Comment - -on: - workflow_run: - workflows: ["QUIC Network Simulator"] - types: - - completed - -permissions: read-all - -jobs: - comment: - permissions: - pull-requests: write - runs-on: ubuntu-latest - if: | - github.event.workflow_run.event == 'pull_request' - steps: - - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - - uses: ./.github/actions/pr-comment - with: - name: qns - token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index a29f6bbd5e..5a5e07b7fd 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -1,4 +1,5 @@ -name: QUIC Network Simulator +name: QNS + on: push: branches: ["main"] @@ -218,5 +219,5 @@ jobs: - uses: ./.github/actions/pr-comment-data-export with: - name: qns + name: ${{ github.workflow }} contents: comment.md From bb88aab4525ff74c42b6cc187813c0b0e2813f80 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 May 2024 02:51:29 +0200 Subject: [PATCH 10/14] fix(SendMessage): use SendStream::set_writable_event_low_watermark (#1838) * fix(SendMessage): use SendStream::set_writable_event_low_watermark Previously `SendMessage::send_data` could stall, if less than the minimum message size is available to be sent. See https://github.com/mozilla/neqo/issues/1819 for details. This commit implements solution (3) proposed in https://github.com/mozilla/neqo/issues/1819. 
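In rough terms, when the available send space cannot fit even a minimal
DATA frame (2 byte header + 1 byte payload), the sender now asks the
transport to wake it again only once enough space is available. A sketch
of the pattern, using the names introduced later in this patch:

    let available = conn.stream_avail_send_space(stream_id)?;
    if available < MIN_DATA_FRAME_SIZE {
        // Only emit the next SendStreamWritable event once at least
        // MIN_DATA_FRAME_SIZE bytes of send space are available again.
        conn.stream_set_writable_event_low_watermark(
            stream_id,
            NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(),
        )?;
        return Ok(0);
    }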
This commit introduces `SendStream::set_writable_event_low_watermark` which is then used in `SendMessage::send_data` to signal to `SendStream` the minimum required send space (low watermark) for the next send. Once reached, `SendStream` emits a `SendStreamWritable` eventually triggering another `SendMessage::send_data`. Alternative to https://github.com/mozilla/neqo/pull/1835. Compared to https://github.com/mozilla/neqo/pull/1835, this fix does not utilize the `SendMessage` buffer, thus does not introduce an indirection to the send path. In addition, under the assumption that available send space is increased in larger batches, this fix does not send tiny data frames (2 byte header, 1 byte goodput). Downside, compared to https://github.com/mozilla/neqo/pull/1835, is that it requires both changes in `neqo-transport` and `neqo-http3`. Secondarily, this fixes https://github.com/mozilla/neqo/pull/1821 as well. * Move const * Add documentation * Add SendStream test * Fix intra doc links * Add neqo-http3 test * Replace goodput with payload * Re-trigger benchmarks Let's see whether the "Download" benchmark is consistent. * Rename emit_writable_event to maybe_emit_writable_event * Replace expect with unwrap * Use NonZeroUsize::get * Replace expect with unwrap * %s/Actually sending/Sending * Typo * Have update() return available amount * Document setting once would suffice * Reduce verbosity * fix: drop RefCell mutable borrow early --- neqo-http3/src/send_message.rs | 12 +++- neqo-http3/src/server_events.rs | 13 ++++ neqo-http3/tests/httpconn.rs | 77 ++++++++++++++++++++++ neqo-transport/src/connection/mod.rs | 29 +++++++++ neqo-transport/src/fc.rs | 9 +-- neqo-transport/src/send_stream.rs | 97 +++++++++++++++++++++++++--- neqo-transport/src/streams.rs | 23 ++++--- 7 files changed, 233 insertions(+), 27 deletions(-) diff --git a/neqo-http3/src/send_message.rs b/neqo-http3/src/send_message.rs index 7fb37beb70..6553d20432 100644 --- a/neqo-http3/src/send_message.rs +++ b/neqo-http3/src/send_message.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc}; +use std::{cell::RefCell, cmp::min, fmt::Debug, num::NonZeroUsize, rc::Rc}; use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType}; use neqo_qpack::encoder::QPackEncoder; @@ -17,6 +17,7 @@ use crate::{ SendStream, SendStreamEvents, Stream, }; +const MIN_DATA_FRAME_SIZE: usize = 3; // Minimal DATA frame size: 2 (header) + 1 (payload) const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2 const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header) const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3 @@ -177,7 +178,14 @@ impl SendStream for SendMessage { let available = conn .stream_avail_send_space(self.stream_id()) .map_err(|e| Error::map_stream_send_errors(&e.into()))?; - if available <= 2 { + if available < MIN_DATA_FRAME_SIZE { + // Setting this once, instead of every time the available send space + // is exhausted, would suffice. That said, function call should be + // cheap, thus not worth optimizing. 
+ conn.stream_set_writable_event_low_watermark( + self.stream_id(), + NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(), + )?; return Ok(0); } let to_send = if available <= MAX_DATA_HEADER_SIZE_2_LIMIT { diff --git a/neqo-http3/src/server_events.rs b/neqo-http3/src/server_events.rs index 214a48c757..119d9f9f39 100644 --- a/neqo-http3/src/server_events.rs +++ b/neqo-http3/src/server_events.rs @@ -84,6 +84,19 @@ impl StreamHandler { .send_data(self.stream_id(), buf, &mut self.conn.borrow_mut()) } + /// Bytes sendable on stream at the QUIC layer. + /// + /// Note that this does not yet account for HTTP3 frame headers. + /// + /// # Errors + /// + /// It may return `InvalidStreamId` if a stream does not exist anymore. + pub fn available(&mut self) -> Res { + let stream_id = self.stream_id(); + let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?; + Ok(n) + } + /// Close sending side. /// /// # Errors diff --git a/neqo-http3/tests/httpconn.rs b/neqo-http3/tests/httpconn.rs index c0c62de9c9..8b9e7b42e8 100644 --- a/neqo-http3/tests/httpconn.rs +++ b/neqo-http3/tests/httpconn.rs @@ -246,6 +246,83 @@ fn test_103_response() { process_client_events(&mut hconn_c); } +/// Test [`neqo_http3::SendMessage::send_data`] to set +/// [`neqo_transport::SendStream::set_writable_event_low_watermark`]. +#[allow(clippy::cast_possible_truncation)] +#[test] +fn test_data_writable_events_low_watermark() -> Result<(), Box> { + const STREAM_LIMIT: u64 = 5000; + const DATA_FRAME_HEADER_SIZE: usize = 3; + + // Create a client and a server. + let mut hconn_c = http3_client_with_params(Http3Parameters::default().connection_parameters( + ConnectionParameters::default().max_stream_data(StreamType::BiDi, false, STREAM_LIMIT), + )); + let mut hconn_s = default_http3_server(); + mem::drop(connect_peers(&mut hconn_c, &mut hconn_s)); + + // Client sends GET to server. + let stream_id = hconn_c.fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default(), + )?; + hconn_c.stream_close_send(stream_id)?; + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Server receives GET and responds with headers. + let mut request = receive_request(&mut hconn_s).unwrap(); + request.send_headers(&[Header::new(":status", "200")])?; + + // Sending these headers clears the server's send stream buffer and thus + // emits a DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + let data_writable = |e| { + matches!( + e, + Http3ServerEvent::DataWritable { + stream + } if stream.stream_id() == stream_id + ) + }; + assert!(hconn_s.events().any(data_writable)); + + // Have server fill entire send buffer minus 1 byte. + let all_but_one = request.available()? - DATA_FRAME_HEADER_SIZE - 1; + let buf = vec![1; all_but_one]; + let sent = request.send_data(&buf)?; + assert_eq!(sent, all_but_one); + assert_eq!(request.available()?, 1); + + // Sending the buffered data clears the send stream buffer and thus emits a + // DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + assert!(hconn_s.events().any(data_writable)); + + // Sending more fails, given that each data frame needs to be preceeded by a + // header, i.e. needs more than 1 byte of send space to send 1 byte payload. + assert_eq!(request.available()?, 1); + assert_eq!(request.send_data(&buf)?, 0); + + // Have the client read all the pending data. 
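+    // Reading returns flow-control credit, which the client then advertises
+    // back to the server via MAX_STREAM_DATA.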
+ let mut recv_buf = vec![0_u8; all_but_one]; + let (recvd, _) = hconn_c.read_data(now(), stream_id, &mut recv_buf)?; + assert_eq!(sent, recvd); + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Expect the server's available send space to be back to the stream limit. + assert_eq!(request.available()?, STREAM_LIMIT as usize); + + // Expect the server to emit a DataWritable event, even though it always had + // at least 1 byte available to send, i.e. it never exhausted the entire + // available send space. + assert!(hconn_s.events().any(data_writable)); + + Ok(()) +} + #[test] fn test_data_writable_events() { const STREAM_LIMIT: u64 = 5000; diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index f955381414..732cd31cf4 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -12,6 +12,7 @@ use std::{ fmt::{self, Debug}, iter, mem, net::{IpAddr, SocketAddr}, + num::NonZeroUsize, ops::RangeInclusive, rc::{Rc, Weak}, time::{Duration, Instant}, @@ -3184,6 +3185,34 @@ impl Connection { Ok(self.streams.get_send_stream(stream_id)?.avail()) } + /// Set low watermark for [`ConnectionEvent::SendStreamWritable`] event. + /// + /// Stream emits a [`crate::ConnectionEvent::SendStreamWritable`] event + /// when: + /// - the available sendable bytes increased to or above the watermark + /// - and was previously below the watermark. + /// + /// Default value is `1`. In other words + /// [`crate::ConnectionEvent::SendStreamWritable`] is emitted whenever the + /// available sendable bytes was previously at `0` and now increased to `1` + /// or more. + /// + /// Use this when your protocol needs at least `watermark` amount of available + /// sendable bytes to make progress. + /// + /// # Errors + /// When the stream ID is invalid. + pub fn stream_set_writable_event_low_watermark( + &mut self, + stream_id: StreamId, + watermark: NonZeroUsize, + ) -> Res<()> { + self.streams + .get_send_stream_mut(stream_id)? + .set_writable_event_low_watermark(watermark); + Ok(()) + } + /// Close the stream. Enqueued data will be sent. /// # Errors /// When the stream ID is invalid. diff --git a/neqo-transport/src/fc.rs b/neqo-transport/src/fc.rs index 5ddfce6463..d619fd8e82 100644 --- a/neqo-transport/src/fc.rs +++ b/neqo-transport/src/fc.rs @@ -64,15 +64,16 @@ where } } - /// Update the maximum. Returns `true` if the change was an increase. - pub fn update(&mut self, limit: u64) -> bool { + /// Update the maximum. Returns `Some` with the updated available flow + /// control if the change was an increase and `None` otherwise. 
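+    ///
+    /// Callers use the returned available amount as the current limit when
+    /// deciding whether a writable event needs to be emitted.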
+ pub fn update(&mut self, limit: u64) -> Option { debug_assert!(limit < u64::MAX); if limit > self.limit { self.limit = limit; self.blocked_frame = false; - true + Some(self.available()) } else { - false + None } } diff --git a/neqo-transport/src/send_stream.rs b/neqo-transport/src/send_stream.rs index 98476e9d18..e443edc033 100644 --- a/neqo-transport/src/send_stream.rs +++ b/neqo-transport/src/send_stream.rs @@ -12,6 +12,7 @@ use std::{ collections::{btree_map::Entry, BTreeMap, VecDeque}, hash::{Hash, Hasher}, mem, + num::NonZeroUsize, ops::Add, rc::Rc, }; @@ -710,6 +711,7 @@ pub struct SendStream { sendorder: Option, bytes_sent: u64, fair: bool, + writable_event_low_watermark: NonZeroUsize, } impl Hash for SendStream { @@ -726,6 +728,7 @@ impl PartialEq for SendStream { impl Eq for SendStream {} impl SendStream { + #[allow(clippy::missing_panics_doc)] // not possible pub fn new( stream_id: StreamId, max_stream_data: u64, @@ -745,6 +748,7 @@ impl SendStream { sendorder: None, bytes_sent: 0, fair: false, + writable_event_low_watermark: 1.try_into().unwrap(), }; if ss.avail() > 0 { ss.conn_events.send_stream_writable(stream_id); @@ -1128,10 +1132,10 @@ impl SendStream { SendStreamState::Send { ref mut send_buf, .. } => { + let previous_limit = send_buf.avail(); send_buf.mark_as_acked(offset, len); - if self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); - } + let current_limit = send_buf.avail(); + self.maybe_emit_writable_event(previous_limit, current_limit); } SendStreamState::DataSent { ref mut send_buf, @@ -1203,14 +1207,21 @@ impl SendStream { } } + /// Set low watermark for [`crate::ConnectionEvent::SendStreamWritable`] + /// event. + /// + /// See [`crate::Connection::stream_set_writable_event_low_watermark`]. + pub fn set_writable_event_low_watermark(&mut self, watermark: NonZeroUsize) { + self.writable_event_low_watermark = watermark; + } + pub fn set_max_stream_data(&mut self, limit: u64) { if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } = &mut self.state { - let stream_was_blocked = fc.available() == 0; - fc.update(limit); - if stream_was_blocked && self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); + let previous_limit = fc.available(); + if let Some(current_limit) = fc.update(limit) { + self.maybe_emit_writable_event(previous_limit, current_limit); } } } @@ -1369,6 +1380,27 @@ impl SendStream { pub(crate) fn state(&mut self) -> &mut SendStreamState { &mut self.state } + + pub(crate) fn maybe_emit_writable_event( + &mut self, + previous_limit: usize, + current_limit: usize, + ) { + let low_watermark = self.writable_event_low_watermark.get(); + + // Skip if: + // - stream was not constrained by limit before, + // - or stream is still constrained by limit, + // - or stream is constrained by different limit. 
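+        // Emit only when this limit crossed the watermark from below and no
+        // other limit still keeps the available send space under it.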
+ if low_watermark < previous_limit + || current_limit < low_watermark + || self.avail() < low_watermark + { + return; + } + + self.conn_events.send_stream_writable(self.stream_id); + } } impl ::std::fmt::Display for SendStream { @@ -1756,7 +1788,7 @@ pub struct SendStreamRecoveryToken { #[cfg(test)] mod tests { - use std::{cell::RefCell, collections::VecDeque, rc::Rc}; + use std::{cell::RefCell, collections::VecDeque, num::NonZeroUsize, rc::Rc}; use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder}; @@ -2450,7 +2482,7 @@ mod tests { // Increasing conn max (conn:4, stream:4) will unblock but not emit // event b/c that happens in Connection::emit_frame() (tested in // connection.rs) - assert!(conn_fc.borrow_mut().update(4)); + assert!(conn_fc.borrow_mut().update(4).is_some()); assert_eq!(conn_events.events().count(), 0); assert_eq!(s.avail(), 2); assert_eq!(s.send(b"hello").unwrap(), 2); @@ -2476,6 +2508,53 @@ mod tests { assert_eq!(s.send(b"hello").unwrap(), 0); } + #[test] + fn send_stream_writable_event_gen_with_watermark() { + let conn_fc = connection_fc(0); + let mut conn_events = ConnectionEvents::default(); + + let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone()); + // Set watermark at 3. + s.set_writable_event_low_watermark(NonZeroUsize::new(3).unwrap()); + + // Stream is initially blocked (conn:0, stream:0, watermark: 3) and will + // not accept data. + assert_eq!(s.avail(), 0); + assert_eq!(s.send(b"hi!").unwrap(), 0); + + // Increasing the connection limit (conn:10, stream:0, watermark: 3) will not generate + // event or allow sending anything. Stream is constrained by stream limit. + assert!(conn_fc.borrow_mut().update(10).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing the connection limit further (conn:11, stream:0, watermark: 3) will not + // generate event or allow sending anything. Stream wasn't constrained by connection + // limit before. + assert!(conn_fc.borrow_mut().update(11).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing to (conn:11, stream:2, watermark: 3) will allow 2 bytes + // but not generate a SendStreamWritable event as it is still below the + // configured watermark. + s.set_max_stream_data(2); + assert_eq!(conn_events.events().count(), 0); + assert_eq!(s.avail(), 2); + + // Increasing to (conn:11, stream:3, watermark: 3) will generate an + // event as available sendable bytes are >= watermark. + s.set_max_stream_data(3); + let evts = conn_events.events().collect::>(); + assert_eq!(evts.len(), 1); + assert!(matches!( + evts[0], + ConnectionEvent::SendStreamWritable { .. } + )); + + assert_eq!(s.send(b"hi!").unwrap(), 3); + } + #[test] fn send_stream_writable_event_new_stream() { let conn_fc = connection_fc(2); diff --git a/neqo-transport/src/streams.rs b/neqo-transport/src/streams.rs index d8662afa3b..b95b33c294 100644 --- a/neqo-transport/src/streams.rs +++ b/neqo-transport/src/streams.rs @@ -476,17 +476,13 @@ impl Streams { } pub fn handle_max_data(&mut self, maximum_data: u64) { - let conn_was_blocked = self.sender_fc.borrow().available() == 0; - let conn_credit_increased = self.sender_fc.borrow_mut().update(maximum_data); - - if conn_was_blocked && conn_credit_increased { - for (id, ss) in &mut self.send { - if ss.avail() > 0 { - // These may not actually all be writable if one - // uses up all the conn credit. Not our fault. 
- self.events.send_stream_writable(*id); - } - } + let previous_limit = self.sender_fc.borrow().available(); + let Some(current_limit) = self.sender_fc.borrow_mut().update(maximum_data) else { + return; + }; + + for (_id, ss) in &mut self.send { + ss.maybe_emit_writable_event(previous_limit, current_limit); } } @@ -531,7 +527,10 @@ impl Streams { } pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) { - if self.local_stream_limits[stream_type].update(maximum_streams) { + let increased = self.local_stream_limits[stream_type] + .update(maximum_streams) + .is_some(); + if increased { self.events.send_stream_creatable(stream_type); } } From ebae88f95e423744a3c894f0cb5bc70f40712da3 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 7 May 2024 12:21:52 +0300 Subject: [PATCH 11/14] ci: Pin rustup-init (#1888) Fixes #1887 --- qns/Dockerfile | 50 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/qns/Dockerfile b/qns/Dockerfile index c4bbd2a79e..dde601f574 100644 --- a/qns/Dockerfile +++ b/qns/Dockerfile @@ -1,27 +1,51 @@ FROM martenseemann/quic-network-simulator-endpoint@sha256:12596544531465e77bdede50dd1e85b2c46c00f1634b3445eed277ca177666db AS buildimage RUN apt-get update && apt-get install -y --no-install-recommends \ - curl git mercurial coreutils \ - build-essential libclang-dev lld \ - gyp ninja-build zlib1g-dev python \ + git coreutils build-essential libclang-dev lld gyp ninja-build zlib1g-dev python \ && apt-get autoremove -y && apt-get clean -y \ && rm -rf /var/lib/apt/lists/* -ARG RUST_VERSION=stable +# From https://github.com/rust-lang/docker-rust/blob/cc31986e1dfe94671c639231ecf0503942c121d9/1.78.0/bookworm/slim/Dockerfile ENV RUSTUP_HOME=/usr/local/rustup \ CARGO_HOME=/usr/local/cargo \ - PATH=/usr/local/cargo/bin:$PATH + PATH=/usr/local/cargo/bin:$PATH \ + RUST_VERSION=1.78.0 -ADD --checksum=sha256:a3d541a5484c8fa2f1c21478a6f6c505a778d473c21d60a18a4df5185d320ef8 \ - https://static.rust-lang.org/rustup/dist/x86_64-unknown-linux-gnu/rustup-init x86_64_rustup - -ADD --checksum=sha256:76cd420cb8a82e540025c5f97bda3c65ceb0b0661d5843e6ef177479813b0367 \ - https://static.rust-lang.org/rustup/dist/aarch64-unknown-linux-gnu/rustup-init aarch64_rustup +RUN set -eux; \ + apt-get update; \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + gcc \ + libc6-dev \ + wget \ + ; \ + dpkgArch="$(dpkg --print-architecture)"; \ + case "${dpkgArch##*-}" in \ + amd64) rustArch='x86_64-unknown-linux-gnu'; rustupSha256='a3d541a5484c8fa2f1c21478a6f6c505a778d473c21d60a18a4df5185d320ef8' ;; \ + armhf) rustArch='armv7-unknown-linux-gnueabihf'; rustupSha256='7cff34808434a28d5a697593cd7a46cefdf59c4670021debccd4c86afde0ff76' ;; \ + arm64) rustArch='aarch64-unknown-linux-gnu'; rustupSha256='76cd420cb8a82e540025c5f97bda3c65ceb0b0661d5843e6ef177479813b0367' ;; \ + i386) rustArch='i686-unknown-linux-gnu'; rustupSha256='cacdd10eb5ec58498cd95dbb7191fdab5fa4343e05daaf0fb7cdcae63be0a272' ;; \ + ppc64el) rustArch='powerpc64le-unknown-linux-gnu'; rustupSha256='b152711fb15fd629f0d4c2731cbf9167e6352da0ffcb2210447d80c010180f96' ;; \ + s390x) rustArch='s390x-unknown-linux-gnu'; rustupSha256='4ff9e7963ed0457e64cbb29d2b5a37496d1fa303f9300adc5251ee3c16bd3b30' ;; \ + *) echo >&2 "unsupported architecture: ${dpkgArch}"; exit 1 ;; \ + esac; \ + url="https://static.rust-lang.org/rustup/archive/1.27.0/${rustArch}/rustup-init"; \ + wget "$url"; \ + echo "${rustupSha256} *rustup-init" | sha256sum -c -; \ + chmod +x 
rustup-init; \ + ./rustup-init -y --no-modify-path --profile minimal --default-toolchain $RUST_VERSION --default-host ${rustArch}; \ + rm rustup-init; \ + chmod -R a+w $RUSTUP_HOME $CARGO_HOME; \ + rustup --version; \ + cargo --version; \ + rustc --version; \ + apt-get remove -y --auto-remove \ + wget \ + ; \ + rm -rf /var/lib/apt/lists/*; -RUN mv $(uname -m)_rustup rustup-init && \ - chmod +x rustup-init && \ - ./rustup-init -y -q --no-modify-path --profile minimal --default-toolchain $RUST_VERSION +# End of copy from https://github.com/rust-lang/docker-rust... ENV NSS_DIR=/nss \ NSPR_DIR=/nspr \ From 6979f019b6a3f5e8ea4616068eba02fa5f3a01f5 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 7 May 2024 13:13:26 +0300 Subject: [PATCH 12/14] chore: Fix new clippy lints in cargo 1.80.0-nightly (#1883) * chore: Fix new clippy lints in cargo 1.80.0-nightly (05364cb2f 2024-05-03) * No need for `#[allow(clippy::mutable_key_type)]` * No need for `build = "build.rs"` --- fuzz/build.rs | 9 +++++++++ neqo-common/Cargo.toml | 1 - neqo-crypto/Cargo.toml | 1 - neqo-crypto/build.rs | 1 + neqo-http3/src/server.rs | 6 +++--- neqo-http3/tests/httpconn.rs | 14 ++++++-------- neqo-transport/build.rs | 9 +++++++++ 7 files changed, 28 insertions(+), 13 deletions(-) create mode 100644 fuzz/build.rs create mode 100644 neqo-transport/build.rs diff --git a/fuzz/build.rs b/fuzz/build.rs new file mode 100644 index 0000000000..47615132f1 --- /dev/null +++ b/fuzz/build.rs @@ -0,0 +1,9 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +fn main() { + println!("cargo:rustc-check-cfg=cfg(fuzzing)"); +} diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index a70b3738d5..dc4997bcf1 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -1,6 +1,5 @@ [package] name = "neqo-common" -build = "build.rs" authors.workspace = true homepage.workspace = true repository.workspace = true diff --git a/neqo-crypto/Cargo.toml b/neqo-crypto/Cargo.toml index 85b2630404..b16c098fab 100644 --- a/neqo-crypto/Cargo.toml +++ b/neqo-crypto/Cargo.toml @@ -1,6 +1,5 @@ [package] name = "neqo-crypto" -build = "build.rs" authors.workspace = true homepage.workspace = true repository.workspace = true diff --git a/neqo-crypto/build.rs b/neqo-crypto/build.rs index 2dd4543797..5080f89d03 100644 --- a/neqo-crypto/build.rs +++ b/neqo-crypto/build.rs @@ -421,6 +421,7 @@ fn setup_for_gecko() -> Vec { } fn main() { + println!("cargo:rustc-check-cfg=cfg(nss_nodb)"); let flags = if cfg!(feature = "gecko") { setup_for_gecko() } else if let Ok(nss_dir) = env::var("NSS_DIR") { diff --git a/neqo-http3/src/server.rs b/neqo-http3/src/server.rs index 8fce803fb3..e5fb5e1ae5 100644 --- a/neqo-http3/src/server.rs +++ b/neqo-http3/src/server.rs @@ -1271,11 +1271,11 @@ mod tests { while let Some(event) = hconn.next_event() { match event { Http3ServerEvent::Headers { stream, .. } => { - assert!(!requests.contains_key(&stream)); - requests.insert(stream, 0); + assert!(!requests.contains_key(&stream.stream_id())); + requests.insert(stream.stream_id(), 0); } Http3ServerEvent::Data { stream, .. } => { - assert!(requests.contains_key(&stream)); + assert!(requests.contains_key(&stream.stream_id())); } Http3ServerEvent::DataWritable { .. } | Http3ServerEvent::StreamReset { .. 
} diff --git a/neqo-http3/tests/httpconn.rs b/neqo-http3/tests/httpconn.rs index 8b9e7b42e8..801668af9f 100644 --- a/neqo-http3/tests/httpconn.rs +++ b/neqo-http3/tests/httpconn.rs @@ -522,14 +522,12 @@ fn fetch_noresponse_will_idletimeout() { let mut done = false; while !done { while let Some(event) = hconn_c.next_event() { - if let Http3ClientEvent::StateChange(state) = event { - match state { - Http3State::Closing(error_code) | Http3State::Closed(error_code) => { - assert_eq!(error_code, CloseReason::Transport(Error::IdleTimeout)); - done = true; - } - _ => {} - } + if let Http3ClientEvent::StateChange( + Http3State::Closing(error_code) | Http3State::Closed(error_code), + ) = event + { + assert_eq!(error_code, CloseReason::Transport(Error::IdleTimeout)); + done = true; } } diff --git a/neqo-transport/build.rs b/neqo-transport/build.rs new file mode 100644 index 0000000000..47615132f1 --- /dev/null +++ b/neqo-transport/build.rs @@ -0,0 +1,9 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +fn main() { + println!("cargo:rustc-check-cfg=cfg(fuzzing)"); +} From 08cdd476c217f2ff9b1d420d4d56e091496170a3 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 May 2024 15:48:02 +0200 Subject: [PATCH 13/14] refactor(bin): introduce server/http3.rs and server/http09.rs (#1877) The QUIC Interop Runner requires an http3 and http09 implementation for both client and server. The client code is already structured into an http3 and an http09 implementation since https://github.com/mozilla/neqo/pull/1727. This commit does the same for the server side, i.e. splits the http3 and http09 implementation into separate Rust modules. 
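For illustration, a sketch (with most constructor arguments abbreviated)
of how `mod.rs` now picks one of the two implementations behind the shared
`HttpServer` trait; the real code is in the `mod.rs` hunk at the end of
this patch:

    let mut svr: Box<dyn HttpServer> = if args.shared.use_old_http {
        Box::new(
            http09::HttpServer::new(args.now(), &[args.key.clone()], /* ... */)
                .expect("We cannot make a server!"),
        )
    } else {
        Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr))
    };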
--- .../src/server/{old_https.rs => http09.rs} | 18 +- neqo-bin/src/server/http3.rs | 249 ++++++++++++++++++ neqo-bin/src/server/mod.rs | 246 +---------------- 3 files changed, 265 insertions(+), 248 deletions(-) rename neqo-bin/src/server/{old_https.rs => http09.rs} (95%) create mode 100644 neqo-bin/src/server/http3.rs diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/http09.rs similarity index 95% rename from neqo-bin/src/server/old_https.rs rename to neqo-bin/src/server/http09.rs index 05520e1d3d..64b1e1be19 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/http09.rs @@ -17,21 +17,21 @@ use neqo_transport::{ }; use regex::Regex; -use super::{qns_read_response, Args, HttpServer}; +use super::{qns_read_response, Args}; #[derive(Default)] -struct Http09StreamState { +struct HttpStreamState { writable: bool, data_to_send: Option<(Vec, usize)>, } -pub struct Http09Server { +pub struct HttpServer { server: Server, - write_state: HashMap, + write_state: HashMap, read_state: HashMap>, } -impl Http09Server { +impl HttpServer { pub fn new( now: Instant, certs: &[impl AsRef], @@ -92,7 +92,7 @@ impl Http09Server { } else { self.write_state.insert( stream_id, - Http09StreamState { + HttpStreamState { writable: false, data_to_send: Some((resp, 0)), }, @@ -194,7 +194,7 @@ impl Http09Server { } } -impl HttpServer for Http09Server { +impl super::HttpServer for HttpServer { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { self.server.process(dgram, now) } @@ -210,7 +210,7 @@ impl HttpServer for Http09Server { match event { ConnectionEvent::NewStream { stream_id } => { self.write_state - .insert(stream_id, Http09StreamState::default()); + .insert(stream_id, HttpStreamState::default()); } ConnectionEvent::RecvStreamReadable { stream_id } => { self.stream_readable(stream_id, &mut acr, args); @@ -258,7 +258,7 @@ impl HttpServer for Http09Server { } } -impl Display for Http09Server { +impl Display for HttpServer { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { write!(f, "Http 0.9 server ") } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs new file mode 100644 index 0000000000..40a733ffb5 --- /dev/null +++ b/neqo-bin/src/server/http3.rs @@ -0,0 +1,249 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + borrow::Cow, + cell::RefCell, + cmp::min, + collections::HashMap, + fmt::{self, Display}, + path::PathBuf, + rc::Rc, + time::Instant, +}; + +use neqo_common::{qdebug, qerror, qwarn, Datagram, Header}; +use neqo_crypto::{generate_ech_keys, random, AntiReplay, Cipher}; +use neqo_http3::{ + Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, +}; +use neqo_transport::{server::ValidateAddress, ConnectionIdGenerator}; + +use super::{qns_read_response, Args}; + +pub struct HttpServer { + server: Http3Server, + /// Progress writing to each stream. 
+ remaining_data: HashMap, + posts: HashMap, +} + +impl HttpServer { + const MESSAGE: &'static [u8] = &[0; 4096]; + + pub fn new( + args: &Args, + anti_replay: AntiReplay, + cid_mgr: Rc>, + ) -> Self { + let server = Http3Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + cid_mgr, + Http3Parameters::default() + .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) + .max_table_size_encoder(args.shared.max_table_size_encoder) + .max_table_size_decoder(args.shared.max_table_size_decoder) + .max_blocked_streams(args.shared.max_blocked_streams), + None, + ) + .expect("We cannot make a server!"); + Self { + server, + remaining_data: HashMap::new(), + posts: HashMap::new(), + } + } +} + +impl Display for HttpServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.server.fmt(f) + } +} + +impl super::HttpServer for HttpServer { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output { + self.server.process(dgram, now) + } + + fn process_events(&mut self, args: &Args, _now: Instant) { + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::Headers { + mut stream, + headers, + fin, + } => { + qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); + + if headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "POST") + { + self.posts.insert(stream, 0); + continue; + } + + let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { + stream + .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) + .unwrap(); + continue; + }; + + let mut response = if args.shared.qns_test.is_some() { + match qns_read_response(path.value()) { + Ok(data) => ResponseData::from(data), + Err(e) => { + qerror!("Failed to read {}: {e}", path.value()); + stream + .send_headers(&[Header::new(":status", "404")]) + .unwrap(); + stream.stream_close_send().unwrap(); + continue; + } + } + } else if let Ok(count) = + path.value().trim_matches(|p| p == '/').parse::() + { + ResponseData::repeat(Self::MESSAGE, count) + } else { + ResponseData::from(Self::MESSAGE) + }; + + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("content-length", response.remaining.to_string()), + ]) + .unwrap(); + response.send(&mut stream); + if response.done() { + stream.stream_close_send().unwrap(); + } else { + self.remaining_data.insert(stream.stream_id(), response); + } + } + Http3ServerEvent::DataWritable { mut stream } => { + if self.posts.get_mut(&stream).is_none() { + if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { + remaining.send(&mut stream); + if remaining.done() { + self.remaining_data.remove(&stream.stream_id()); + stream.stream_close_send().unwrap(); + } + } + } + } + + Http3ServerEvent::Data { + mut stream, + data, + fin, + } => { + if let Some(received) = self.posts.get_mut(&stream) { + *received += data.len(); + } + if fin { + if let Some(received) = self.posts.remove(&stream) { + let msg = received.to_string().as_bytes().to_vec(); + stream + .send_headers(&[Header::new(":status", "200")]) + .unwrap(); + stream.send_data(&msg).unwrap(); + stream.stream_close_send().unwrap(); + } + } + } + _ => {} + } + } + } + + fn set_qlog_dir(&mut self, dir: Option) { + self.server.set_qlog_dir(dir); + } + + fn validate_address(&mut self, v: ValidateAddress) { + self.server.set_validation(v); + } + + fn set_ciphers(&mut self, ciphers: &[Cipher]) { + self.server.set_ciphers(ciphers); + } + + fn enable_ech(&mut self) -> 
&[u8] { + let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); + self.server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .unwrap(); + self.server.ech_config() + } + + fn has_events(&self) -> bool { + self.server.has_events() + } +} + +struct ResponseData { + data: Cow<'static, [u8]>, + offset: usize, + remaining: usize, +} + +impl From<&[u8]> for ResponseData { + fn from(data: &[u8]) -> Self { + Self::from(data.to_vec()) + } +} + +impl From> for ResponseData { + fn from(data: Vec) -> Self { + let remaining = data.len(); + Self { + data: Cow::Owned(data), + offset: 0, + remaining, + } + } +} + +impl ResponseData { + fn repeat(buf: &'static [u8], total: usize) -> Self { + Self { + data: Cow::Borrowed(buf), + offset: 0, + remaining: total, + } + } + + fn send(&mut self, stream: &mut Http3OrWebTransportStream) { + while self.remaining > 0 { + let end = min(self.data.len(), self.offset + self.remaining); + let slice = &self.data[self.offset..end]; + match stream.send_data(slice) { + Ok(0) => { + return; + } + Ok(sent) => { + self.remaining -= sent; + self.offset = (self.offset + sent) % self.data.len(); + } + Err(e) => { + qwarn!("Error writing to stream {}: {:?}", stream, e); + return; + } + } + } + } + + fn done(&self) -> bool { + self.remaining == 0 + } +} diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index df385119c2..bc874e413d 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -5,10 +5,7 @@ // except according to those terms. use std::{ - borrow::Cow, cell::RefCell, - cmp::min, - collections::HashMap, fmt::{self, Display}, fs, io, net::{SocketAddr, ToSocketAddrs}, @@ -24,25 +21,20 @@ use futures::{ future::{select, select_all, Either}, FutureExt, }; -use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; +use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - generate_ech_keys, init_db, random, AntiReplay, Cipher, + init_db, AntiReplay, Cipher, }; -use neqo_http3::{ - Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, -}; -use neqo_transport::{ - server::ValidateAddress, ConnectionIdGenerator, Output, RandomConnectionIdGenerator, Version, -}; -use old_https::Http09Server; +use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); -mod old_https; +mod http09; +mod http3; #[derive(Debug)] pub enum Error { @@ -200,230 +192,6 @@ trait HttpServer: Display { fn enable_ech(&mut self) -> &[u8]; } -struct ResponseData { - data: Cow<'static, [u8]>, - offset: usize, - remaining: usize, -} - -impl From<&[u8]> for ResponseData { - fn from(data: &[u8]) -> Self { - Self::from(data.to_vec()) - } -} - -impl From> for ResponseData { - fn from(data: Vec) -> Self { - let remaining = data.len(); - Self { - data: Cow::Owned(data), - offset: 0, - remaining, - } - } -} - -impl ResponseData { - fn repeat(buf: &'static [u8], total: usize) -> Self { - Self { - data: Cow::Borrowed(buf), - offset: 0, - remaining: total, - } - } - - fn send(&mut self, stream: &mut Http3OrWebTransportStream) { - while self.remaining > 0 { - let end = min(self.data.len(), self.offset + self.remaining); - let slice = &self.data[self.offset..end]; - match stream.send_data(slice) { - Ok(0) => { - return; - } - Ok(sent) => { - self.remaining 
-= sent; - self.offset = (self.offset + sent) % self.data.len(); - } - Err(e) => { - qwarn!("Error writing to stream {}: {:?}", stream, e); - return; - } - } - } - } - - fn done(&self) -> bool { - self.remaining == 0 - } -} - -struct SimpleServer { - server: Http3Server, - /// Progress writing to each stream. - remaining_data: HashMap, - posts: HashMap, -} - -impl SimpleServer { - const MESSAGE: &'static [u8] = &[0; 4096]; - - pub fn new( - args: &Args, - anti_replay: AntiReplay, - cid_mgr: Rc>, - ) -> Self { - let server = Http3Server::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - Http3Parameters::default() - .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) - .max_table_size_encoder(args.shared.max_table_size_encoder) - .max_table_size_decoder(args.shared.max_table_size_decoder) - .max_blocked_streams(args.shared.max_blocked_streams), - None, - ) - .expect("We cannot make a server!"); - Self { - server, - remaining_data: HashMap::new(), - posts: HashMap::new(), - } - } -} - -impl Display for SimpleServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.server.fmt(f) - } -} - -impl HttpServer for SimpleServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.server.process(dgram, now) - } - - fn process_events(&mut self, args: &Args, _now: Instant) { - while let Some(event) = self.server.next_event() { - match event { - Http3ServerEvent::Headers { - mut stream, - headers, - fin, - } => { - qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); - - if headers - .iter() - .any(|h| h.name() == ":method" && h.value() == "POST") - { - self.posts.insert(stream, 0); - continue; - } - - let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { - stream - .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) - .unwrap(); - continue; - }; - - let mut response = if args.shared.qns_test.is_some() { - match qns_read_response(path.value()) { - Ok(data) => ResponseData::from(data), - Err(e) => { - qerror!("Failed to read {}: {e}", path.value()); - stream - .send_headers(&[Header::new(":status", "404")]) - .unwrap(); - stream.stream_close_send().unwrap(); - continue; - } - } - } else if let Ok(count) = - path.value().trim_matches(|p| p == '/').parse::() - { - ResponseData::repeat(Self::MESSAGE, count) - } else { - ResponseData::from(Self::MESSAGE) - }; - - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("content-length", response.remaining.to_string()), - ]) - .unwrap(); - response.send(&mut stream); - if response.done() { - stream.stream_close_send().unwrap(); - } else { - self.remaining_data.insert(stream.stream_id(), response); - } - } - Http3ServerEvent::DataWritable { mut stream } => { - if self.posts.get_mut(&stream).is_none() { - if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { - remaining.send(&mut stream); - if remaining.done() { - self.remaining_data.remove(&stream.stream_id()); - stream.stream_close_send().unwrap(); - } - } - } - } - - Http3ServerEvent::Data { - mut stream, - data, - fin, - } => { - if let Some(received) = self.posts.get_mut(&stream) { - *received += data.len(); - } - if fin { - if let Some(received) = self.posts.remove(&stream) { - let msg = received.to_string().as_bytes().to_vec(); - stream - .send_headers(&[Header::new(":status", "200")]) - .unwrap(); - stream.send_data(&msg).unwrap(); - stream.stream_close_send().unwrap(); - } - } - } - _ => {} - } - } - } - - 
fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .unwrap(); - self.server.ech_config() - } - - fn has_events(&self) -> bool { - self.server.has_events() - } -} - struct ServersRunner { args: Args, server: Box, @@ -466,7 +234,7 @@ impl ServersRunner { let mut svr: Box = if args.shared.use_old_http { Box::new( - Http09Server::new( + http09::HttpServer::new( args.now(), &[args.key.clone()], &[args.shared.alpn.clone()], @@ -477,7 +245,7 @@ impl ServersRunner { .expect("We cannot make a server!"), ) } else { - Box::new(SimpleServer::new(args, anti_replay, cid_mgr)) + Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr)) }; svr.set_ciphers(&args.get_ciphers()); svr.set_qlog_dir(args.shared.qlog_dir.clone()); From beffcc6ca9b19bcf605aa3d36927da887c7eb6f4 Mon Sep 17 00:00:00 2001 From: Martin Thomson Date: Wed, 8 May 2024 00:08:14 +1000 Subject: [PATCH 14/14] Cleanup tracking (#1886) * fix: Resurrect #1662 Botched the "merge from main" on #1662. Reverted the merge. This PS has the changes in #1662, rebased to main. Sorry for the mess. * Fix * One-liner * Merge main * Merge main * Move back to BTreeMap * Tweak ordering * More simplification/performance * Test expectation fixup * Update neqo-transport/src/cc/classic_cc.rs Signed-off-by: Lars Eggert * Apply suggestions from code review Signed-off-by: Lars Eggert --------- Signed-off-by: Lars Eggert Co-authored-by: Lars Eggert --- neqo-crypto/src/agent.rs | 7 +- neqo-transport/src/cc/classic_cc.rs | 58 +-- neqo-transport/src/cc/mod.rs | 2 +- neqo-transport/src/cc/tests/cubic.rs | 2 +- neqo-transport/src/cc/tests/new_reno.rs | 6 +- neqo-transport/src/connection/mod.rs | 14 +- neqo-transport/src/ecn.rs | 6 +- neqo-transport/src/path.rs | 8 +- neqo-transport/src/qlog.rs | 5 +- .../src/{recovery.rs => recovery/mod.rs} | 222 ++++------ neqo-transport/src/recovery/sent.rs | 379 ++++++++++++++++++ neqo-transport/src/recovery/token.rs | 63 +++ neqo-transport/src/sender.rs | 4 +- neqo-transport/src/tracking.rs | 111 ----- 14 files changed, 569 insertions(+), 318 deletions(-) rename neqo-transport/src/{recovery.rs => recovery/mod.rs} (90%) create mode 100644 neqo-transport/src/recovery/sent.rs create mode 100644 neqo-transport/src/recovery/token.rs diff --git a/neqo-crypto/src/agent.rs b/neqo-crypto/src/agent.rs index 3d5a8b9f35..c04accd775 100644 --- a/neqo-crypto/src/agent.rs +++ b/neqo-crypto/src/agent.rs @@ -875,14 +875,13 @@ impl Client { arg: *mut c_void, ) -> ssl::SECStatus { let mut info: MaybeUninit = MaybeUninit::uninit(); - if ssl::SSL_GetResumptionTokenInfo( + let info_res = &ssl::SSL_GetResumptionTokenInfo( token, len, info.as_mut_ptr(), c_uint::try_from(mem::size_of::()).unwrap(), - ) - .is_err() - { + ); + if info_res.is_err() { // Ignore the token. 
return ssl::SECSuccess; } diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 6914e91f67..23f0d04bd2 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -17,9 +17,9 @@ use crate::{ cc::MAX_DATAGRAM_SIZE, packet::PacketNumber, qlog::{self, QlogMetric}, + recovery::SentPacket, rtt::RttEstimate, sender::PACING_BURST_SIZE, - tracking::SentPacket, }; #[rustfmt::skip] // to keep `::` and thus prevent conflict with `crate::qlog` use ::qlog::events::{quic::CongestionStateUpdated, EventData}; @@ -167,8 +167,8 @@ impl CongestionControl for ClassicCongestionControl { qdebug!( "packet_acked this={:p}, pn={}, ps={}, ignored={}, lost={}, rtt_est={:?}", self, - pkt.pn, - pkt.size, + pkt.pn(), + pkt.len(), i32::from(!pkt.cc_outstanding()), i32::from(pkt.lost()), rtt_est, @@ -176,12 +176,12 @@ impl CongestionControl for ClassicCongestionControl { if !pkt.cc_outstanding() { continue; } - if pkt.pn < self.first_app_limited { + if pkt.pn() < self.first_app_limited { is_app_limited = false; } // BIF is set to 0 on a path change, but in case that was because of a simple rebinding // event, we may still get ACKs for packets sent before the rebinding. - self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.size); + self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.len()); if !self.after_recovery_start(pkt) { // Do not increase congestion window for packets sent before @@ -194,7 +194,7 @@ impl CongestionControl for ClassicCongestionControl { qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]); } - new_acked += pkt.size; + new_acked += pkt.len(); } if is_app_limited { @@ -269,12 +269,12 @@ impl CongestionControl for ClassicCongestionControl { qdebug!( "packet_lost this={:p}, pn={}, ps={}", self, - pkt.pn, - pkt.size + pkt.pn(), + pkt.len() ); // BIF is set to 0 on a path change, but in case that was because of a simple rebinding // event, we may still declare packets lost that were sent before the rebinding. - self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.size); + self.bytes_in_flight = self.bytes_in_flight.saturating_sub(pkt.len()); } qlog::metrics_updated( &mut self.qlog, @@ -308,13 +308,13 @@ impl CongestionControl for ClassicCongestionControl { fn discard(&mut self, pkt: &SentPacket) { if pkt.cc_outstanding() { - assert!(self.bytes_in_flight >= pkt.size); - self.bytes_in_flight -= pkt.size; + assert!(self.bytes_in_flight >= pkt.len()); + self.bytes_in_flight -= pkt.len(); qlog::metrics_updated( &mut self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], ); - qtrace!([self], "Ignore pkt with size {}", pkt.size); + qtrace!([self], "Ignore pkt with size {}", pkt.len()); } } @@ -329,7 +329,7 @@ impl CongestionControl for ClassicCongestionControl { fn on_packet_sent(&mut self, pkt: &SentPacket) { // Record the recovery time and exit any transient state. if self.state.transient() { - self.recovery_start = Some(pkt.pn); + self.recovery_start = Some(pkt.pn()); self.state.update(); } @@ -341,15 +341,15 @@ impl CongestionControl for ClassicCongestionControl { // window. Assume that all in-flight packets up to this one are NOT app-limited. // However, subsequent packets might be app-limited. Set `first_app_limited` to the // next packet number. 
- self.first_app_limited = pkt.pn + 1; + self.first_app_limited = pkt.pn() + 1; } - self.bytes_in_flight += pkt.size; + self.bytes_in_flight += pkt.len(); qdebug!( "packet_sent this={:p}, pn={}, ps={}", self, - pkt.pn, - pkt.size + pkt.pn(), + pkt.len() ); qlog::metrics_updated( &mut self.qlog, @@ -448,20 +448,20 @@ impl ClassicCongestionControl { let cutoff = max(first_rtt_sample_time, prev_largest_acked_sent); for p in lost_packets .iter() - .skip_while(|p| Some(p.time_sent) < cutoff) + .skip_while(|p| Some(p.time_sent()) < cutoff) { - if p.pn != last_pn + 1 { + if p.pn() != last_pn + 1 { // Not a contiguous range of lost packets, start over. start = None; } - last_pn = p.pn; + last_pn = p.pn(); if !p.cc_in_flight() { // Not interesting, keep looking. continue; } if let Some(t) = start { let elapsed = p - .time_sent + .time_sent() .checked_duration_since(t) .expect("time is monotonic"); if elapsed > pc_period { @@ -476,7 +476,7 @@ impl ClassicCongestionControl { return true; } } else { - start = Some(p.time_sent); + start = Some(p.time_sent()); } } false @@ -490,7 +490,7 @@ impl ClassicCongestionControl { // state and update the variable `self.recovery_start`. Before the // first recovery, all packets were sent after the recovery event, // allowing to reduce the cwnd on congestion events. - !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn >= pn) + !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn) } /// Handle a congestion event. @@ -560,8 +560,8 @@ mod tests { CongestionControl, CongestionControlAlgorithm, CWND_INITIAL_PKTS, MAX_DATAGRAM_SIZE, }, packet::{PacketNumber, PacketType}, + recovery::SentPacket, rtt::RttEstimate, - tracking::SentPacket, }; const PTO: Duration = Duration::from_millis(100); @@ -923,13 +923,13 @@ mod tests { fn persistent_congestion_ack_eliciting() { let mut lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); lost[0] = SentPacket::new( - lost[0].pt, - lost[0].pn, - lost[0].ecn_mark, - lost[0].time_sent, + lost[0].packet_type(), + lost[0].pn(), + lost[0].ecn_mark(), + lost[0].time_sent(), false, Vec::new(), - lost[0].size, + lost[0].len(), ); assert!(!persistent_congestion_by_pto( ClassicCongestionControl::new(NewReno::default()), diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index 2adffbc0c4..e85413b491 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -14,7 +14,7 @@ use std::{ use neqo_common::qlog::NeqoQlog; -use crate::{path::PATH_MTU_V6, rtt::RttEstimate, tracking::SentPacket, Error}; +use crate::{path::PATH_MTU_V6, recovery::SentPacket, rtt::RttEstimate, Error}; mod classic_cc; mod cubic; diff --git a/neqo-transport/src/cc/tests/cubic.rs b/neqo-transport/src/cc/tests/cubic.rs index 8ff591cb47..4d8c436cc4 100644 --- a/neqo-transport/src/cc/tests/cubic.rs +++ b/neqo-transport/src/cc/tests/cubic.rs @@ -25,8 +25,8 @@ use crate::{ CongestionControl, MAX_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE_F64, }, packet::PacketType, + recovery::SentPacket, rtt::RttEstimate, - tracking::SentPacket, }; const RTT: Duration = Duration::from_millis(100); diff --git a/neqo-transport/src/cc/tests/new_reno.rs b/neqo-transport/src/cc/tests/new_reno.rs index 0cc560bf2b..a82e4995f4 100644 --- a/neqo-transport/src/cc/tests/new_reno.rs +++ b/neqo-transport/src/cc/tests/new_reno.rs @@ -17,8 +17,8 @@ use crate::{ MAX_DATAGRAM_SIZE, }, packet::PacketType, + recovery::SentPacket, rtt::RttEstimate, - tracking::SentPacket, }; const PTO: Duration = Duration::from_millis(100); @@ -133,14 
+133,14 @@ fn issue_876() { // and ack it. cwnd increases slightly cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, time_now); - assert_eq!(cc.acked_bytes(), sent_packets[6].size); + assert_eq!(cc.acked_bytes(), sent_packets[6].len()); cwnd_is_halved(&cc); assert_eq!(cc.bytes_in_flight(), 5 * MAX_DATAGRAM_SIZE - 2); // Packet from before is lost. Should not hurt cwnd. cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]); assert!(!cc.recovery_packet()); - assert_eq!(cc.acked_bytes(), sent_packets[6].size); + assert_eq!(cc.acked_bytes(), sent_packets[6].len()); cwnd_is_halved(&cc); assert_eq!(cc.bytes_in_flight(), 4 * MAX_DATAGRAM_SIZE); } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 732cd31cf4..fcc41dab83 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -46,7 +46,7 @@ use crate::{ path::{Path, PathRef, Paths}, qlog, quic_datagrams::{DatagramTracking, QuicDatagrams}, - recovery::{LossRecovery, RecoveryToken, SendProfile}, + recovery::{LossRecovery, RecoveryToken, SendProfile, SentPacket}, recv_stream::RecvStreamStats, rtt::{RttEstimate, GRANULARITY}, send_stream::SendStream, @@ -57,7 +57,7 @@ use crate::{ self, TransportParameter, TransportParameterId, TransportParameters, TransportParametersHandler, }, - tracking::{AckTracker, PacketNumberSpace, RecvdPackets, SentPacket}, + tracking::{AckTracker, PacketNumberSpace, RecvdPackets}, version::{Version, WireVersion}, AppError, CloseReason, Error, Res, StreamId, }; @@ -2370,7 +2370,7 @@ impl Connection { packets.len(), mtu ); - initial.size += mtu - packets.len(); + initial.track_padding(mtu - packets.len()); // These zeros aren't padding frames, they are an invalid all-zero coalesced // packet, which is why we don't increase `frame_tx.padding` count here. packets.resize(mtu, 0); @@ -2894,7 +2894,7 @@ impl Connection { /// to retransmit the frame as needed. fn handle_lost_packets(&mut self, lost_packets: &[SentPacket]) { for lost in lost_packets { - for token in &lost.tokens { + for token in lost.tokens() { qdebug!([self], "Lost: {:?}", token); match token { RecoveryToken::Ack(_) => {} @@ -2930,13 +2930,13 @@ impl Connection { fn handle_ack( &mut self, space: PacketNumberSpace, - largest_acknowledged: u64, + largest_acknowledged: PacketNumber, ack_ranges: R, ack_ecn: Option, ack_delay: u64, now: Instant, ) where - R: IntoIterator> + Debug, + R: IntoIterator> + Debug, R::IntoIter: ExactSizeIterator, { qdebug!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges); @@ -2954,7 +2954,7 @@ impl Connection { now, ); for acked in acked_packets { - for token in &acked.tokens { + for token in acked.tokens() { match token { RecoveryToken::Stream(stream_token) => self.streams.acked(stream_token), RecoveryToken::Ack(at) => self.acks.acked(at), diff --git a/neqo-transport/src/ecn.rs b/neqo-transport/src/ecn.rs index 20eb4da003..cd550e589c 100644 --- a/neqo-transport/src/ecn.rs +++ b/neqo-transport/src/ecn.rs @@ -9,7 +9,7 @@ use std::ops::{AddAssign, Deref, DerefMut, Sub}; use enum_map::EnumMap; use neqo_common::{qdebug, qinfo, qwarn, IpTosEcn}; -use crate::{packet::PacketNumber, tracking::SentPacket}; +use crate::{packet::PacketNumber, recovery::SentPacket}; /// The number of packets to use for testing a path for ECN capability. pub const ECN_TEST_COUNT: usize = 10; @@ -159,7 +159,7 @@ impl EcnInfo { // > Validating ECN counts from reordered ACK frames can result in failure. 
An endpoint MUST // > NOT fail ECN validation as a result of processing an ACK frame that does not increase // > the largest acknowledged packet number. - let largest_acked = acked_packets.first().expect("must be there").pn; + let largest_acked = acked_packets.first().expect("must be there").pn(); if largest_acked <= self.largest_acked { return; } @@ -186,7 +186,7 @@ impl EcnInfo { // > ECT(0) marking. let newly_acked_sent_with_ect0: u64 = acked_packets .iter() - .filter(|p| p.ecn_mark == IpTosEcn::Ect0) + .filter(|p| p.ecn_mark() == IpTosEcn::Ect0) .count() .try_into() .unwrap(); diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 0e4c82b1ca..037a04408a 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -25,11 +25,11 @@ use crate::{ ecn::{EcnCount, EcnInfo}, frame::{FRAME_TYPE_PATH_CHALLENGE, FRAME_TYPE_PATH_RESPONSE, FRAME_TYPE_RETIRE_CONNECTION_ID}, packet::PacketBuilder, - recovery::RecoveryToken, + recovery::{RecoveryToken, SentPacket}, rtt::RttEstimate, sender::PacketSender, stats::FrameStats, - tracking::{PacketNumberSpace, SentPacket}, + tracking::PacketNumberSpace, Stats, }; @@ -954,12 +954,12 @@ impl Path { qinfo!( [self], "discarding a packet without an RTT estimate; guessing RTT={:?}", - now - sent.time_sent + now - sent.time_sent() ); stats.rtt_init_guess = true; self.rtt.update( &mut self.qlog, - now - sent.time_sent, + now - sent.time_sent(), Duration::new(0, 0), false, now, diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs index 715ba85e81..7d28d94d96 100644 --- a/neqo-transport/src/qlog.rs +++ b/neqo-transport/src/qlog.rs @@ -27,9 +27,9 @@ use crate::{ frame::{CloseError, Frame}, packet::{DecryptedPacket, PacketNumber, PacketType, PublicPacket}, path::PathRef, + recovery::SentPacket, stream_id::StreamType as NeqoStreamType, tparams::{self, TransportParametersHandler}, - tracking::SentPacket, version::{Version, VersionConfig, WireVersion}, }; @@ -254,7 +254,8 @@ pub fn packet_dropped(qlog: &mut NeqoQlog, public_packet: &PublicPacket) { pub fn packets_lost(qlog: &mut NeqoQlog, pkts: &[SentPacket]) { qlog.add_event_with_stream(|stream| { for pkt in pkts { - let header = PacketHeader::with_type(pkt.pt.into(), Some(pkt.pn), None, None, None); + let header = + PacketHeader::with_type(pkt.packet_type().into(), Some(pkt.pn()), None, None, None); let ev_data = EventData::PacketLost(PacketLost { header: Some(header), diff --git a/neqo-transport/src/recovery.rs b/neqo-transport/src/recovery/mod.rs similarity index 90% rename from neqo-transport/src/recovery.rs rename to neqo-transport/src/recovery/mod.rs index 22a635d9f3..b181bd88a0 100644 --- a/neqo-transport/src/recovery.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -6,31 +6,30 @@ // Tracking of sent packets and detecting their loss. 
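// Editor's overview (inferred from the diffstat above, hedged): `recovery.rs`
// becomes a directory module:
//
//   src/recovery/
//     mod.rs    -- LossRecovery, LossRecoverySpace, SendProfile (90% of the old file)
//     sent.rs   -- SentPacket (moved from tracking.rs) plus the new SentPackets
//     token.rs  -- RecoveryToken and StreamRecoveryToken (moved out of this file)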
+mod sent; +mod token; + use std::{ cmp::{max, min}, - collections::BTreeMap, - mem, + convert::TryFrom, ops::RangeInclusive, time::{Duration, Instant}, }; use neqo_common::{qdebug, qinfo, qlog::NeqoQlog, qtrace, qwarn}; +pub use sent::SentPacket; +use sent::SentPackets; use smallvec::{smallvec, SmallVec}; +pub use token::{RecoveryToken, StreamRecoveryToken}; use crate::{ - ackrate::AckRate, - cid::ConnectionIdEntry, - crypto::CryptoRecoveryToken, ecn::EcnCount, packet::PacketNumber, path::{Path, PathRef}, qlog::{self, QlogMetric}, - quic_datagrams::DatagramTracking, rtt::RttEstimate, - send_stream::SendStreamRecoveryToken, stats::{Stats, StatsCell}, - stream_id::{StreamId, StreamType}, - tracking::{AckToken, PacketNumberSpace, PacketNumberSpaceSet, SentPacket}, + tracking::{PacketNumberSpace, PacketNumberSpaceSet}, }; pub(crate) const PACKET_THRESHOLD: u64 = 3; @@ -49,54 +48,6 @@ pub(crate) const MIN_OUTSTANDING_UNACK: usize = 16; /// The scale we use for the fast PTO feature. pub const FAST_PTO_SCALE: u8 = 100; -#[derive(Debug, Clone)] -#[allow(clippy::module_name_repetitions)] -pub enum StreamRecoveryToken { - Stream(SendStreamRecoveryToken), - ResetStream { - stream_id: StreamId, - }, - StopSending { - stream_id: StreamId, - }, - - MaxData(u64), - DataBlocked(u64), - - MaxStreamData { - stream_id: StreamId, - max_data: u64, - }, - StreamDataBlocked { - stream_id: StreamId, - limit: u64, - }, - - MaxStreams { - stream_type: StreamType, - max_streams: u64, - }, - StreamsBlocked { - stream_type: StreamType, - limit: u64, - }, -} - -#[derive(Debug, Clone)] -#[allow(clippy::module_name_repetitions)] -pub enum RecoveryToken { - Stream(StreamRecoveryToken), - Ack(AckToken), - Crypto(CryptoRecoveryToken), - HandshakeDone, - KeepAlive, // Special PING. - NewToken(usize), - NewConnectionId(ConnectionIdEntry<[u8; 16]>), - RetireConnectionId(u64), - AckFrequency(AckRate), - Datagram(DatagramTracking), -} - /// `SendProfile` tells a sender how to send packets. #[derive(Debug)] pub struct SendProfile { @@ -181,7 +132,8 @@ pub(crate) struct LossRecoverySpace { /// This might be less than the number of ACK-eliciting packets, /// because PTO packets don't count. in_flight_outstanding: usize, - sent_packets: BTreeMap, + /// The packets that we have sent and are tracking. + sent_packets: SentPackets, /// The time that the first out-of-order packet was sent. /// This is `None` if there were no out-of-order packets detected. /// When set to `Some(T)`, time-based loss detection should be enabled. 
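// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the two loss conditions that
// `detect_lost_packets()` below applies to packets older than the largest
// acknowledged (RFC 9002, Section 6.1). `is_lost` is a name invented here.
use std::time::{Duration, Instant};

const PACKET_THRESHOLD: u64 = 3; // same value as the constant above

fn is_lost(
    pn: u64,
    time_sent: Instant,
    largest_acked: Option<u64>,
    loss_delay: Duration,
    now: Instant,
) -> bool {
    // Time threshold: the packet was sent before `now - loss_delay`.
    time_sent + loss_delay <= now
        // Packet threshold: it trails the largest acknowledged by >= 3.
        || largest_acked.map_or(false, |la| la >= pn + PACKET_THRESHOLD)
}

fn main() {
    let now = Instant::now();
    let loss_delay = Duration::from_millis(100);
    assert!(is_lost(1, now, Some(4), loss_delay, now)); // packet threshold hit
    assert!(!is_lost(2, now, Some(4), loss_delay, now)); // neither threshold
    assert!(is_lost(2, now - loss_delay, Some(3), loss_delay, now)); // time threshold
}
// ---------------------------------------------------------------------------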
@@ -196,7 +148,7 @@ impl LossRecoverySpace { largest_acked_sent_time: None, last_ack_eliciting: None, in_flight_outstanding: 0, - sent_packets: BTreeMap::default(), + sent_packets: SentPackets::default(), first_ooo_time: None, } } @@ -221,9 +173,9 @@ impl LossRecoverySpace { pub fn pto_packets(&mut self, count: usize) -> impl Iterator { self.sent_packets .iter_mut() - .filter_map(|(pn, sent)| { + .filter_map(|sent| { if sent.pto() { - qtrace!("PTO: marking packet {} lost ", pn); + qtrace!("PTO: marking packet {} lost ", sent.pn()); Some(&*sent) } else { None @@ -256,16 +208,16 @@ impl LossRecoverySpace { pub fn on_packet_sent(&mut self, sent_packet: SentPacket) { if sent_packet.ack_eliciting() { - self.last_ack_eliciting = Some(sent_packet.time_sent); + self.last_ack_eliciting = Some(sent_packet.time_sent()); self.in_flight_outstanding += 1; } else if self.space != PacketNumberSpace::ApplicationData && self.last_ack_eliciting.is_none() { // For Initial and Handshake spaces, make sure that we have a PTO baseline // always. See `LossRecoverySpace::pto_base_time()` for details. - self.last_ack_eliciting = Some(sent_packet.time_sent); + self.last_ack_eliciting = Some(sent_packet.time_sent()); } - self.sent_packets.insert(sent_packet.pn, sent_packet); + self.sent_packets.track(sent_packet); } /// If we are only sending ACK frames, send a PING frame after 2 PTOs so that @@ -285,56 +237,42 @@ impl LossRecoverySpace { .map_or(false, |t| now > t + (pto * n_pto)) } + fn remove_outstanding(&mut self, count: usize) { + debug_assert!(self.in_flight_outstanding >= count); + self.in_flight_outstanding -= count; + if self.in_flight_outstanding == 0 { + qtrace!("remove_packet outstanding == 0 for space {}", self.space); + } + } + fn remove_packet(&mut self, p: &SentPacket) { if p.ack_eliciting() { - debug_assert!(self.in_flight_outstanding > 0); - self.in_flight_outstanding -= 1; - if self.in_flight_outstanding == 0 { - qtrace!("remove_packet outstanding == 0 for space {}", self.space); - } + self.remove_outstanding(1); } } - /// Remove all acknowledged packets. + /// Remove all newly acknowledged packets. /// Returns all the acknowledged packets, with the largest packet number first. /// ...and a boolean indicating if any of those packets were ack-eliciting. /// This operates more efficiently because it assumes that the input is sorted /// in the order that an ACK frame is (from the top). 
     fn remove_acked<R>(&mut self, acked_ranges: R, stats: &mut Stats) -> (Vec<SentPacket>, bool)
     where
-        R: IntoIterator<Item = RangeInclusive<u64>>,
+        R: IntoIterator<Item = RangeInclusive<PacketNumber>>,
         R::IntoIter: ExactSizeIterator,
     {
-        let acked_ranges = acked_ranges.into_iter();
-        let mut keep = Vec::with_capacity(acked_ranges.len());
-
-        let mut acked = Vec::new();
+        let acked = self.sent_packets.take_ranges(acked_ranges);
         let mut eliciting = false;
-        for range in acked_ranges {
-            let first_keep = *range.end() + 1;
-            if let Some((&first, _)) = self.sent_packets.range(range).next() {
-                let mut tail = self.sent_packets.split_off(&first);
-                if let Some((&next, _)) = tail.range(first_keep..).next() {
-                    keep.push(tail.split_off(&next));
-                }
-                for (_, p) in tail.into_iter().rev() {
-                    self.remove_packet(&p);
-                    eliciting |= p.ack_eliciting();
-                    if p.lost() {
-                        stats.late_ack += 1;
-                    }
-                    if p.pto_fired() {
-                        stats.pto_ack += 1;
-                    }
-                    acked.push(p);
-                }
+        for p in &acked {
+            self.remove_packet(p);
+            eliciting |= p.ack_eliciting();
+            if p.lost() {
+                stats.late_ack += 1;
+            }
+            if p.pto_fired() {
+                stats.pto_ack += 1;
             }
         }
-
-        for mut k in keep.into_iter().rev() {
-            self.sent_packets.append(&mut k);
-        }
-
         (acked, eliciting)
     }
 
@@ -343,12 +281,12 @@ impl LossRecoverySpace {
     /// and when keys are dropped.
     fn remove_ignored(&mut self) -> impl Iterator<Item = SentPacket> {
         self.in_flight_outstanding = 0;
-        mem::take(&mut self.sent_packets).into_values()
+        std::mem::take(&mut self.sent_packets).drain_all()
     }
 
     /// Remove the primary path marking on any packets this is tracking.
     fn migrate(&mut self) {
-        for pkt in self.sent_packets.values_mut() {
+        for pkt in self.sent_packets.iter_mut() {
             pkt.clear_primary_path();
         }
     }
@@ -357,26 +295,9 @@ impl LossRecoverySpace {
     /// We try to keep these around until a probe is sent for them, so it is
     /// important that `cd` is set to at least the current PTO time; otherwise we
     /// might remove all in-flight packets and stop sending probes.
-    #[allow(clippy::option_if_let_else)] // Hard enough to read as-is.
     fn remove_old_lost(&mut self, now: Instant, cd: Duration) {
-        let mut it = self.sent_packets.iter();
-        // If the first item is not expired, do nothing.
-        if it.next().map_or(false, |(_, p)| p.expired(now, cd)) {
-            // Find the index of the first unexpired packet.
-            let to_remove = if let Some(first_keep) =
-                it.find_map(|(i, p)| if p.expired(now, cd) { None } else { Some(*i) })
-            {
-                // Some packets haven't expired, so keep those.
-                let keep = self.sent_packets.split_off(&first_keep);
-                mem::replace(&mut self.sent_packets, keep)
-            } else {
-                // All packets are expired.
-                mem::take(&mut self.sent_packets)
-            };
-            for (_, p) in to_remove {
-                self.remove_packet(&p);
-            }
-        }
+        let removed = self.sent_packets.remove_expired(now, cd);
+        self.remove_outstanding(removed);
     }
 
     /// Detect lost packets.
@@ -402,44 +323,39 @@ impl LossRecoverySpace {
 
         let largest_acked = self.largest_acked;
 
-        // Lost for retrans/CC purposes
-        let mut lost_pns = SmallVec::<[_; 8]>::new();
-
-        for (pn, packet) in self
+        for packet in self
             .sent_packets
             .iter_mut()
             // BTreeMap iterates in order of ascending PN
-            .take_while(|(&k, _)| k < largest_acked.unwrap_or(PacketNumber::MAX))
+            .take_while(|p| p.pn() < largest_acked.unwrap_or(PacketNumber::MAX))
         {
             // Packets sent before now - loss_delay are deemed lost.
- if packet.time_sent + loss_delay <= now { + if packet.time_sent() + loss_delay <= now { qtrace!( "lost={}, time sent {:?} is before lost_delay {:?}", - pn, - packet.time_sent, + packet.pn(), + packet.time_sent(), loss_delay ); - } else if largest_acked >= Some(*pn + PACKET_THRESHOLD) { + } else if largest_acked >= Some(packet.pn() + PACKET_THRESHOLD) { qtrace!( "lost={}, is >= {} from largest acked {:?}", - pn, + packet.pn(), PACKET_THRESHOLD, largest_acked ); } else { if largest_acked.is_some() { - self.first_ooo_time = Some(packet.time_sent); + self.first_ooo_time = Some(packet.time_sent()); } // No more packets can be declared lost after this one. break; }; if packet.declare_lost(now) { - lost_pns.push(*pn); + lost_packets.push(packet.clone()); } } - - lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone())); } } @@ -629,8 +545,8 @@ impl LossRecovery { } pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket) { - let pn_space = PacketNumberSpace::from(sent_packet.pt); - qdebug!([self], "packet {}-{} sent", pn_space, sent_packet.pn); + let pn_space = PacketNumberSpace::from(sent_packet.packet_type()); + qdebug!([self], "packet {}-{} sent", pn_space, sent_packet.pn()); if let Some(space) = self.spaces.get_mut(pn_space) { path.borrow_mut().packet_sent(&mut sent_packet); space.on_packet_sent(sent_packet); @@ -639,7 +555,7 @@ impl LossRecovery { [self], "ignoring {}-{} from dropped space", pn_space, - sent_packet.pn + sent_packet.pn() ); } } @@ -671,14 +587,14 @@ impl LossRecovery { &mut self, primary_path: &PathRef, pn_space: PacketNumberSpace, - largest_acked: u64, + largest_acked: PacketNumber, acked_ranges: R, ack_ecn: Option, ack_delay: Duration, now: Instant, ) -> (Vec, Vec) where - R: IntoIterator>, + R: IntoIterator>, R::IntoIter: ExactSizeIterator, { qdebug!( @@ -707,11 +623,11 @@ impl LossRecovery { // If the largest acknowledged is newly acked and any newly acked // packet was ack-eliciting, update the RTT. 
(-recovery 5.1) - space.largest_acked_sent_time = Some(largest_acked_pkt.time_sent); + space.largest_acked_sent_time = Some(largest_acked_pkt.time_sent()); if any_ack_eliciting && largest_acked_pkt.on_primary_path() { self.rtt_sample( primary_path.borrow_mut().rtt_mut(), - largest_acked_pkt.time_sent, + largest_acked_pkt.time_sent(), now, ack_delay, ); @@ -1019,6 +935,7 @@ impl ::std::fmt::Display for LossRecovery { mod tests { use std::{ cell::RefCell, + convert::TryInto, ops::{Deref, DerefMut, RangeInclusive}, rc::Rc, time::{Duration, Instant}, @@ -1034,7 +951,7 @@ mod tests { cc::CongestionControlAlgorithm, cid::{ConnectionId, ConnectionIdEntry}, ecn::EcnCount, - packet::PacketType, + packet::{PacketNumber, PacketType}, path::{Path, PathRef}, rtt::RttEstimate, stats::{Stats, StatsCell}, @@ -1061,8 +978,8 @@ mod tests { pub fn on_ack_received( &mut self, pn_space: PacketNumberSpace, - largest_acked: u64, - acked_ranges: Vec>, + largest_acked: PacketNumber, + acked_ranges: Vec>, ack_ecn: Option, ack_delay: Duration, now: Instant, @@ -1235,8 +1152,8 @@ mod tests { ); } - fn add_sent(lrs: &mut LossRecoverySpace, packet_numbers: &[u64]) { - for &pn in packet_numbers { + fn add_sent(lrs: &mut LossRecoverySpace, max_pn: PacketNumber) { + for pn in 0..=max_pn { lrs.on_packet_sent(SentPacket::new( PacketType::Short, pn, @@ -1249,15 +1166,18 @@ mod tests { } } - fn match_acked(acked: &[SentPacket], expected: &[u64]) { - assert!(acked.iter().map(|p| &p.pn).eq(expected)); + fn match_acked(acked: &[SentPacket], expected: &[PacketNumber]) { + assert_eq!( + acked.iter().map(SentPacket::pn).collect::>(), + expected + ); } #[test] fn remove_acked() { let mut lrs = LossRecoverySpace::new(PacketNumberSpace::ApplicationData); let mut stats = Stats::default(); - add_sent(&mut lrs, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + add_sent(&mut lrs, 10); let (acked, _) = lrs.remove_acked(vec![], &mut stats); assert!(acked.is_empty()); let (acked, _) = lrs.remove_acked(vec![7..=8, 2..=4], &mut stats); @@ -1265,7 +1185,7 @@ mod tests { let (acked, _) = lrs.remove_acked(vec![8..=11], &mut stats); match_acked(&acked, &[10, 9]); let (acked, _) = lrs.remove_acked(vec![0..=2], &mut stats); - match_acked(&acked, &[1]); + match_acked(&acked, &[1, 0]); let (acked, _) = lrs.remove_acked(vec![5..=6], &mut stats); match_acked(&acked, &[6, 5]); } @@ -1517,7 +1437,7 @@ mod tests { Vec::new(), ON_SENT_SIZE, ); - let pn_space = PacketNumberSpace::from(sent_pkt.pt); + let pn_space = PacketNumberSpace::from(sent_pkt.packet_type()); lr.on_packet_sent(sent_pkt); lr.on_ack_received( pn_space, @@ -1630,7 +1550,7 @@ mod tests { lr.on_packet_sent(SentPacket::new( PacketType::Initial, - 1, + 0, IpTosEcn::default(), now(), true, diff --git a/neqo-transport/src/recovery/sent.rs b/neqo-transport/src/recovery/sent.rs new file mode 100644 index 0000000000..1a7be9dd4a --- /dev/null +++ b/neqo-transport/src/recovery/sent.rs @@ -0,0 +1,379 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// A collection for sent packets. 
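// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): `SentPackets::take_ranges()`
// below removes whole packet-number ranges from a `BTreeMap` with two
// `split_off` calls per range. The same idiom, self-contained; `take_range`
// is a name invented for this sketch.
use std::{collections::BTreeMap, ops::RangeInclusive};

fn take_range(
    map: &mut BTreeMap<u64, &'static str>,
    range: RangeInclusive<u64>,
) -> Vec<&'static str> {
    // Split off everything at or above the start of the range...
    let mut acked = map.split_off(range.start());
    // ...then split the part above the end of the range back off and keep it.
    let keep = acked.split_off(&(*range.end() + 1));
    map.extend(keep);
    // Newest first, matching the order in which ACK ranges arrive.
    acked.into_values().rev().collect()
}

fn main() {
    let mut map: BTreeMap<u64, &str> = (0..=5).map(|i| (i, "pkt")).collect();
    assert_eq!(take_range(&mut map, 2..=3).len(), 2);
    assert_eq!(map.len(), 4); // packet numbers 0, 1, 4, 5 remain
}
// ---------------------------------------------------------------------------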
+ +use std::{ + collections::BTreeMap, + ops::RangeInclusive, + time::{Duration, Instant}, +}; + +use neqo_common::IpTosEcn; + +use crate::{ + packet::{PacketNumber, PacketType}, + recovery::RecoveryToken, +}; + +#[derive(Debug, Clone)] +pub struct SentPacket { + pt: PacketType, + pn: PacketNumber, + ecn_mark: IpTosEcn, + ack_eliciting: bool, + time_sent: Instant, + primary_path: bool, + tokens: Vec, + + time_declared_lost: Option, + /// After a PTO, this is true when the packet has been released. + pto: bool, + + len: usize, +} + +impl SentPacket { + pub fn new( + pt: PacketType, + pn: PacketNumber, + ecn_mark: IpTosEcn, + time_sent: Instant, + ack_eliciting: bool, + tokens: Vec, + len: usize, + ) -> Self { + Self { + pt, + pn, + ecn_mark, + time_sent, + ack_eliciting, + primary_path: true, + tokens, + time_declared_lost: None, + pto: false, + len, + } + } + + /// The type of this packet. + pub fn packet_type(&self) -> PacketType { + self.pt + } + + /// The number of the packet. + pub fn pn(&self) -> PacketNumber { + self.pn + } + + /// The ECN mark of the packet. + pub fn ecn_mark(&self) -> IpTosEcn { + self.ecn_mark + } + + /// The time that this packet was sent. + pub fn time_sent(&self) -> Instant { + self.time_sent + } + + /// Returns `true` if the packet will elicit an ACK. + pub fn ack_eliciting(&self) -> bool { + self.ack_eliciting + } + + /// Returns `true` if the packet was sent on the primary path. + pub fn on_primary_path(&self) -> bool { + self.primary_path + } + + /// The length of the packet that was sent. + pub fn len(&self) -> usize { + self.len + } + + /// Access the recovery tokens that this holds. + pub fn tokens(&self) -> &[RecoveryToken] { + &self.tokens + } + + /// Clears the flag that had this packet on the primary path. + /// Used when migrating to clear out state. + pub fn clear_primary_path(&mut self) { + self.primary_path = false; + } + + /// For Initial packets, it is possible that the packet builder needs to amend the length. + pub fn track_padding(&mut self, padding: usize) { + debug_assert_eq!(self.pt, PacketType::Initial); + self.len += padding; + } + + /// Whether the packet has been declared lost. + pub fn lost(&self) -> bool { + self.time_declared_lost.is_some() + } + + /// Whether accounting for the loss or acknowledgement in the + /// congestion controller is pending. + /// Returns `true` if the packet counts as being "in flight", + /// and has not previously been declared lost. + /// Note that this should count packets that contain only ACK and PADDING, + /// but we don't send PADDING, so we don't track that. + pub fn cc_outstanding(&self) -> bool { + self.ack_eliciting() && self.on_primary_path() && !self.lost() + } + + /// Whether the packet should be tracked as in-flight. + pub fn cc_in_flight(&self) -> bool { + self.ack_eliciting() && self.on_primary_path() + } + + /// Declare the packet as lost. Returns `true` if this is the first time. + pub fn declare_lost(&mut self, now: Instant) -> bool { + if self.lost() { + false + } else { + self.time_declared_lost = Some(now); + true + } + } + + /// Ask whether this tracked packet has been declared lost for long enough + /// that it can be expired and no longer tracked. + pub fn expired(&self, now: Instant, expiration_period: Duration) -> bool { + self.time_declared_lost + .map_or(false, |loss_time| (loss_time + expiration_period) <= now) + } + + /// Whether the packet contents were cleared out after a PTO. 
+ pub fn pto_fired(&self) -> bool { + self.pto + } + + /// On PTO, we need to get the recovery tokens so that we can ensure that + /// the frames we sent can be sent again in the PTO packet(s). Do that just once. + pub fn pto(&mut self) -> bool { + if self.pto || self.lost() { + false + } else { + self.pto = true; + true + } + } +} + +/// A collection for packets that we have sent that haven't been acknowledged. +#[derive(Debug, Default)] +pub struct SentPackets { + /// The collection. + packets: BTreeMap, +} + +impl SentPackets { + pub fn len(&self) -> usize { + self.packets.len() + } + + pub fn track(&mut self, packet: SentPacket) { + self.packets.insert(packet.pn, packet); + } + + pub fn iter_mut(&mut self) -> impl Iterator { + self.packets.values_mut() + } + + /// Take values from a specified ranges of packet numbers. + /// The values returned will be reversed, so that the most recent packet appears first. + /// This is because ACK frames arrive with ranges starting from the largest acknowledged + /// and we want to match that. + pub fn take_ranges(&mut self, acked_ranges: R) -> Vec + where + R: IntoIterator>, + R::IntoIter: ExactSizeIterator, + { + let mut result = Vec::new(); + // Remove all packets. We will add them back as we don't need them. + let mut packets = std::mem::take(&mut self.packets); + for range in acked_ranges { + // For each acked range, split off the acknowledged part, + // then split off the part that hasn't been acknowledged. + // This order works better when processing ranges that + // have already been processed, which is common. + let mut acked = packets.split_off(range.start()); + let keep = acked.split_off(&(*range.end() + 1)); + self.packets.extend(keep); + result.extend(acked.into_values().rev()); + } + self.packets.extend(packets); + result + } + + /// Empty out the packets, but keep the offset. + pub fn drain_all(&mut self) -> impl Iterator { + std::mem::take(&mut self.packets).into_values() + } + + /// See `LossRecoverySpace::remove_old_lost` for details on `now` and `cd`. + /// Returns the number of ack-eliciting packets removed. + pub fn remove_expired(&mut self, now: Instant, cd: Duration) -> usize { + let mut it = self.packets.iter(); + // If the first item is not expired, do nothing (the most common case). + if it.next().map_or(false, |(_, p)| p.expired(now, cd)) { + // Find the index of the first unexpired packet. + let to_remove = if let Some(first_keep) = + it.find_map(|(i, p)| if p.expired(now, cd) { None } else { Some(*i) }) + { + // Some packets haven't expired, so keep those. + let keep = self.packets.split_off(&first_keep); + std::mem::replace(&mut self.packets, keep) + } else { + // All packets are expired. 
+ std::mem::take(&mut self.packets) + }; + to_remove + .into_values() + .filter(SentPacket::ack_eliciting) + .count() + } else { + 0 + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + cell::OnceCell, + convert::TryFrom, + time::{Duration, Instant}, + }; + + use neqo_common::IpTosEcn; + + use super::{SentPacket, SentPackets}; + use crate::packet::{PacketNumber, PacketType}; + + const PACKET_GAP: Duration = Duration::from_secs(1); + fn start_time() -> Instant { + thread_local!(static STARTING_TIME: OnceCell = const { OnceCell::new() }); + STARTING_TIME.with(|t| *t.get_or_init(Instant::now)) + } + + fn pkt(n: u32) -> SentPacket { + SentPacket::new( + PacketType::Short, + PacketNumber::from(n), + IpTosEcn::default(), + start_time() + (PACKET_GAP * n), + true, + Vec::new(), + 100, + ) + } + + fn pkts() -> SentPackets { + let mut pkts = SentPackets::default(); + pkts.track(pkt(0)); + pkts.track(pkt(1)); + pkts.track(pkt(2)); + assert_eq!(pkts.len(), 3); + pkts + } + + trait HasPacketNumber { + fn pn(&self) -> PacketNumber; + } + impl HasPacketNumber for SentPacket { + fn pn(&self) -> PacketNumber { + self.pn + } + } + impl HasPacketNumber for &'_ SentPacket { + fn pn(&self) -> PacketNumber { + self.pn + } + } + impl HasPacketNumber for &'_ mut SentPacket { + fn pn(&self) -> PacketNumber { + self.pn + } + } + + fn remove_one(pkts: &mut SentPackets, idx: PacketNumber) { + assert_eq!(pkts.len(), 3); + let store = pkts.take_ranges([idx..=idx]); + let mut it = store.into_iter(); + assert_eq!(idx, it.next().unwrap().pn()); + assert!(it.next().is_none()); + std::mem::drop(it); + assert_eq!(pkts.len(), 2); + } + + fn assert_zero_and_two<'a, 'b: 'a>( + mut it: impl Iterator + 'a, + ) { + assert_eq!(it.next().unwrap().pn(), 0); + assert_eq!(it.next().unwrap().pn(), 2); + assert!(it.next().is_none()); + } + + #[test] + fn iterate_skipped() { + let mut pkts = pkts(); + for (i, p) in pkts.packets.values().enumerate() { + assert_eq!(i, usize::try_from(p.pn).unwrap()); + } + remove_one(&mut pkts, 1); + + // Validate the merged result multiple ways. + assert_zero_and_two(pkts.iter_mut()); + + { + // Reverse the expectations here as this iterator reverses its output. + let store = pkts.take_ranges([0..=2]); + let mut it = store.into_iter(); + assert_eq!(it.next().unwrap().pn(), 2); + assert_eq!(it.next().unwrap().pn(), 0); + assert!(it.next().is_none()); + }; + + // The None values are still there in this case, so offset is 0. + assert_eq!(pkts.packets.len(), 0); + assert_eq!(pkts.len(), 0); + } + + #[test] + fn drain() { + let mut pkts = pkts(); + remove_one(&mut pkts, 1); + + assert_zero_and_two(pkts.drain_all()); + assert_eq!(pkts.len(), 0); + } + + #[test] + fn remove_expired() { + let mut pkts = pkts(); + remove_one(&mut pkts, 0); + + for p in pkts.iter_mut() { + p.declare_lost(p.time_sent); // just to keep things simple. + } + + // Expire up to pkt(1). + let count = pkts.remove_expired(start_time() + PACKET_GAP, Duration::new(0, 0)); + assert_eq!(count, 1); + assert_eq!(pkts.len(), 1); + } + + #[test] + fn first_skipped_ok() { + let mut pkts = SentPackets::default(); + pkts.track(pkt(4)); // This is fine. + assert_eq!(pkts.len(), 1); + } +} diff --git a/neqo-transport/src/recovery/token.rs b/neqo-transport/src/recovery/token.rs new file mode 100644 index 0000000000..93f84268cd --- /dev/null +++ b/neqo-transport/src/recovery/token.rs @@ -0,0 +1,63 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. 
diff --git a/neqo-transport/src/recovery/token.rs b/neqo-transport/src/recovery/token.rs
new file mode 100644
index 0000000000..93f84268cd
--- /dev/null
+++ b/neqo-transport/src/recovery/token.rs
@@ -0,0 +1,63 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::{
+    ackrate::AckRate,
+    cid::ConnectionIdEntry,
+    crypto::CryptoRecoveryToken,
+    quic_datagrams::DatagramTracking,
+    send_stream::SendStreamRecoveryToken,
+    stream_id::{StreamId, StreamType},
+    tracking::AckToken,
+};
+
+#[derive(Debug, Clone)]
+#[allow(clippy::module_name_repetitions)]
+pub enum StreamRecoveryToken {
+    Stream(SendStreamRecoveryToken),
+    ResetStream {
+        stream_id: StreamId,
+    },
+    StopSending {
+        stream_id: StreamId,
+    },
+
+    MaxData(u64),
+    DataBlocked(u64),
+
+    MaxStreamData {
+        stream_id: StreamId,
+        max_data: u64,
+    },
+    StreamDataBlocked {
+        stream_id: StreamId,
+        limit: u64,
+    },
+
+    MaxStreams {
+        stream_type: StreamType,
+        max_streams: u64,
+    },
+    StreamsBlocked {
+        stream_type: StreamType,
+        limit: u64,
+    },
+}
+
+#[derive(Debug, Clone)]
+#[allow(clippy::module_name_repetitions)]
+pub enum RecoveryToken {
+    Stream(StreamRecoveryToken),
+    Ack(AckToken),
+    Crypto(CryptoRecoveryToken),
+    HandshakeDone,
+    KeepAlive, // Special PING.
+    NewToken(usize),
+    NewConnectionId(ConnectionIdEntry<[u8; 16]>),
+    RetireConnectionId(u64),
+    AckFrequency(AckRate),
+    Datagram(DatagramTracking),
+}
diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs
index abb14d0a25..22abef4dc7 100644
--- a/neqo-transport/src/sender.rs
+++ b/neqo-transport/src/sender.rs
@@ -18,8 +18,8 @@ use neqo_common::qlog::NeqoQlog;
 use crate::{
     cc::{ClassicCongestionControl, CongestionControl, CongestionControlAlgorithm, Cubic, NewReno},
     pace::Pacer,
+    recovery::SentPacket,
     rtt::RttEstimate,
-    tracking::SentPacket,
 };
 
 /// The number of packets we allow to burst from the pacer.
@@ -114,7 +114,7 @@ impl PacketSender {
 
     pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration) {
         self.pacer
-            .spend(pkt.time_sent, rtt, self.cc.cwnd(), pkt.size);
+            .spend(pkt.time_sent(), rtt, self.cc.cwnd(), pkt.len());
         self.cc.on_packet_sent(pkt);
     }
 
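The `tracking.rs` hunk below then removes the old `SentPacket` definition, which this patch relocates into `recovery`. For reference, the `time_sent()` and `len()` accessors used in the `sender.rs` hunk above presumably look something like this on the relocated type (a sketch inferred from that hunk; the field names are assumptions, since the new struct body is not shown in this excerpt):

impl SentPacket {
    /// The time that the packet was sent.
    pub fn time_sent(&self) -> Instant {
        self.time_sent // assumes the relocated struct keeps this field
    }

    /// The number of bytes in the packet.
    pub fn len(&self) -> usize {
        self.len // assumed field name; the old struct called this `size`
    }
}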
diff --git a/neqo-transport/src/tracking.rs b/neqo-transport/src/tracking.rs
index 6643d516e3..7c97b55f27 100644
--- a/neqo-transport/src/tracking.rs
+++ b/neqo-transport/src/tracking.rs
@@ -133,117 +133,6 @@ impl std::fmt::Debug for PacketNumberSpaceSet {
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct SentPacket {
-    pub pt: PacketType,
-    pub pn: PacketNumber,
-    pub ecn_mark: IpTosEcn,
-    ack_eliciting: bool,
-    pub time_sent: Instant,
-    primary_path: bool,
-    pub tokens: Vec<RecoveryToken>,
-
-    time_declared_lost: Option<Instant>,
-    /// After a PTO, this is true when the packet has been released.
-    pto: bool,
-
-    pub size: usize,
-}
-
-impl SentPacket {
-    pub fn new(
-        pt: PacketType,
-        pn: PacketNumber,
-        ecn_mark: IpTosEcn,
-        time_sent: Instant,
-        ack_eliciting: bool,
-        tokens: Vec<RecoveryToken>,
-        size: usize,
-    ) -> Self {
-        Self {
-            pt,
-            pn,
-            ecn_mark,
-            time_sent,
-            ack_eliciting,
-            primary_path: true,
-            tokens,
-            time_declared_lost: None,
-            pto: false,
-            size,
-        }
-    }
-
-    /// Returns `true` if the packet will elicit an ACK.
-    pub fn ack_eliciting(&self) -> bool {
-        self.ack_eliciting
-    }
-
-    /// Returns `true` if the packet was sent on the primary path.
-    pub fn on_primary_path(&self) -> bool {
-        self.primary_path
-    }
-
-    /// Clears the flag that had this packet on the primary path.
-    /// Used when migrating to clear out state.
-    pub fn clear_primary_path(&mut self) {
-        self.primary_path = false;
-    }
-
-    /// Whether the packet has been declared lost.
-    pub fn lost(&self) -> bool {
-        self.time_declared_lost.is_some()
-    }
-
-    /// Whether accounting for the loss or acknowledgement in the
-    /// congestion controller is pending.
-    /// Returns `true` if the packet counts as being "in flight",
-    /// and has not previously been declared lost.
-    /// Note that this should count packets that contain only ACK and PADDING,
-    /// but we don't send PADDING, so we don't track that.
-    pub fn cc_outstanding(&self) -> bool {
-        self.ack_eliciting() && self.on_primary_path() && !self.lost()
-    }
-
-    /// Whether the packet should be tracked as in-flight.
-    pub fn cc_in_flight(&self) -> bool {
-        self.ack_eliciting() && self.on_primary_path()
-    }
-
-    /// Declare the packet as lost. Returns `true` if this is the first time.
-    pub fn declare_lost(&mut self, now: Instant) -> bool {
-        if self.lost() {
-            false
-        } else {
-            self.time_declared_lost = Some(now);
-            true
-        }
-    }
-
-    /// Ask whether this tracked packet has been declared lost for long enough
-    /// that it can be expired and no longer tracked.
-    pub fn expired(&self, now: Instant, expiration_period: Duration) -> bool {
-        self.time_declared_lost
-            .map_or(false, |loss_time| (loss_time + expiration_period) <= now)
-    }
-
-    /// Whether the packet contents were cleared out after a PTO.
-    pub fn pto_fired(&self) -> bool {
-        self.pto
-    }
-
-    /// On PTO, we need to get the recovery tokens so that we can ensure that
-    /// the frames we sent can be sent again in the PTO packet(s). Do that just once.
-    pub fn pto(&mut self) -> bool {
-        if self.pto || self.lost() {
-            false
-        } else {
-            self.pto = true;
-            true
-        }
-    }
-}
-
 impl std::fmt::Display for PacketNumberSpace {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         f.write_str(match self {