diff --git a/.github/actions/nss/action.yml b/.github/actions/nss/action.yml
index b8f7470f38..19d61ed392 100644
--- a/.github/actions/nss/action.yml
+++ b/.github/actions/nss/action.yml
@@ -33,18 +33,18 @@ runs:
       run: |
         if ! command -v pkg-config &> /dev/null; then
           echo "pkg-config: not found"
-          echo "BUILD_NSS=1" >> "$GITHUB_ENV"
+          echo "USE_SYSTEM_NSS=0" >> "$GITHUB_ENV"
           exit 0
         fi
         if ! pkg-config --exists nss; then
           echo "pkg-config: NSS not found"
-          echo "BUILD_NSS=1" >> "$GITHUB_ENV"
+          echo "USE_SYSTEM_NSS=0" >> "$GITHUB_ENV"
           exit 0
         fi
         NSS_VERSION="$(pkg-config --modversion nss)"
         if [ "$?" -ne 0 ]; then
           echo "pkg-config: failed to determine NSS version"
-          echo "BUILD_NSS=1" >> "$GITHUB_ENV"
+          echo "USE_SYSTEM_NSS=0" >> "$GITHUB_ENV"
           exit 0
         fi
         NSS_MAJOR=$(echo "$NSS_VERSION" | cut -d. -f1)
@@ -53,11 +53,11 @@ runs:
         REQ_NSS_MINOR=$(echo "${{ inputs.minimum-version}}" | cut -d. -f2)
         if [[ "$NSS_MAJOR" -lt "$REQ_NSS_MAJOR" || "$NSS_MAJOR" -eq "$REQ_NSS_MAJOR" && "$NSS_MINOR" -lt "$REQ_NSS_MINOR" ]]; then
           echo "System NSS is too old: $NSS_VERSION"
-          echo "BUILD_NSS=1" >> "$GITHUB_ENV"
+          echo "USE_SYSTEM_NSS=0" >> "$GITHUB_ENV"
           exit 0
         fi
         echo "System NSS is suitable: $NSS_VERSION"
-        echo "BUILD_NSS=0" >> "$GITHUB_ENV"
+        echo "USE_SYSTEM_NSS=1" >> "$GITHUB_ENV"

     - name: Use sccache
       # Apparently the action can't be installed twice in the same workflow, so check if
@@ -66,11 +66,11 @@ runs:
      #
      # Also, only enable sscache on our self-hosted runner, because the GitHub cache limit
      # is too small for this to be effective there.
-      if: env.SCCACHE_ENABLED != '1' && env.BUILD_NSS == '1' && runner.environment != 'github-hosted'
+      if: env.SCCACHE_ENABLED != '1' && env.USE_SYSTEM_NSS == '0' && runner.environment != 'github-hosted'
       uses: mozilla-actions/sccache-action@2e7f9ec7921547d4b46598398ca573513895d0bd # v0.0.4

     - name: Enable sscache
-      if: env.BUILD_NSS == '1' && runner.environment != 'github-hosted'
+      if: env.USE_SYSTEM_NSS == '0' && runner.environment != 'github-hosted'
       shell: bash
       run: |
         echo "SCCACHE_ENABLED=1" >> "$GITHUB_ENV"
@@ -86,21 +86,21 @@ runs:
         fi

     - name: Checkout NSS
-      if: env.BUILD_NSS == '1'
+      if: env.USE_SYSTEM_NSS == '0'
       uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
       with:
         repository: nss-dev/nss
         path: nss

     - name: Checkout NSPR
-      if: env.BUILD_NSS == '1'
+      if: env.USE_SYSTEM_NSS == '0'
       uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
       with:
         repository: nss-dev/nspr
         path: nspr

     - name: Get head revisions
-      if: env.BUILD_NSS == '1'
+      if: env.USE_SYSTEM_NSS == '0'
       shell: bash
       run: |
         NSS_HEAD=$(git -C nss rev-parse HEAD)
@@ -110,21 +110,22 @@ runs:

     - name: Cache NSS
       id: cache
-      if: env.BUILD_NSS == '1' && runner.environment == 'github-hosted'
+      if: env.USE_SYSTEM_NSS == '0' && runner.environment == 'github-hosted'
       uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
       with:
         path: dist
         key: nss-${{ runner.os }}-${{ inputs.type }}-${{ env.NSS_HEAD }}-${{ env.NSPR_HEAD }}

     - name: Check if build is needed
-      if: env.BUILD_NSS == '1' && runner.environment == 'github-hosted'
+      if: env.USE_SYSTEM_NSS == '0'
       shell: bash
       run: |
-        if [ "${{ steps.cache.outputs.cache-hit }}" == "true" ]; then
+        if [ "${{ runner.environment }}" != "github-hosted" ] || [ "${{ steps.cache.outputs.cache-hit }}" == "false" ]; then
+          echo "Building NSS from source"
+          echo "BUILD_NSS=1" >> "$GITHUB_ENV"
+        else
           echo "Using cached prebuilt NSS"
           echo "BUILD_NSS=0" >> "$GITHUB_ENV"
-        else
-          echo "Building NSS from source"
         fi

     - name: Install build dependencies (Linux)
@@ -176,6 +177,7 @@ runs:

     - name: Set up environment
       shell: bash
+      if: env.USE_SYSTEM_NSS == '0'
       run: |
         NSS_TARGET="${{ inputs.type }}"
         echo "NSS_TARGET=$NSS_TARGET" >> "$GITHUB_ENV"
@@ -187,7 +189,6 @@ runs:
         echo "NSS_PREBUILT=1" >> "$GITHUB_ENV"
       env:
         NSS_DIR: ${{ github.workspace }}/nss
-        NSPR_DIR: ${{ github.workspace }}/nspr

     - name: Build
       shell: bash
diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
index 490c7047c4..db4acd1336 100644
--- a/.github/workflows/check.yml
+++ b/.github/workflows/check.yml
@@ -8,6 +8,12 @@ on:
     paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"]
   merge_group:
   workflow_dispatch:
+    inputs:
+      run_benchmarks:
+        description: 'Run benchmarks'
+        type: boolean
+        required: false
+        default: false
 env:
   CARGO_TERM_COLOR: always
   RUST_BACKTRACE: 1
@@ -99,4 +105,8 @@ jobs:

   bench:
     needs: [check]
+    if: >
+      (github.event_name == 'workflow_dispatch' && github.event.inputs.run_benchmarks) ||
+      (github.event_name == 'pull_request' && !github.event.pull_request.draft) ||
+      (github.event_name != 'workflow_dispatch' && github.event_name != 'pull_request')
     uses: ./.github/workflows/bench.yml
diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml
index e0ffa067de..b9208c6626 100644
--- a/.github/workflows/qns.yml
+++ b/.github/workflows/qns.yml
@@ -55,7 +55,7 @@ jobs:
             # set latest tag for default branch
             type=raw,value=latest,enable={{is_default_branch}}

-      - uses: docker/build-push-action@5cd11c3a4ced054e52742c5fd54dca954e0edd85 # v6.7.0
+      - uses: docker/build-push-action@32945a339266b759abcbdc89316275140b0fc960 # v6.8.0
         if: github.event_name != 'pull_request'
         with:
           push: true
@@ -66,7 +66,7 @@ jobs:
           cache-to: type=gha,mode=max
           platforms: 'linux/amd64, linux/arm64'

-      - uses: docker/build-push-action@5cd11c3a4ced054e52742c5fd54dca954e0edd85 # v6.7.0
+      - uses: docker/build-push-action@32945a339266b759abcbdc89316275140b0fc960 # v6.8.0
         id: docker_build_and_push
         with:
           tags: ${{ steps.meta.outputs.tags }}
diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs
index 25840e91a6..1d5f6f7329 100644
--- a/neqo-http3/src/connection_client.rs
+++ b/neqo-http3/src/connection_client.rs
@@ -2620,7 +2620,7 @@ mod tests {
         force_idle(&mut client, &mut server);

         let idle_timeout = ConnectionParameters::default().get_idle_timeout();
-        assert_eq!(client.process_output(now()).callback(), idle_timeout);
+        assert_eq!(client.process_output(now()).callback(), idle_timeout / 2);
     }

     // Helper function: read response when a server sends HTTP_RESPONSE_2.
@@ -5114,7 +5114,7 @@ mod tests {
         assert!(!fin);

         force_idle(&mut client, &mut server);
-        assert_eq!(client.process_output(now()).callback(), idle_timeout);
+        assert_eq!(client.process_output(now()).callback(), idle_timeout / 2);
     }

     #[test]
diff --git a/neqo-transport/src/connection/idle.rs b/neqo-transport/src/connection/idle.rs
index 5aaf1cb4d7..c5b570a09c 100644
--- a/neqo-transport/src/connection/idle.rs
+++ b/neqo-transport/src/connection/idle.rs
@@ -96,6 +96,21 @@ impl IdleTimeout {
         self.start(now) + max(self.timeout / 2, pto)
     }

+    pub fn next_keep_alive(&self, now: Instant, pto: Duration) -> Option<Instant> {
+        if self.keep_alive_outstanding {
+            return None;
+        }
+
+        let timeout = self.keep_alive_timeout(now, pto);
+        // Timer is in the past, i.e. we should have sent a keep alive,
+        // but we were unable to, e.g. due to CC.
+        if timeout <= now {
+            return None;
+        }
+
+        Some(timeout)
+    }
+
     pub fn send_keep_alive(
         &mut self,
         now: Instant,
diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs
index d8d9db2422..156c7de815 100644
--- a/neqo-transport/src/connection/mod.rs
+++ b/neqo-transport/src/connection/mod.rs
@@ -629,13 +629,17 @@ impl Connection {
             .unwrap()
     }

+    fn confirmed(&self) -> bool {
+        self.state == State::Confirmed
+    }
+
     /// Get the simplest PTO calculation for all those cases where we need
     /// a value of this approximate order. Don't use this for loss recovery,
     /// only use it where a more precise value is not important.
     fn pto(&self) -> Duration {
         self.paths.primary().map_or_else(
-            || RttEstimate::default().pto(PacketNumberSpace::ApplicationData),
-            |p| p.borrow().rtt().pto(PacketNumberSpace::ApplicationData),
+            || RttEstimate::default().pto(self.confirmed()),
+            |p| p.borrow().rtt().pto(self.confirmed()),
         )
     }

@@ -1049,7 +1053,7 @@ impl Connection {
             return timeout.duration_since(now);
         }

-        let mut delays = SmallVec::<[_; 6]>::new();
+        let mut delays = SmallVec::<[_; 7]>::new();
         if let Some(ack_time) = self.acks.ack_time(now) {
             qtrace!([self], "Delayed ACK timer {:?}", ack_time);
             delays.push(ack_time);
@@ -1058,12 +1062,19 @@ impl Connection {

         if let Some(p) = self.paths.primary() {
             let path = p.borrow();
             let rtt = path.rtt();
-            let pto = rtt.pto(PacketNumberSpace::ApplicationData);
+            let pto = rtt.pto(self.confirmed());
             let idle_time = self.idle_timeout.expiry(now, pto);
-            qtrace!([self], "Idle/keepalive timer {:?}", idle_time);
+            qtrace!([self], "Idle timer {:?}", idle_time);
             delays.push(idle_time);

+            if self.streams.need_keep_alive() {
+                if let Some(keep_alive_time) = self.idle_timeout.next_keep_alive(now, pto) {
+                    qtrace!([self], "Keep alive timer {:?}", keep_alive_time);
+                    delays.push(keep_alive_time);
+                }
+            }
+
             if let Some(lr_time) = self.loss_recovery.next_timeout(&path) {
                 qtrace!([self], "Loss recovery timer {:?}", lr_time);
                 delays.push(lr_time);
@@ -1525,7 +1536,7 @@ impl Connection {
         let mut dcid = None;

         qtrace!([self], "{} input {}", path.borrow(), hex(&**d));
-        let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData);
+        let pto = path.borrow().rtt().pto(self.confirmed());

         // Handle each packet in the datagram.
         while !slc.is_empty() {
@@ -2018,7 +2029,10 @@ impl Connection {
         // Count how many bytes in this range are non-zero.
         let pn_len = mem::size_of::<u64>()
             - usize::try_from(unacked_range.leading_zeros() / 8).unwrap();
-        // pn_len can't be zero (unacked_range is > 0)
+        assert!(
+            pn_len > 0,
+            "pn_len can't be zero as unacked_range should be > 0, pn {pn}, largest_acknowledged {largest_acknowledged:?}, tx {tx}"
+        );
         // TODO(mt) also use `4*path CWND/path MTU` to set a minimum length.
         builder.pn(pn, pn_len);
         pn
@@ -2138,7 +2152,7 @@ impl Connection {
                 // or the PTO timer fired: probe.
                 true
             } else {
-                let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData);
+                let pto = path.borrow().rtt().pto(self.confirmed());
                 if !builder.packet_empty() {
                     // The packet only contains an ACK. Check whether we want to
                     // force an ACK with a PING so we can stop tracking packets.
@@ -3425,7 +3439,7 @@ impl Connection {
         };
         let path = self.paths.primary().ok_or(Error::NotAvailable)?;
         let mtu = path.borrow().plpmtu();
-        let encoder = Encoder::with_capacity(mtu);
+        let encoder = Encoder::default();

         let (_, mut builder) = Self::build_packet_header(
             &path.borrow(),
diff --git a/neqo-transport/src/connection/tests/idle.rs b/neqo-transport/src/connection/tests/idle.rs
index 8677d7f5d5..dfb59235c8 100644
--- a/neqo-transport/src/connection/tests/idle.rs
+++ b/neqo-transport/src/connection/tests/idle.rs
@@ -30,6 +30,10 @@ fn default_timeout() -> Duration {
     ConnectionParameters::default().get_idle_timeout()
 }

+fn keep_alive_timeout() -> Duration {
+    default_timeout() / 2
+}
+
 fn test_idle_timeout(client: &mut Connection, server: &mut Connection, timeout: Duration) {
     assert!(timeout > Duration::from_secs(1));
     connect_force_idle(client, server);
@@ -412,11 +416,12 @@ fn keep_alive_initiator() {
     let stream = create_stream_idle(&mut server, &mut client);
     let mut now = now();

+    // Marking the stream for keep-alive changes the idle timeout.
     server.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut server, now, default_timeout());
+    assert_idle(&mut server, now, keep_alive_timeout());

     // Wait that long and the server should send a PING frame.
-    now += default_timeout() / 2;
+    now += keep_alive_timeout();
     let pings_before = server.stats().frame_tx.ping;
     let ping = server.process_output(now).dgram();
     assert!(ping.is_some());
@@ -427,9 +432,9 @@ fn keep_alive_initiator() {
     let out = server.process(out.as_ref(), now).dgram();
     assert!(client.process(out.as_ref(), now).dgram().is_none());

-    // Check that there will be next keep-alive ping after default_timeout().
-    assert_idle(&mut server, now, default_timeout());
-    now += default_timeout() / 2;
+    // Check that there will be next keep-alive ping after keep_alive_timeout().
+    assert_idle(&mut server, now, keep_alive_timeout());
+    now += keep_alive_timeout();
     let pings_before2 = server.stats().frame_tx.ping;
     let ping = server.process_output(now).dgram();
     assert!(ping.is_some());
@@ -446,10 +451,10 @@ fn keep_alive_lost() {
     let mut now = now();

     server.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut server, now, default_timeout());
+    assert_idle(&mut server, now, keep_alive_timeout());

     // Wait that long and the server should send a PING frame.
-    now += default_timeout() / 2;
+    now += keep_alive_timeout();
     let pings_before = server.stats().frame_tx.ping;
     let ping = server.process_output(now).dgram();
     assert!(ping.is_some());
@@ -475,7 +480,7 @@ fn keep_alive_lost() {
     // return some small timeout for the recovry although it does not have
     // any outstanding data. Therefore we call it after AT_LEAST_PTO.
     now += AT_LEAST_PTO;
-    assert_idle(&mut server, now, default_timeout() - AT_LEAST_PTO);
+    assert_idle(&mut server, now, keep_alive_timeout() - AT_LEAST_PTO);
 }

 /// The other peer can also keep it alive.
@@ -488,10 +493,11 @@ fn keep_alive_responder() {
     let mut now = now();

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now, default_timeout());
+    assert_idle(&mut client, now, keep_alive_timeout());

     // Wait that long and the client should send a PING frame.
-    now += default_timeout() / 2;
+    now += keep_alive_timeout();
+    eprintln!("after wait");
     let pings_before = client.stats().frame_tx.ping;
     let ping = client.process_output(now).dgram();
     assert!(ping.is_some());
@@ -507,7 +513,7 @@ fn keep_alive_unmark() {
     let stream = create_stream_idle(&mut client, &mut server);

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_keep_alive(stream, false).unwrap();
     assert_idle(&mut client, now(), default_timeout());
@@ -537,11 +543,11 @@ fn keep_alive_close() {
     let stream = create_stream_idle(&mut client, &mut server);

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_close_send(stream).unwrap();
     transfer_force_idle(&mut client, &mut server);
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     server.stream_close_send(stream).unwrap();
     transfer_force_idle(&mut server, &mut client);
@@ -558,19 +564,19 @@ fn keep_alive_reset() {
     let stream = create_stream_idle(&mut client, &mut server);

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_close_send(stream).unwrap();
     transfer_force_idle(&mut client, &mut server);
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     server.stream_reset_send(stream, 0).unwrap();
     transfer_force_idle(&mut server, &mut client);
     assert_idle(&mut client, now(), default_timeout());

     // The client will fade away from here.
-    let t = now() + (default_timeout() / 2);
-    assert_eq!(client.process_output(t).callback(), default_timeout() / 2);
+    let t = now() + keep_alive_timeout();
+    assert_eq!(client.process_output(t).callback(), keep_alive_timeout());
     let t = now() + default_timeout();
     assert_eq!(client.process_output(t), Output::None);
 }
@@ -584,7 +590,7 @@ fn keep_alive_stop_sending() {
     let stream = create_stream_idle(&mut client, &mut server);

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_close_send(stream).unwrap();
     client.stream_stop_sending(stream, 0).unwrap();
@@ -608,14 +614,14 @@ fn keep_alive_multiple_stop() {
     let stream = create_stream_idle(&mut client, &mut server);

     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     let other = client.stream_create(StreamType::BiDi).unwrap();
     client.stream_keep_alive(other, true).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_keep_alive(stream, false).unwrap();
-    assert_idle(&mut client, now(), default_timeout());
+    assert_idle(&mut client, now(), keep_alive_timeout());

     client.stream_keep_alive(other, false).unwrap();
     assert_idle(&mut client, now(), default_timeout());
@@ -638,7 +644,7 @@ fn keep_alive_large_rtt() {
         endpoint.stream_keep_alive(stream, true).unwrap();
         let delay = endpoint.process_output(now).callback();
         qtrace!([endpoint], "new delay {:?}", delay);
-        assert!(delay > default_timeout() / 2);
+        assert!(delay > keep_alive_timeout());
         assert!(delay > rtt);
     }
 }
@@ -686,8 +692,9 @@ fn keep_alive_with_ack_eliciting_packet_lost() {

     // Create a stream.
     let stream = client.stream_create(StreamType::BiDi).unwrap();
+    // Marking the stream for keep-alive changes the idle timeout.
     client.stream_keep_alive(stream, true).unwrap();
-    assert_idle(&mut client, now, IDLE_TIMEOUT);
+    assert_idle(&mut client, now, IDLE_TIMEOUT / 2);

     // Send data on the stream that will be lost.
     _ = client.stream_send(stream, DEFAULT_STREAM_DATA).unwrap();
@@ -702,11 +709,13 @@ fn keep_alive_with_ack_eliciting_packet_lost() {
     let retransmit = client.process_output(now).dgram();
     assert!(retransmit.is_some());

-    // The timeout is the twice the PTO, because we've already sent one probe.
-    assert_eq!(client.process_output(now).callback(), pto * 2);
+    // The next callback should be for an idle PING.
+    assert_eq!(
+        client.process_output(now).callback(),
+        IDLE_TIMEOUT / 2 - pto
+    );

-    // Wait for half the idle timeout (less the PTO we've already waited)
-    // so that we get a keep-alive.
+    // Wait that long and the client should send a PING frame.
     now += IDLE_TIMEOUT / 2 - pto;
     let pings_before = client.stats().frame_tx.ping;
     let ping = client.process_output(now).dgram();
diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs
index 49c289f60b..71b9fa96d8 100644
--- a/neqo-transport/src/path.rs
+++ b/neqo-transport/src/path.rs
@@ -30,7 +30,6 @@ use crate::{
     rtt::{RttEstimate, RttSource},
     sender::PacketSender,
     stats::FrameStats,
-    tracking::PacketNumberSpace,
     Stats,
 };

@@ -1020,7 +1019,7 @@ impl Path {
     pub fn on_packets_lost(
         &mut self,
         prev_largest_acked_sent: Option<Instant>,
-        space: PacketNumberSpace,
+        confirmed: bool,
         lost_packets: &[SentPacket],
         stats: &mut Stats,
         now: Instant,
@@ -1030,7 +1029,7 @@ impl Path {
         let cwnd_reduced = self.sender.on_packets_lost(
             self.rtt.first_sample_time(),
             prev_largest_acked_sent,
-            self.rtt.pto(space), // Important: the base PTO, not adjusted.
+            self.rtt.pto(confirmed), // Important: the base PTO, not adjusted.
             lost_packets,
             stats,
             now,
diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs
index f2c3e8e298..a5753e6c84 100644
--- a/neqo-transport/src/recovery/mod.rs
+++ b/neqo-transport/src/recovery/mod.rs
@@ -568,6 +568,10 @@ impl LossRecovery {
         }
     }

+    const fn confirmed(&self) -> bool {
+        self.confirmed_time.is_some()
+    }
+
     /// Returns (acked packets, lost packets)
     #[allow(clippy::too_many_arguments)]
     pub fn on_ack_received(
@@ -627,7 +631,7 @@ impl LossRecovery {
         // as we rely on the count of in-flight packets to determine whether to send
         // another probe. Removing them too soon would result in not sending on PTO.
         let loss_delay = primary_path.borrow().rtt().loss_delay();
-        let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
+        let cleanup_delay = self.pto_period(primary_path.borrow().rtt());
         let mut lost = Vec::new();
         self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
             now,
@@ -642,7 +646,7 @@ impl LossRecovery {
         // backoff, so that we can determine persistent congestion.
         primary_path.borrow_mut().on_packets_lost(
             prev_largest_acked,
-            pn_space,
+            self.confirmed(),
             &lost,
             &mut self.stats.borrow_mut(),
             now,
@@ -679,7 +683,7 @@ impl LossRecovery {
         dropped
     }

-    fn confirmed(&mut self, rtt: &RttEstimate, now: Instant) {
+    fn confirm(&mut self, rtt: &RttEstimate, now: Instant) {
         debug_assert!(self.confirmed_time.is_none());
         self.confirmed_time = Some(now);
         // Up until now, the ApplicationData space has been ignored for PTO.
@@ -716,7 +720,7 @@ impl LossRecovery {
         self.pto_state = None;

         if space == PacketNumberSpace::Handshake {
-            self.confirmed(path.rtt(), now);
+            self.confirm(path.rtt(), now);
         }
     }

@@ -757,41 +761,40 @@ impl LossRecovery {
     fn pto_period_inner(
         rtt: &RttEstimate,
         pto_state: Option<&PtoState>,
-        pn_space: PacketNumberSpace,
+        confirmed: bool,
         fast_pto: u8,
     ) -> Duration {
         // This is a complicated (but safe) way of calculating:
         // base_pto * F * 2^pto_count
         // where F = fast_pto / FAST_PTO_SCALE (== 1 by default)
         let pto_count = pto_state.map_or(0, |p| u32::try_from(p.count).unwrap_or(0));
-        rtt.pto(pn_space)
+        rtt.pto(confirmed)
             .checked_mul(u32::from(fast_pto) << min(pto_count, u32::BITS - u8::BITS))
             .map_or(Duration::from_secs(3600), |p| p / u32::from(FAST_PTO_SCALE))
     }

     /// Get the current PTO period for the given packet number space.
     /// Unlike calling `RttEstimate::pto` directly, this includes exponential backoff.
-    fn pto_period(&self, rtt: &RttEstimate, pn_space: PacketNumberSpace) -> Duration {
-        Self::pto_period_inner(rtt, self.pto_state.as_ref(), pn_space, self.fast_pto)
+    fn pto_period(&self, rtt: &RttEstimate) -> Duration {
+        Self::pto_period_inner(
+            rtt,
+            self.pto_state.as_ref(),
+            self.confirmed(),
+            self.fast_pto,
+        )
     }

     // Calculate PTO time for the given space.
     fn pto_time(&self, rtt: &RttEstimate, pn_space: PacketNumberSpace) -> Option<Instant> {
-        if self.confirmed_time.is_none() && pn_space == PacketNumberSpace::ApplicationData {
-            None
-        } else {
-            self.spaces.get(pn_space).and_then(|space| {
-                space
-                    .pto_base_time()
-                    .map(|t| t + self.pto_period(rtt, pn_space))
-            })
-        }
+        self.spaces
+            .get(pn_space)
+            .and_then(|space| space.pto_base_time().map(|t| t + self.pto_period(rtt)))
     }

     /// Find the earliest PTO time for all active packet number spaces.
     /// Ignore Application if either Initial or Handshake have an active PTO.
     fn earliest_pto(&self, rtt: &RttEstimate) -> Option<Instant> {
-        if self.confirmed_time.is_some() {
+        if self.confirmed() {
             self.pto_time(rtt, PacketNumberSpace::ApplicationData)
         } else {
             self.pto_time(rtt, PacketNumberSpace::Initial)
@@ -859,6 +862,7 @@ impl LossRecovery {
         qtrace!([self], "timeout {:?}", now);

         let loss_delay = primary_path.borrow().rtt().loss_delay();
+        let confirmed = self.confirmed();

         let mut lost_packets = Vec::new();
         for space in self.spaces.iter_mut() {
@@ -866,14 +870,14 @@ impl LossRecovery {
             let pto = Self::pto_period_inner(
                 primary_path.borrow().rtt(),
                 self.pto_state.as_ref(),
-                space.space,
+                confirmed,
                 self.fast_pto,
             );
             space.detect_lost_packets(now, loss_delay, pto, &mut lost_packets);

             primary_path.borrow_mut().on_packets_lost(
                 space.largest_acked_sent_time,
-                space.space,
+                confirmed,
                 &lost_packets[first..],
                 &mut self.stats.borrow_mut(),
                 now,
@@ -950,7 +954,6 @@ mod tests {
         ecn::EcnCount,
         packet::{PacketNumber, PacketType},
         path::{Path, PathRef},
-        rtt::RttEstimate,
         stats::{Stats, StatsCell},
     };

@@ -961,8 +964,8 @@ mod tests {
     const ON_SENT_SIZE: usize = 100;

     /// An initial RTT for using with `setup_lr`.
-    const TEST_RTT: Duration = ms(80);
-    const TEST_RTTVAR: Duration = ms(40);
+    const TEST_RTT: Duration = ms(7000);
+    const TEST_RTTVAR: Duration = ms(3500);

     struct Fixture {
         lr: LossRecovery,
@@ -1033,6 +1036,7 @@ mod tests {
                 ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]),
             );
             path.set_primary(true);
+            path.rtt_mut().set_initial(TEST_RTT);
             Self {
                 lr: LossRecovery::new(StatsCell::default(), FAST_PTO_SCALE),
                 path: Rc::new(RefCell::new(path)),
@@ -1510,13 +1514,13 @@ mod tests {
             ON_SENT_SIZE,
         ));

-        assert_eq!(lr.pto_time(PacketNumberSpace::ApplicationData), None);
+        assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some());
         lr.discard(PacketNumberSpace::Initial, pn_time(1));
-        assert_eq!(lr.pto_time(PacketNumberSpace::ApplicationData), None);
+        assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some());

         // Expiring state after the PTO on the ApplicationData space has
         // expired should result in setting a PTO state.
-        let default_pto = RttEstimate::default().pto(PacketNumberSpace::ApplicationData);
+        let default_pto = lr.path.borrow().rtt().pto(true);
         let expected_pto = pn_time(2) + default_pto;
         lr.discard(PacketNumberSpace::Handshake, expected_pto);
         let profile = lr.send_profile(now());
@@ -1548,7 +1552,7 @@ mod tests {
             ON_SENT_SIZE,
         ));

-        let handshake_pto = RttEstimate::default().pto(PacketNumberSpace::Handshake);
+        let handshake_pto = lr.path.borrow().rtt().pto(false);
         let expected_pto = now() + handshake_pto;
         assert_eq!(lr.pto_time(PacketNumberSpace::Initial), Some(expected_pto));
         let profile = lr.send_profile(now());
diff --git a/neqo-transport/src/recv_stream.rs b/neqo-transport/src/recv_stream.rs
index 7b46a386bc..3add6e4d46 100644
--- a/neqo-transport/src/recv_stream.rs
+++ b/neqo-transport/src/recv_stream.rs
@@ -92,32 +92,19 @@ impl RecvStreams {
     }

     pub fn clear_terminal(&mut self, send_streams: &SendStreams, role: Role) -> (u64, u64) {
-        let recv_to_remove = self
-            .streams
-            .iter()
-            .filter_map(|(id, stream)| {
-                // Remove all streams for which the receiving is done (or aborted).
-                // But only if they are unidirectional, or we have finished sending.
-                if stream.is_terminal() && (id.is_uni() || !send_streams.exists(*id)) {
-                    Some(*id)
-                } else {
-                    None
-                }
-            })
-            .collect::<Vec<_>>();
-
         let mut removed_bidi = 0;
         let mut removed_uni = 0;
-        for id in &recv_to_remove {
-            self.streams.remove(id);
-            if id.is_remote_initiated(role) {
+        self.streams.retain(|id, s| {
+            let dead = s.is_terminal() && (id.is_uni() || !send_streams.exists(*id));
+            if dead && id.is_remote_initiated(role) {
                 if id.is_bidi() {
                     removed_bidi += 1;
                 } else {
                     removed_uni += 1;
                 }
             }
-        }
+            !dead
+        });

         (removed_bidi, removed_uni)
     }
diff --git a/neqo-transport/src/rtt.rs b/neqo-transport/src/rtt.rs
index 027b574aad..ba623a1a1c 100644
--- a/neqo-transport/src/rtt.rs
+++ b/neqo-transport/src/rtt.rs
@@ -19,7 +19,6 @@ use crate::{
     qlog::{self, QlogMetric},
     recovery::RecoveryToken,
     stats::FrameStats,
-    tracking::PacketNumberSpace,
 };

 /// The smallest time that the system timer (via `sleep()`, `nanosleep()`,
@@ -163,9 +162,9 @@ impl RttEstimate {
         self.smoothed_rtt
     }

-    pub fn pto(&self, pn_space: PacketNumberSpace) -> Duration {
+    pub fn pto(&self, confirmed: bool) -> Duration {
         let mut t = self.estimate() + max(4 * self.rttvar, GRANULARITY);
-        if pn_space == PacketNumberSpace::ApplicationData {
+        if confirmed {
             t += self.ack_delay.max();
         }
         t