diff --git a/.github/actions/maximize-build-space/action.yml b/.github/actions/maximize-build-space/action.yml new file mode 100644 index 0000000000..c9ada92815 --- /dev/null +++ b/.github/actions/maximize-build-space/action.yml @@ -0,0 +1,206 @@ +# https://github.com/easimon/maximize-build-space/blob/fadc013e293a3453768b4ddb9db8c85104752807/action.yml + +name: 'Maximize build disk space' +description: 'Maximize the available disk space for your build job' +branding: + icon: 'crop' + color: 'orange' +inputs: + root-reserve-mb: + description: 'Space to be left free on the root filesystem, in Megabytes.' + required: false + default: '1024' + temp-reserve-mb: + description: 'Space to be left free on the temp filesystem (/mnt), in Megabytes.' + required: false + default: '100' + swap-size-mb: + description: 'Swap space to create, in Megabytes.' + required: false + default: '4096' + overprovision-lvm: + description: | + Create the LVM disk images as sparse files, making the space required for the LVM image files *appear* unused on the + hosting volumes until actually allocated. Use with care, this can lead to surprising out-of-disk-space situations. + You should prefer adjusting root-reserve-mb/temp-reserve-mb over using this option. + required: false + default: 'false' + build-mount-path: + description: 'Absolute path to the mount point where the build space will be available, defaults to $GITHUB_WORKSPACE if unset.' + required: false + build-mount-path-ownership: + description: 'Ownership of the mount point path, defaults to standard "runner" user and group.' + required: false + default: 'runner:runner' + pv-loop-path: + description: 'Absolute file path for the LVM image created on the root filesystem, the default is usually fine.' + required: false + default: '/pv.img' + tmp-pv-loop-path: + description: 'Absolute file path for the LVM image created on the temp filesystem, the default is usually fine. Must reside on /mnt' + required: false + default: '/mnt/tmp-pv.img' + remove-dotnet: + description: 'Removes .NET runtime and libraries. (frees ~17 GB)' + required: false + default: 'false' + remove-android: + description: 'Removes Android SDKs and Tools. (frees ~11 GB)' + required: false + default: 'false' + remove-haskell: + description: 'Removes GHC (Haskell) artifacts. (frees ~2.7 GB)' + required: false + default: 'false' + remove-codeql: + description: 'Removes CodeQL Action Bundles. (frees ~5.4 GB)' + required: false + default: 'false' + remove-docker-images: + description: 'Removes cached Docker images. 
(frees ~3 GB)' + required: false + default: 'false' +runs: + using: "composite" + steps: + - name: Disk space report before modification + shell: bash + run: | + echo "Memory and swap:" + sudo free + echo + sudo swapon --show + echo + + echo "Available storage:" + sudo df -h + echo + + - name: Maximize build disk space + shell: bash + run: | + set -euo pipefail + + BUILD_MOUNT_PATH="${{ inputs.build-mount-path }}" + if [[ -z "${BUILD_MOUNT_PATH}" ]]; then + BUILD_MOUNT_PATH="${GITHUB_WORKSPACE}" + fi + + echo "Arguments:" + echo + echo " Root reserve: ${{ inputs.root-reserve-mb }} MiB" + echo " Temp reserve: ${{ inputs.temp-reserve-mb }} MiB" + echo " Swap space: ${{ inputs.swap-size-mb }} MiB" + echo " Overprovision LVM: ${{ inputs.overprovision-lvm }}" + echo " Mount path: ${BUILD_MOUNT_PATH}" + echo " Root PV loop path: ${{ inputs.pv-loop-path }}" + echo " Temp PV loop path: ${{ inputs.tmp-pv-loop-path }}" + echo -n " Removing: " + if [[ ${{ inputs.remove-dotnet }} == 'true' ]]; then + echo -n "dotnet " + fi + if [[ ${{ inputs.remove-android }} == 'true' ]]; then + echo -n "android " + fi + if [[ ${{ inputs.remove-haskell }} == 'true' ]]; then + echo -n "haskell " + fi + if [[ ${{ inputs.remove-codeql }} == 'true' ]]; then + echo -n "codeql " + fi + if [[ ${{ inputs.remove-docker-images }} == 'true' ]]; then + echo -n "docker " + fi + echo + + # store owner of $GITHUB_WORKSPACE in case the action deletes it + WORKSPACE_OWNER="$(stat -c '%U:%G' "${GITHUB_WORKSPACE}")" + + # ensure mount path exists before the action + sudo mkdir -p "${BUILD_MOUNT_PATH}" + sudo find "${BUILD_MOUNT_PATH}" -maxdepth 0 ! -empty -exec echo 'WARNING: directory [{}] is not empty, data loss might occur. Content:' \; -exec ls -al "{}" \; + + echo "Removing unwanted software... " + if [[ ${{ inputs.remove-dotnet }} == 'true' ]]; then + sudo rm -rf /usr/share/dotnet + fi + if [[ ${{ inputs.remove-android }} == 'true' ]]; then + sudo rm -rf /usr/local/lib/android + fi + if [[ ${{ inputs.remove-haskell }} == 'true' ]]; then + sudo rm -rf /opt/ghc + fi + if [[ ${{ inputs.remove-codeql }} == 'true' ]]; then + sudo rm -rf /opt/hostedtoolcache/CodeQL + fi + if [[ ${{ inputs.remove-docker-images }} == 'true' ]]; then + sudo docker image prune --all --force + fi + echo "... done" + + VG_NAME=buildvg + + # github runners have an active swap file in /mnt/swapfile + # we want to reuse the temp disk, so first unmount swap and clean the temp disk + echo "Unmounting and removing swap file." + sudo swapoff -a + sudo rm -f /mnt/swapfile + + echo "Creating LVM Volume." + echo " Creating LVM PV on root fs." + # create loop pv image on root fs + ROOT_RESERVE_KB=$(expr ${{ inputs.root-reserve-mb }} \* 1024) + ROOT_FREE_KB=$(df --block-size=1024 --output=avail / | tail -1) + ROOT_LVM_SIZE_KB=$(expr $ROOT_FREE_KB - $ROOT_RESERVE_KB) + ROOT_LVM_SIZE_BYTES=$(expr $ROOT_LVM_SIZE_KB \* 1024) + sudo touch "${{ inputs.pv-loop-path }}" && sudo fallocate -z -l "${ROOT_LVM_SIZE_BYTES}" "${{ inputs.pv-loop-path }}" + export ROOT_LOOP_DEV=$(sudo losetup --find --show "${{ inputs.pv-loop-path }}") + sudo pvcreate -f "${ROOT_LOOP_DEV}" + + # create pv on temp disk + echo " Creating LVM PV on temp fs." 
+ TMP_RESERVE_KB=$(expr ${{ inputs.temp-reserve-mb }} \* 1024) + TMP_FREE_KB=$(df --block-size=1024 --output=avail /mnt | tail -1) + TMP_LVM_SIZE_KB=$(expr $TMP_FREE_KB - $TMP_RESERVE_KB) + TMP_LVM_SIZE_BYTES=$(expr $TMP_LVM_SIZE_KB \* 1024) + sudo touch "${{ inputs.tmp-pv-loop-path }}" && sudo fallocate -z -l "${TMP_LVM_SIZE_BYTES}" "${{ inputs.tmp-pv-loop-path }}" + export TMP_LOOP_DEV=$(sudo losetup --find --show "${{ inputs.tmp-pv-loop-path }}") + sudo pvcreate -f "${TMP_LOOP_DEV}" + + # create volume group from these pvs + sudo vgcreate "${VG_NAME}" "${TMP_LOOP_DEV}" "${ROOT_LOOP_DEV}" + + echo "Recreating swap" + # create and activate swap + sudo lvcreate -L "${{ inputs.swap-size-mb }}M" -n swap "${VG_NAME}" + sudo mkswap "/dev/mapper/${VG_NAME}-swap" + sudo swapon "/dev/mapper/${VG_NAME}-swap" + + echo "Creating build volume" + # create and mount build volume + sudo lvcreate -l 100%FREE -n buildlv "${VG_NAME}" + if [[ ${{ inputs.overprovision-lvm }} == 'true' ]]; then + sudo mkfs.ext4 -m0 "/dev/mapper/${VG_NAME}-buildlv" + else + sudo mkfs.ext4 -Enodiscard -m0 "/dev/mapper/${VG_NAME}-buildlv" + fi + sudo mount "/dev/mapper/${VG_NAME}-buildlv" "${BUILD_MOUNT_PATH}" + sudo chown -R "${{ inputs.build-mount-path-ownership }}" "${BUILD_MOUNT_PATH}" + + # if build mount path is a parent of $GITHUB_WORKSPACE, and has been deleted, recreate it + if [[ ! -d "${GITHUB_WORKSPACE}" ]]; then + sudo mkdir -p "${GITHUB_WORKSPACE}" + sudo chown -R "${WORKSPACE_OWNER}" "${GITHUB_WORKSPACE}" + fi + + - name: Disk space report after modification + shell: bash + run: | + echo "Memory and swap:" + sudo free + echo + sudo swapon --show + echo + + echo "Available storage:" + sudo df -h diff --git a/.github/actions/nss/action.yml b/.github/actions/nss/action.yml index 2fa61528a7..34ccae1d7d 100644 --- a/.github/actions/nss/action.yml +++ b/.github/actions/nss/action.yml @@ -49,14 +49,14 @@ runs: # # - name: Checkout NSPR # if: env.BUILD_NSS == '1' - # uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + # uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # with: # repository: "nss-dev/nspr" # path: ${{ github.workspace }}/nspr # - name: Checkout NSS # if: env.BUILD_NSS == '1' - # uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + # uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # with: # repository: "nss-dev/nss" # path: ${{ github.workspace }}/nss diff --git a/.github/actions/pr-comment/action.yml b/.github/actions/pr-comment/action.yml index 75eb547562..fce0bd3dd7 100644 --- a/.github/actions/pr-comment/action.yml +++ b/.github/actions/pr-comment/action.yml @@ -15,7 +15,7 @@ inputs: runs: using: composite steps: - - uses: actions/download-artifact@v4 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: run-id: ${{ github.event.workflow_run.id }} name: ${{ inputs.name }} @@ -32,7 +32,7 @@ runs: echo "[:arrow_down: Download logs]($(cat log-url))" >> contents fi - - uses: thollander/actions-comment-pull-request@v2 + - uses: thollander/actions-comment-pull-request@fabd468d3a1a0b97feee5f6b9e499eab0dd903f6 # v2.5.0 with: filePath: contents mode: ${{ inputs.mode }} diff --git a/.github/actions/quic-interop-runner/action.yml b/.github/actions/quic-interop-runner/action.yml index ec4db19fe1..d0874ef924 100644 --- a/.github/actions/quic-interop-runner/action.yml +++ b/.github/actions/quic-interop-runner/action.yml @@ -24,7 +24,7 @@ runs: using: "composite" steps: - name: Checkout 
quic-interop/quic-interop-runner repository - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: repository: 'quic-interop/quic-interop-runner' path: 'quic-interop-runner' @@ -40,7 +40,7 @@ runs: sudo apt-get install -y --no-install-recommends tshark shell: bash - - uses: actions/setup-python@v5 + - uses: actions/setup-python@82c7e631bb3cdc910f68e0081d67478d79c6982d # v5.1.0 with: python-version: 3.8 cache: 'pip' @@ -88,7 +88,7 @@ runs: mv result.json.tmp result.json shell: bash - - uses: actions/upload-artifact@v4 + - uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 with: name: '${{ inputs.client }} vs. ${{ inputs.server }} results' path: | diff --git a/.github/actions/rust/action.yml b/.github/actions/rust/action.yml index 4b03b37b8d..b363738cca 100644 --- a/.github/actions/rust/action.yml +++ b/.github/actions/rust/action.yml @@ -13,7 +13,7 @@ runs: using: composite steps: - name: Install Rust - uses: dtolnay/rust-toolchain@master + uses: dtolnay/rust-toolchain@bb45937a053e097f8591208d8e74c90db1873d07 # master with: toolchain: ${{ inputs.version }} components: ${{ inputs.components }} @@ -35,7 +35,7 @@ runs: # sccache slows CI down, so we leave it disabled. # Leaving the steps below commented out, so we can re-evaluate enabling it later. # - name: Use sccache - # uses: mozilla-actions/sccache-action@v0.0.4 + # uses: mozilla-actions/sccache-action@2e7f9ec7921547d4b46598398ca573513895d0bd # v0.0.4 # - name: Enable sscache # shell: bash @@ -53,6 +53,6 @@ runs: # Ditto for rust-cache. # - name: Use Rust cache - # uses: Swatinem/rust-cache@v2 + # uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84 # v2.7.3 # with: # cache-all-crates: "true" diff --git a/.github/workflows/actionlint.yml b/.github/workflows/actionlint.yml index b258454518..107e5f0726 100644 --- a/.github/workflows/actionlint.yml +++ b/.github/workflows/actionlint.yml @@ -21,7 +21,7 @@ jobs: run: shell: bash steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Download actionlint id: get_actionlint run: bash <(curl https://raw.githubusercontent.com/rhysd/actionlint/main/scripts/download-actionlint.bash) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 232ad4c6f1..e27be7daf9 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -26,10 +26,10 @@ jobs: steps: - name: Checkout neqo - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Checkout msquic - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: repository: microsoft/msquic ref: main @@ -244,6 +244,6 @@ jobs: - name: Export PR comment data uses: ./.github/actions/pr-comment-data-export with: - name: bench + name: ${{ github.workflow }} contents: results.md log-url: ${{ steps.export.outputs.artifact-url }} diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 00971a6ac4..4d4b5898c2 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -25,8 +25,9 @@ jobs: matrix: os: [ubuntu-latest, macos-14, windows-latest] # Don't increase beyond what Firefox is currently using: - # 
https://firefox-source-docs.mozilla.org/writing-rust-code/update-policy.html#schedule - rust-toolchain: [1.74.0, stable, nightly] + # https://searchfox.org/mozilla-central/search?q=MINIMUM_RUST_VERSION&path=python/mozboot/mozboot/util.py + # Keep in sync with Cargo.toml + rust-toolchain: [1.76.0, stable, nightly] type: [debug] include: - os: ubuntu-latest @@ -41,7 +42,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - name: Install dependencies (Linux) if: runner.os == 'Linux' @@ -151,7 +152,7 @@ jobs: if: success() || failure() - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@84508663e988701840491b86de86b666e8a86bed # v4.3.0 + uses: codecov/codecov-action@5ecb98a3c6b747ed38dc09f787459979aebb39be # v4.3.1 with: file: lcov.info fail_ci_if_error: false diff --git a/.github/workflows/firefox.yml b/.github/workflows/firefox.yml new file mode 100644 index 0000000000..adf65a85e1 --- /dev/null +++ b/.github/workflows/firefox.yml @@ -0,0 +1,169 @@ +name: Firefox +on: + push: + branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] + pull_request: + branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] + merge_group: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref_name }} + cancel-in-progress: true + +env: + FIREFOX: Firefox + +jobs: + firefox: + name: Build Firefox + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-14, windows-latest] + type: [debug, release] + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash + env: + MOZBUILD_STATE_PATH: ${{ github.workspace }}/mozbuild + CARGO_HOME: ${{ github.workspace }}/cargo + + steps: + # We need to check out Neqo first, because the maximize-build-space action + # is vendored in. + - name: Check out Neqo + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - name: Maximize build space + if: runner.os == 'Linux' + uses: ./.github/actions/maximize-build-space + with: + root-reserve-mb: 2048 + temp-reserve-mb: 2048 + swap-size-mb: 4096 + remove-dotnet: true + remove-android: true + remove-haskell: true + remove-docker-images: true + + # The previous step blew it away, so we need to check it out again. 
+ - name: Check out Neqo again + if: runner.os == 'Linux' + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - name: Check out Firefox + uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + with: + repository: mozilla/gecko-dev + path: mozilla-unified + + - name: Install deps (Windows) + if: runner.os == 'Windows' + run: choco install -y mozillabuild + + - name: Bootstrap Firefox + run: | + cd mozilla-unified + { + echo "mk_add_options MOZ_OBJDIR=../$FIREFOX" + echo "ac_add_options --with-ccache=sccache" + echo "ac_add_options --enable-application=browser" + # Work around https://bugzilla.mozilla.org/show_bug.cgi?id=1894031 + if [ "${{ runner.os }}" != "Windows" ] || [ "${{ matrix.type}}" != "debug" ]; then + echo "ac_add_options --disable-tests" + fi + echo "ac_add_options --enable-${{ matrix.type }}" + } >> mozconfig + ./mach bootstrap --application-choice browser + + - name: Plumb in Neqo + run: | + # Get qlog version used by neqo + cargo generate-lockfile + QLOG_VERSION=$(cargo pkgid qlog | cut -d@ -f2) + rm Cargo.lock + cd mozilla-unified + { + echo '[[audits.qlog]]' + echo 'who = "CI"' + echo 'criteria = "safe-to-deploy"' + echo "version = \"$QLOG_VERSION\"" + } >> supply-chain/audits.toml + sed -i'' -e "s/qlog =.*/qlog = \"$QLOG_VERSION\"/" netwerk/socket/neqo_glue/Cargo.toml + { + echo '[patch."https://github.com/mozilla/neqo"]' + echo 'neqo-http3 = { path = "../neqo-http3" }' + echo 'neqo-transport = { path = "../neqo-transport" }' + echo 'neqo-common = { path = "../neqo-common" }' + echo 'neqo-qpack = { path = "../neqo-qpack" }' + echo 'neqo-crypto = { path = "../neqo-crypto" }' + } >> Cargo.toml + cargo update neqo-http3 neqo-transport neqo-common neqo-qpack neqo-crypto + ./mach vendor rust --ignore-modified + + - name: Build Firefox + env: + NAME: ${{ runner.os == 'macOS' && 'Nightly' || 'bin' }} + TYPE: ${{ runner.os == 'macOS' && matrix.type == 'debug' && 'Debug' || '' }} + EXT: ${{ runner.os == 'macOS' && '.app' || '' }} + run: | + cd mozilla-unified + ./mach build && tar -cf "../$FIREFOX.tar" -C "../$FIREFOX/dist" "$NAME$TYPE$EXT" + exit 0 + + - name: Export binary + id: upload + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + with: + name: ${{ runner.os }}-${{ env.FIREFOX }}-${{ matrix.type }}.tgz + path: ${{ env.FIREFOX }}.tar + compression-level: 9 + + - run: echo "${{ steps.upload.outputs.artifact-url }}" >> artifact + + - name: Export artifact URL + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + with: + name: artifact-${{ runner.os }}-${{ env.FIREFOX }}-${{ matrix.type }} + path: artifact + retention-days: 1 + + comment: + name: Comment on PR + if: always() + needs: firefox + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + + - uses: actions/download-artifact@9c19ed7fe5d278cd354c7dfd5d3b88589c7e2395 # v4.1.6 + with: + pattern: 'artifact-*' + path: artifacts + + - run: | + { + echo "### Firefox builds for this PR" + echo "The following builds are available for testing. Crossed-out builds did not succeed." 
+ for os in Linux macOS Windows; do + echo -n "* **$os**:" + for type in debug release; do + artifact="artifacts/artifact-$os-${{ env.FIREFOX }}-$type/artifact" + if [ -e "$artifact" ]; then + echo -n " [${type^}]($(cat $artifact))" + else + echo -n " ~~${type^}~~" + fi + done + echo + done + } > comment.md + cat comment.md > "$GITHUB_STEP_SUMMARY" + + - uses: ./.github/actions/pr-comment-data-export + with: + name: ${{ github.workflow }} + contents: comment.md diff --git a/.github/workflows/mutants.yml b/.github/workflows/mutants.yml index 1837749da8..e7ca9f55f0 100644 --- a/.github/workflows/mutants.yml +++ b/.github/workflows/mutants.yml @@ -17,7 +17,7 @@ jobs: mutants: runs-on: ubuntu-latest steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: fetch-depth: 0 diff --git a/.github/workflows/bench-comment.yml b/.github/workflows/pr-comment.yml similarity index 66% rename from .github/workflows/bench-comment.yml rename to .github/workflows/pr-comment.yml index dce7e25f5f..ac500f3831 100644 --- a/.github/workflows/bench-comment.yml +++ b/.github/workflows/pr-comment.yml @@ -4,11 +4,11 @@ # tests itself might run off of a fork, i.e., an untrusted environment and should # thus not be granted write permissions. -name: Benchmark Comment +name: PR Comment on: workflow_run: - workflows: ["CI"] + workflows: ["QNS", "CI", "Firefox"] types: - completed @@ -21,10 +21,10 @@ jobs: runs-on: ubuntu-latest if: | github.event.workflow_run.event == 'pull_request' && - github.event.workflow_run.conclusion == 'success' + (github.event.workflow_run.name != 'CI' || github.event.workflow_run.conclusion == 'success' ) steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 - uses: ./.github/actions/pr-comment with: - name: bench + name: ${{ github.event.workflow_run.name }} token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/qns-comment.yml b/.github/workflows/qns-comment.yml deleted file mode 100644 index f27d4904a1..0000000000 --- a/.github/workflows/qns-comment.yml +++ /dev/null @@ -1,29 +0,0 @@ -# Post test results as pull request comment. -# -# This is done as a separate workflow as it requires write permissions. The -# tests itself might run off of a fork, i.e., an untrusted environment and should -# thus not be granted write permissions. - -name: QUIC Network Simulator Comment - -on: - workflow_run: - workflows: ["QUIC Network Simulator"] - types: - - completed - -permissions: read-all - -jobs: - comment: - permissions: - pull-requests: write - runs-on: ubuntu-latest - if: | - github.event.workflow_run.event == 'pull_request' - steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 - - uses: ./.github/actions/pr-comment - with: - name: qns - token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index 5efd14c2ce..5a5e07b7fd 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -1,12 +1,17 @@ -name: QUIC Network Simulator +name: QNS on: - schedule: - - cron: '42 3 * * 2,5' # Runs at 03:42 UTC (m and h chosen arbitrarily) twice a week. 
- workflow_dispatch: + push: + branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] pull_request: branches: ["main"] + paths-ignore: ["*.md", "*.png", "*.svg", "LICENSE-*"] merge_group: + schedule: + # Run at 1 AM each day + - cron: '0 1 * * *' + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.ref_name }} @@ -60,7 +65,6 @@ jobs: platforms: 'linux/amd64, linux/arm64' - uses: docker/build-push-action@2cdde995de11925a030ce8070c3d77a52ffcf1c0 # v5.3.0 - if: github.event_name == 'pull_request' id: docker_build_and_push with: tags: ${{ steps.meta.outputs.tags }} @@ -72,13 +76,11 @@ jobs: outputs: type=docker,dest=/tmp/${{ env.LATEST }}.tar - uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 - if: github.event_name == 'pull_request' with: name: '${{ env.LATEST }} Docker image' path: /tmp/${{ env.LATEST }}.tar implementations: - if: ${{ github.event_name == 'pull_request' }} name: Determine interop pairs needs: docker-image runs-on: ubuntu-latest @@ -120,7 +122,6 @@ jobs: } >> "$GITHUB_OUTPUT" run-qns: - if: ${{ github.event_name == 'pull_request' }} name: Run QNS needs: implementations strategy: @@ -129,7 +130,7 @@ jobs: pair: ${{ fromJson(needs.implementations.outputs.pairs) }} runs-on: ubuntu-latest steps: - - uses: actions/download-artifact@v4 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: name: '${{ env.LATEST }} Docker image' path: /tmp @@ -142,7 +143,7 @@ jobs: echo "client=$(echo "$PAIR" | cut -d% -f1)" >> "$GITHUB_OUTPUT" echo "server=$(echo "$PAIR" | cut -d% -f2)" >> "$GITHUB_OUTPUT" - - uses: actions/checkout@v4 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 # TODO: Replace once https://github.com/quic-interop/quic-interop-runner/pull/356 is merged. - uses: ./.github/actions/quic-interop-runner @@ -152,13 +153,12 @@ jobs: implementations: ${{ needs.implementations.outputs.implementations }} report: - if: ${{ always() && github.event_name == 'pull_request' }} name: Report results needs: run-qns runs-on: ubuntu-latest steps: - - uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3 - - uses: actions/download-artifact@v4 + - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 + - uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7 with: pattern: '*results' path: results @@ -196,20 +196,28 @@ jobs: done { echo "### Failed Interop Tests" - echo "[QUIC Interop Runner](https://github.com/quic-interop/quic-interop-runner), *client* vs. *server*" >> "$GROUP.md" - cat failed.md - echo "
Succeeded and unsupported tests" + if [ -e failed.md ]; then + echo "[QUIC Interop Runner](https://github.com/quic-interop/quic-interop-runner), *client* vs. *server*" + cat failed.md + else + echo "None :tada:" + fi + echo "
All results" + echo for GROUP in succeeded unsupported; do - echo echo "### ${GROUP^} Interop Tests" - echo - cat "$GROUP.md" - echo + if [ -e "$GROUP.md" ]; then + echo "[QUIC Interop Runner](https://github.com/quic-interop/quic-interop-runner), *client* vs. *server*" + cat "$GROUP.md" + else + echo "None :question:" + fi done + echo echo "
" } >> comment.md - uses: ./.github/actions/pr-comment-data-export with: - name: qns + name: ${{ github.workflow }} contents: comment.md diff --git a/Cargo.toml b/Cargo.toml index 017918b88c..9f9fd0d98b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,13 +15,14 @@ resolver = "2" homepage = "https://github.com/mozilla/neqo/" repository = "https://github.com/mozilla/neqo/" authors = ["The Neqo Authors "] -version = "0.7.5" +version = "0.7.7" # Keep in sync with `.rustfmt.toml` `edition`. edition = "2021" license = "MIT OR Apache-2.0" # Don't increase beyond what Firefox is currently using: # https://searchfox.org/mozilla-central/search?q=MINIMUM_RUST_VERSION&path=python/mozboot/mozboot/util.py -rust-version = "1.74.0" +# Keep in sync with .github/workflows/check.yml +rust-version = "1.76.0" [workspace.dependencies] log = { version = "0.4", default-features = false } diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index e9de5915a7..964e09c822 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -20,12 +20,12 @@ use std::{ use neqo_common::{event::Provider, qdebug, qinfo, qwarn, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ - Connection, ConnectionError, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, + CloseReason, Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, StreamType, }; use url::Url; -use super::{get_output_file, qlog_new, Args, Res}; +use super::{get_output_file, qlog_new, Args, CloseState, Res}; pub struct Handler<'a> { streams: HashMap>>, @@ -142,6 +142,26 @@ pub(crate) fn create_client( Ok(client) } +impl TryFrom<&State> for CloseState { + type Error = CloseReason; + + fn try_from(value: &State) -> Result { + let (state, error) = match value { + State::Closing { error, .. } | State::Draining { error, .. 
} => { + (CloseState::Closing, error) + } + State::Closed(error) => (CloseState::Closed, error), + _ => return Ok(CloseState::NotClosing), + }; + + if error.is_error() { + Err(error.clone()) + } else { + Ok(state) + } + } +} + impl super::Client for Connection { fn process_output(&mut self, now: Instant) -> Output { self.process_output(now) @@ -163,15 +183,8 @@ impl super::Client for Connection { } } - fn is_closed(&self) -> Result { - match self.state() { - State::Closed( - ConnectionError::Transport(neqo_transport::Error::NoError) - | ConnectionError::Application(0), - ) => Ok(true), - State::Closed(err) => Err(err.clone()), - _ => Ok(false), - } + fn is_closed(&self) -> Result { + self.state().try_into() } fn stats(&self) -> neqo_transport::Stats { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index 5a77c92f0b..8284bd5d34 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -22,12 +22,12 @@ use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ - AppError, Connection, ConnectionError, EmptyConnectionIdGenerator, Error as TransportError, - Output, StreamId, + AppError, CloseReason, Connection, EmptyConnectionIdGenerator, Error as TransportError, Output, + StreamId, }; use url::Url; -use super::{get_output_file, qlog_new, Args, Res}; +use super::{get_output_file, qlog_new, Args, CloseState, Res}; pub(crate) struct Handler<'a> { #[allow( @@ -105,17 +105,28 @@ pub(crate) fn create_client( Ok(client) } -impl super::Client for Http3Client { - fn is_closed(&self) -> Result { - match self.state() { - Http3State::Closed( - ConnectionError::Transport(neqo_transport::Error::NoError) - | ConnectionError::Application(0), - ) => Ok(true), - Http3State::Closed(err) => Err(err.clone()), - _ => Ok(false), +impl TryFrom for CloseState { + type Error = CloseReason; + + fn try_from(value: Http3State) -> Result { + let (state, error) = match value { + Http3State::Closing(error) => (CloseState::Closing, error), + Http3State::Closed(error) => (CloseState::Closed, error), + _ => return Ok(CloseState::NotClosing), + }; + + if error.is_error() { + Err(error.clone()) + } else { + Ok(state) } } +} + +impl super::Client for Http3Client { + fn is_closed(&self) -> Result { + self.state().try_into() + } fn process_output(&mut self, now: Instant) -> Output { self.process_output(now) diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 6d0b5dad6f..f196a5e32e 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -27,7 +27,7 @@ use neqo_crypto::{ init, Cipher, ResumptionToken, }; use neqo_http3::Output; -use neqo_transport::{AppError, ConnectionError, ConnectionId, Version}; +use neqo_transport::{AppError, CloseReason, ConnectionId, Version}; use qlog::{events::EventImportance, streamer::QlogStreamer}; use tokio::time::Sleep; use url::{Origin, Url}; @@ -80,11 +80,11 @@ impl From for Error { } } -impl From for Error { - fn from(err: neqo_transport::ConnectionError) -> Self { +impl From for Error { + fn from(err: neqo_transport::CloseReason) -> Self { match err { - ConnectionError::Transport(e) => Self::TransportError(e), - ConnectionError::Application(e) => Self::ApplicationError(e), + CloseReason::Transport(e) => Self::TransportError(e), + CloseReason::Application(e) => Self::ApplicationError(e), } } } @@ -345,6 +345,12 @@ trait 
Handler { fn take_token(&mut self) -> Option; } +enum CloseState { + NotClosing, + Closing, + Closed, +} + /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { fn process_output(&mut self, now: Instant) -> Output; @@ -355,11 +361,7 @@ trait Client { fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; - /// Returns [`Some(_)`] if the connection is closed. - /// - /// Note that connection was closed without error on - /// [`Some(ConnectionError::Transport(TransportError::NoError))`]. - fn is_closed(&self) -> Result; + fn is_closed(&self) -> Result; fn stats(&self) -> neqo_transport::Stats; } @@ -381,16 +383,19 @@ impl<'a, H: Handler> Runner<'a, H> { continue; } + #[allow(clippy::match_same_arms)] match (handler_done, self.client.is_closed()?) { // more work (false, _) => {} // no more work, closing connection - (true, false) => { + (true, CloseState::NotClosing) => { self.client.close(Instant::now(), 0, "kthxbye!"); continue; } + // no more work, already closing connection + (true, CloseState::Closing) => {} // no more work, connection closed, terminating - (true, true) => break, + (true, CloseState::Closed) => break, } match ready(self.socket, self.timeout.as_mut()).await? { diff --git a/neqo-crypto/src/aead_null.rs b/neqo-crypto/src/aead_null.rs index 2d5656de73..6fcb72871f 100644 --- a/neqo-crypto/src/aead_null.rs +++ b/neqo-crypto/src/aead_null.rs @@ -4,8 +4,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![cfg(feature = "disable-encryption")] - use std::fmt; use crate::{ diff --git a/neqo-crypto/src/agentio.rs b/neqo-crypto/src/agentio.rs index 7c57a0ef45..3beede5c12 100644 --- a/neqo-crypto/src/agentio.rs +++ b/neqo-crypto/src/agentio.rs @@ -29,7 +29,7 @@ const PR_FAILURE: PrStatus = prio::PRStatus::PR_FAILURE; /// Convert a pinned, boxed object into a void pointer. pub fn as_c_void(pin: &mut Pin>) -> *mut c_void { - (Pin::into_inner(pin.as_mut()) as *mut T).cast() + (std::ptr::from_mut::(Pin::into_inner(pin.as_mut()))).cast() } /// A slice of the output. diff --git a/neqo-crypto/src/constants.rs b/neqo-crypto/src/constants.rs index 76db972290..daef3d3c56 100644 --- a/neqo-crypto/src/constants.rs +++ b/neqo-crypto/src/constants.rs @@ -27,7 +27,7 @@ pub const TLS_EPOCH_APPLICATION_DATA: Epoch = 3_u16; macro_rules! remap_enum { { $t:ident: $s:ty { $( $n:ident = $v:path ),+ $(,)? } } => { pub type $t = $s; - $( pub const $n: $t = $v as $t; )+ + $(#[allow(clippy::cast_possible_truncation)] pub const $n: $t = $v as $t; )+ }; { $t:ident: $s:ty => $e:ident { $( $n:ident = $v:ident ),+ $(,)? } } => { remap_enum!{ $t: $s { $( $n = $e::$v ),+ } } diff --git a/neqo-http3/src/buffered_send_stream.rs b/neqo-http3/src/buffered_send_stream.rs index 4f6761fa80..60da0512b5 100644 --- a/neqo-http3/src/buffered_send_stream.rs +++ b/neqo-http3/src/buffered_send_stream.rs @@ -7,7 +7,7 @@ use neqo_common::qtrace; use neqo_transport::{Connection, StreamId}; -use crate::Res; +use crate::{qlog, Res}; #[derive(Debug, PartialEq, Eq)] pub enum BufferedStream { @@ -38,7 +38,7 @@ impl BufferedStream { /// # Panics /// - /// If the `BufferedStream` is initialized more than one it will panic. + /// If the `BufferedStream` is initialized more than once, it will panic. pub fn init(&mut self, stream_id: StreamId) { debug_assert!(&Self::Uninitialized == self); *self = Self::Initialized { @@ -63,19 +63,23 @@ impl BufferedStream { /// Returns `neqo_transport` errors. 
pub fn send_buffer(&mut self, conn: &mut Connection) -> Res { let label = ::neqo_common::log_subject!(::log::Level::Debug, self); - let mut sent = 0; - if let Self::Initialized { stream_id, buf } = self { - if !buf.is_empty() { - qtrace!([label], "sending data."); - sent = conn.stream_send(*stream_id, &buf[..])?; - if sent == buf.len() { - buf.clear(); - } else { - let b = buf.split_off(sent); - *buf = b; - } - } + let Self::Initialized { stream_id, buf } = self else { + return Ok(0); + }; + if buf.is_empty() { + return Ok(0); + } + qtrace!([label], "sending data."); + let sent = conn.stream_send(*stream_id, &buf[..])?; + if sent == 0 { + return Ok(0); + } else if sent == buf.len() { + buf.clear(); + } else { + let b = buf.split_off(sent); + *buf = b; } + qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, sent); Ok(sent) } @@ -85,16 +89,17 @@ impl BufferedStream { pub fn send_atomic(&mut self, conn: &mut Connection, to_send: &[u8]) -> Res { // First try to send anything that is in the buffer. self.send_buffer(conn)?; - if let Self::Initialized { stream_id, buf } = self { - if buf.is_empty() { - let res = conn.stream_send_atomic(*stream_id, to_send)?; - Ok(res) - } else { - Ok(false) - } - } else { - Ok(false) + let Self::Initialized { stream_id, buf } = self else { + return Ok(false); + }; + if !buf.is_empty() { + return Ok(false); + } + let res = conn.stream_send_atomic(*stream_id, to_send)?; + if res { + qlog::h3_data_moved_down(conn.qlog_mut(), *stream_id, to_send.len()); } + Ok(res) } #[must_use] diff --git a/neqo-http3/src/connection.rs b/neqo-http3/src/connection.rs index dd45797baa..d14eb6f2a5 100644 --- a/neqo-http3/src/connection.rs +++ b/neqo-http3/src/connection.rs @@ -17,7 +17,7 @@ use std::{ use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn, Decoder, Header, MessageType, Role}; use neqo_qpack::{decoder::QPackDecoder, encoder::QPackEncoder}; use neqo_transport::{ - streams::SendOrder, AppError, Connection, ConnectionError, DatagramTracking, State, StreamId, + streams::SendOrder, AppError, CloseReason, Connection, DatagramTracking, State, StreamId, StreamType, ZeroRttState, }; @@ -81,22 +81,22 @@ enum Http3RemoteSettingsState { /// - `ZeroRtt`: 0-RTT has been enabled and is active /// - Connected /// - GoingAway(StreamId): The connection has received a `GOAWAY` frame -/// - Closing(ConnectionError): The connection is closed. The closing has been initiated by this end -/// of the connection, e.g., the `CONNECTION_CLOSE` frame has been sent. In this state, the +/// - Closing(CloseReason): The connection is closed. The closing has been initiated by this end of +/// the connection, e.g., the `CONNECTION_CLOSE` frame has been sent. In this state, the /// connection waits a certain amount of time to retransmit the `CONNECTION_CLOSE` frame if /// needed. -/// - Closed(ConnectionError): This is the final close state: closing has been initialized by the -/// peer and an ack for the `CONNECTION_CLOSE` frame has been sent or the closing has been -/// initiated by this end of the connection and the ack for the `CONNECTION_CLOSE` has been -/// received or the waiting time has passed. +/// - Closed(CloseReason): This is the final close state: closing has been initialized by the peer +/// and an ack for the `CONNECTION_CLOSE` frame has been sent or the closing has been initiated by +/// this end of the connection and the ack for the `CONNECTION_CLOSE` has been received or the +/// waiting time has passed. 
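A note on the close states documented above: with the rename, a caller can tell a clean shutdown from an error close via `CloseReason::is_error()`, which is exactly what the new `CloseState` conversions in `neqo-bin` rely on. A minimal sketch, assuming the `neqo-http3` crate from this change as a dependency (the helper name `closed_cleanly` is illustrative, not part of the diff):

```rust
use neqo_http3::Http3State;

/// Sketch only: true once the connection reached `Closed` without an error,
/// i.e. a transport NO_ERROR close or an application close with code 0.
fn closed_cleanly(state: &Http3State) -> bool {
    match state {
        Http3State::Closed(reason) => !reason.is_error(),
        _ => false,
    }
}
```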
#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)] pub enum Http3State { Initializing, ZeroRtt, Connected, GoingAway(StreamId), - Closing(ConnectionError), - Closed(ConnectionError), + Closing(CloseReason), + Closed(CloseReason), } impl Http3State { @@ -767,7 +767,7 @@ impl Http3Connection { /// This is called when an application closes the connection. pub fn close(&mut self, error: AppError) { qdebug!([self], "Close connection error {:?}.", error); - self.state = Http3State::Closing(ConnectionError::Application(error)); + self.state = Http3State::Closing(CloseReason::Application(error)); if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) { qwarn!("close(0) called when streams still active"); } diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index 311bf1ea52..2a34be4e77 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -1291,8 +1291,8 @@ mod tests { use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken}; use neqo_qpack::{encoder::QPackEncoder, QpackSettings}; use neqo_transport::{ - ConnectionError, ConnectionEvent, ConnectionParameters, Output, State, StreamId, - StreamType, Version, INITIAL_RECV_BUFFER_SIZE, INITIAL_SEND_BUFFER_SIZE, + CloseReason, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType, + Version, INITIAL_RECV_BUFFER_SIZE, INITIAL_SEND_BUFFER_SIZE, }; use test_fixture::{ anti_replay, default_server_h3, fixture_init, new_server, now, @@ -1314,7 +1314,7 @@ mod tests { fn assert_closed(client: &Http3Client, expected: &Error) { match client.state() { Http3State::Closing(err) | Http3State::Closed(err) => { - assert_eq!(err, ConnectionError::Application(expected.code())); + assert_eq!(err, CloseReason::Application(expected.code())); } _ => panic!("Wrong state {:?}", client.state()), }; @@ -4419,7 +4419,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4437,7 +4437,7 @@ mod tests { HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4474,7 +4474,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(514)), + &Http3State::Closing(CloseReason::Application(514)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4493,7 +4493,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4531,7 +4531,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 50), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4569,7 +4569,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 5000), ], - 
&Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } @@ -4626,7 +4626,7 @@ mod tests { HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], - &Http3State::Closing(ConnectionError::Application(265)), + &Http3State::Closing(CloseReason::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } diff --git a/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs b/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs index 27f669861d..9b54f1dc46 100644 --- a/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs +++ b/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs @@ -8,7 +8,7 @@ use std::time::Duration; use neqo_common::{event::Provider, Encoder}; use neqo_crypto::AuthenticationStatus; -use neqo_transport::{Connection, ConnectionError, StreamType}; +use neqo_transport::{CloseReason, Connection, StreamType}; use test_fixture::{default_server_h3, now}; use super::{connect, default_http3_client, default_http3_server, exchange_packets}; @@ -270,10 +270,7 @@ fn wrong_setting_value() { exchange_packets2(&mut client, &mut server); match client.state() { Http3State::Closing(err) | Http3State::Closed(err) => { - assert_eq!( - err, - ConnectionError::Application(Error::HttpSettings.code()) - ); + assert_eq!(err, CloseReason::Application(Error::HttpSettings.code())); } _ => panic!("Wrong state {:?}", client.state()), }; diff --git a/neqo-http3/src/send_message.rs b/neqo-http3/src/send_message.rs index 15965c44f6..6553d20432 100644 --- a/neqo-http3/src/send_message.rs +++ b/neqo-http3/src/send_message.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc}; +use std::{cell::RefCell, cmp::min, fmt::Debug, num::NonZeroUsize, rc::Rc}; use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType}; use neqo_qpack::encoder::QPackEncoder; @@ -13,10 +13,11 @@ use neqo_transport::{Connection, StreamId}; use crate::{ frames::HFrame, headers_checks::{headers_valid, is_interim, trailers_valid}, - qlog, BufferedStream, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpSendStream, Res, + BufferedStream, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpSendStream, Res, SendStream, SendStreamEvents, Stream, }; +const MIN_DATA_FRAME_SIZE: usize = 3; // Minimal DATA frame size: 2 (header) + 1 (payload) const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2 const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header) const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3 @@ -177,7 +178,14 @@ impl SendStream for SendMessage { let available = conn .stream_avail_send_space(self.stream_id()) .map_err(|e| Error::map_stream_send_errors(&e.into()))?; - if available <= 2 { + if available < MIN_DATA_FRAME_SIZE { + // Setting this once, instead of every time the available send space + // is exhausted, would suffice. That said, function call should be + // cheap, thus not worth optimizing. 
+ conn.stream_set_writable_event_low_watermark( + self.stream_id(), + NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(), + )?; return Ok(0); } let to_send = if available <= MAX_DATA_HEADER_SIZE_2_LIMIT { @@ -216,7 +224,6 @@ impl SendStream for SendMessage { .send_atomic(conn, &buf[..to_send]) .map_err(|e| Error::map_stream_send_errors(&e))?; debug_assert!(sent); - qlog::h3_data_moved_down(conn.qlog_mut(), self.stream_id(), to_send); Ok(to_send) } @@ -243,7 +250,6 @@ impl SendStream for SendMessage { /// info that the stream has been closed.) fn send(&mut self, conn: &mut Connection) -> Res<()> { let sent = Error::map_error(self.stream.send_buffer(conn), Error::HttpInternal(5))?; - qlog::h3_data_moved_down(conn.qlog_mut(), self.stream_id(), sent); qtrace!([self], "{} bytes sent", sent); if !self.stream.has_buffered_data() { diff --git a/neqo-http3/src/server.rs b/neqo-http3/src/server.rs index 1396a4e4cf..8fce803fb3 100644 --- a/neqo-http3/src/server.rs +++ b/neqo-http3/src/server.rs @@ -323,7 +323,7 @@ mod tests { use neqo_crypto::{AuthenticationStatus, ZeroRttCheckResult, ZeroRttChecker}; use neqo_qpack::{encoder::QPackEncoder, QpackSettings}; use neqo_transport::{ - Connection, ConnectionError, ConnectionEvent, State, StreamId, StreamType, ZeroRttState, + CloseReason, Connection, ConnectionEvent, State, StreamId, StreamType, ZeroRttState, }; use test_fixture::{ anti_replay, default_client, fixture_init, now, CountingConnectionIdGenerator, @@ -366,7 +366,7 @@ mod tests { } fn assert_closed(hconn: &mut Http3Server, expected: &Error) { - let err = ConnectionError::Application(expected.code()); + let err = CloseReason::Application(expected.code()); let closed = |e| matches!(e, Http3ServerEvent::StateChange{ state: Http3State::Closing(e) | Http3State::Closed(e), .. } if e == err); assert!(hconn.events().any(closed)); } diff --git a/neqo-http3/src/server_events.rs b/neqo-http3/src/server_events.rs index 214a48c757..119d9f9f39 100644 --- a/neqo-http3/src/server_events.rs +++ b/neqo-http3/src/server_events.rs @@ -84,6 +84,19 @@ impl StreamHandler { .send_data(self.stream_id(), buf, &mut self.conn.borrow_mut()) } + /// Bytes sendable on stream at the QUIC layer. + /// + /// Note that this does not yet account for HTTP3 frame headers. + /// + /// # Errors + /// + /// It may return `InvalidStreamId` if a stream does not exist anymore. + pub fn available(&mut self) -> Res { + let stream_id = self.stream_id(); + let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?; + Ok(n) + } + /// Close sending side. /// /// # Errors diff --git a/neqo-http3/tests/httpconn.rs b/neqo-http3/tests/httpconn.rs index a0b2bcdb80..8b9e7b42e8 100644 --- a/neqo-http3/tests/httpconn.rs +++ b/neqo-http3/tests/httpconn.rs @@ -17,7 +17,7 @@ use neqo_http3::{ Header, Http3Client, Http3ClientEvent, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, Http3State, Priority, }; -use neqo_transport::{ConnectionError, ConnectionParameters, Error, Output, StreamType}; +use neqo_transport::{CloseReason, ConnectionParameters, Error, Output, StreamType}; use test_fixture::*; const RESPONSE_DATA: &[u8] = &[0x61, 0x62, 0x63]; @@ -246,6 +246,83 @@ fn test_103_response() { process_client_events(&mut hconn_c); } +/// Test [`neqo_http3::SendMessage::send_data`] to set +/// [`neqo_transport::SendStream::set_writable_event_low_watermark`]. 
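Before the test body, a minimal usage sketch of the new low-watermark API it exercises, assuming the `neqo-transport` crate from this change; `conn` and `stream_id` stand in for an established connection and an open send stream set up elsewhere:

```rust
use std::num::NonZeroUsize;

use neqo_transport::{Connection, Error, StreamId};

// An HTTP/3 DATA frame needs a 2-byte header plus at least 1 byte of payload,
// so only fire SendStreamWritable once 3 bytes of send space are available.
const MIN_DATA_FRAME_SIZE: usize = 3;

fn arm_low_watermark(conn: &mut Connection, stream_id: StreamId) -> Result<(), Error> {
    conn.stream_set_writable_event_low_watermark(
        stream_id,
        NonZeroUsize::new(MIN_DATA_FRAME_SIZE).expect("non-zero"),
    )
}
```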
+#[allow(clippy::cast_possible_truncation)] +#[test] +fn test_data_writable_events_low_watermark() -> Result<(), Box> { + const STREAM_LIMIT: u64 = 5000; + const DATA_FRAME_HEADER_SIZE: usize = 3; + + // Create a client and a server. + let mut hconn_c = http3_client_with_params(Http3Parameters::default().connection_parameters( + ConnectionParameters::default().max_stream_data(StreamType::BiDi, false, STREAM_LIMIT), + )); + let mut hconn_s = default_http3_server(); + mem::drop(connect_peers(&mut hconn_c, &mut hconn_s)); + + // Client sends GET to server. + let stream_id = hconn_c.fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default(), + )?; + hconn_c.stream_close_send(stream_id)?; + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Server receives GET and responds with headers. + let mut request = receive_request(&mut hconn_s).unwrap(); + request.send_headers(&[Header::new(":status", "200")])?; + + // Sending these headers clears the server's send stream buffer and thus + // emits a DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + let data_writable = |e| { + matches!( + e, + Http3ServerEvent::DataWritable { + stream + } if stream.stream_id() == stream_id + ) + }; + assert!(hconn_s.events().any(data_writable)); + + // Have server fill entire send buffer minus 1 byte. + let all_but_one = request.available()? - DATA_FRAME_HEADER_SIZE - 1; + let buf = vec![1; all_but_one]; + let sent = request.send_data(&buf)?; + assert_eq!(sent, all_but_one); + assert_eq!(request.available()?, 1); + + // Sending the buffered data clears the send stream buffer and thus emits a + // DataWritable event. + exchange_packets(&mut hconn_c, &mut hconn_s, None); + assert!(hconn_s.events().any(data_writable)); + + // Sending more fails, given that each data frame needs to be preceeded by a + // header, i.e. needs more than 1 byte of send space to send 1 byte payload. + assert_eq!(request.available()?, 1); + assert_eq!(request.send_data(&buf)?, 0); + + // Have the client read all the pending data. + let mut recv_buf = vec![0_u8; all_but_one]; + let (recvd, _) = hconn_c.read_data(now(), stream_id, &mut recv_buf)?; + assert_eq!(sent, recvd); + exchange_packets(&mut hconn_c, &mut hconn_s, None); + + // Expect the server's available send space to be back to the stream limit. + assert_eq!(request.available()?, STREAM_LIMIT as usize); + + // Expect the server to emit a DataWritable event, even though it always had + // at least 1 byte available to send, i.e. it never exhausted the entire + // available send space. 
+ assert!(hconn_s.events().any(data_writable)); + + Ok(()) +} + #[test] fn test_data_writable_events() { const STREAM_LIMIT: u64 = 5000; @@ -448,7 +525,7 @@ fn fetch_noresponse_will_idletimeout() { if let Http3ClientEvent::StateChange(state) = event { match state { Http3State::Closing(error_code) | Http3State::Closed(error_code) => { - assert_eq!(error_code, ConnectionError::Transport(Error::IdleTimeout)); + assert_eq!(error_code, CloseReason::Transport(Error::IdleTimeout)); done = true; } _ => {} diff --git a/neqo-transport/Cargo.toml b/neqo-transport/Cargo.toml index 6095f3ac92..2abdbbfd95 100644 --- a/neqo-transport/Cargo.toml +++ b/neqo-transport/Cargo.toml @@ -14,7 +14,7 @@ workspace = true [dependencies] # Sync with https://searchfox.org/mozilla-central/source/Cargo.lock 2024-02-08 enum-map = { version = "2.7", default-features = false } -indexmap = { version = "1.9", default-features = false } +indexmap = { version = "2.2", default-features = false } # See https://github.com/mozilla/neqo/issues/1858 log = { workspace = true } neqo-common = { path = "../neqo-common" } neqo-crypto = { path = "../neqo-crypto" } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 660d430087..fe68a45029 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -12,6 +12,7 @@ use std::{ fmt::{self, Debug}, iter, mem, net::{IpAddr, SocketAddr}, + num::NonZeroUsize, ops::RangeInclusive, rc::{Rc, Weak}, time::{Duration, Instant}, @@ -56,9 +57,9 @@ use crate::{ self, TransportParameter, TransportParameterId, TransportParameters, TransportParametersHandler, }, - tracking::{AckTracker, PacketNumberSpace, SentPacket}, + tracking::{AckTracker, PacketNumberSpace, RecvdPackets, SentPacket}, version::{Version, WireVersion}, - AppError, ConnectionError, Error, Res, StreamId, + AppError, CloseReason, Error, Res, StreamId, }; mod dump; @@ -889,7 +890,7 @@ impl Connection { let msg = format!("{v:?}"); #[cfg(not(debug_assertions))] let msg = ""; - let error = ConnectionError::Transport(v.clone()); + let error = CloseReason::Transport(v.clone()); match &self.state { State::Closing { error: err, .. } | State::Draining { error: err, .. } @@ -960,9 +961,7 @@ impl Connection { let pto = self.pto(); if self.idle_timeout.expired(now, pto) { qinfo!([self], "idle timeout expired"); - self.set_state(State::Closed(ConnectionError::Transport( - Error::IdleTimeout, - ))); + self.set_state(State::Closed(CloseReason::Transport(Error::IdleTimeout))); return; } @@ -1206,7 +1205,7 @@ impl Connection { qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..])); self.state_signaling.reset(); self.set_state(State::Draining { - error: ConnectionError::Transport(Error::StatelessReset), + error: CloseReason::Transport(Error::StatelessReset), timeout: self.get_closing_period_time(now), }); Err(Error::StatelessReset) @@ -1283,7 +1282,7 @@ impl Connection { } else { qinfo!([self], "Version negotiation: failed with {:?}", supported); // This error goes straight to closed. 
- self.set_state(State::Closed(ConnectionError::Transport( + self.set_state(State::Closed(CloseReason::Transport( Error::VersionNegotiation, ))); Err(Error::VersionNegotiation) @@ -2201,6 +2200,40 @@ impl Connection { (tokens, ack_eliciting, padded) } + fn write_closing_frames( + &mut self, + close: &ClosingFrame, + builder: &mut PacketBuilder, + space: PacketNumberSpace, + now: Instant, + path: &PathRef, + tokens: &mut Vec, + ) { + if builder.remaining() > ClosingFrame::MIN_LENGTH + RecvdPackets::USEFUL_ACK_LEN { + // Include an ACK frame with the CONNECTION_CLOSE. + let limit = builder.limit(); + builder.set_limit(limit - ClosingFrame::MIN_LENGTH); + self.acks.immediate_ack(now); + self.acks.write_frame( + space, + now, + path.borrow().rtt().estimate(), + builder, + tokens, + &mut self.stats.borrow_mut().frame_tx, + ); + builder.set_limit(limit); + } + // CloseReason::Application is only allowed at 1RTT. + let sanitized = if space == PacketNumberSpace::ApplicationData { + None + } else { + close.sanitize() + }; + sanitized.as_ref().unwrap_or(close).write_frame(builder); + self.stats.borrow_mut().frame_tx.connection_close += 1; + } + /// Build a datagram, possibly from multiple packets (for different PN /// spaces) and each containing 1+ frames. #[allow(clippy::too_many_lines)] // Yeah, that's just the way it is. @@ -2264,17 +2297,7 @@ impl Connection { let payload_start = builder.len(); let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false); if let Some(ref close) = closing_frame { - // ConnectionError::Application is only allowed at 1RTT. - let sanitized = if *space == PacketNumberSpace::ApplicationData { - None - } else { - close.sanitize() - }; - sanitized - .as_ref() - .unwrap_or(close) - .write_frame(&mut builder); - self.stats.borrow_mut().frame_tx.connection_close += 1; + self.write_closing_frames(close, &mut builder, *space, now, path, &mut tokens); } else { (tokens, ack_eliciting, padded) = self.write_frames(path, *space, &profile, &mut builder, now); @@ -2417,7 +2440,7 @@ impl Connection { /// Close the connection. pub fn close(&mut self, now: Instant, app_error: AppError, msg: impl AsRef) { - let error = ConnectionError::Application(app_error); + let error = CloseReason::Application(app_error); let timeout = self.get_closing_period_time(now); if let Some(path) = self.paths.primary() { self.state_signaling.close(path, error.clone(), 0, msg); @@ -2816,7 +2839,6 @@ impl Connection { reason_phrase, } => { self.stats.borrow_mut().frame_rx.connection_close += 1; - let reason_phrase = String::from_utf8_lossy(&reason_phrase); qinfo!( [self], "ConnectionClose received. Error code: {:?} frame type {:x} reason {}", @@ -2837,7 +2859,7 @@ impl Connection { FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT, ) }; - let error = ConnectionError::Transport(detail); + let error = CloseReason::Transport(detail); self.state_signaling .drain(Rc::clone(path), error.clone(), frame_type, ""); self.set_state(State::Draining { @@ -3175,6 +3197,34 @@ impl Connection { Ok(self.streams.get_send_stream(stream_id)?.avail()) } + /// Set low watermark for [`ConnectionEvent::SendStreamWritable`] event. + /// + /// Stream emits a [`crate::ConnectionEvent::SendStreamWritable`] event + /// when: + /// - the available sendable bytes increased to or above the watermark + /// - and was previously below the watermark. + /// + /// Default value is `1`. 
In other words + /// [`crate::ConnectionEvent::SendStreamWritable`] is emitted whenever the + /// available sendable bytes was previously at `0` and now increased to `1` + /// or more. + /// + /// Use this when your protocol needs at least `watermark` amount of available + /// sendable bytes to make progress. + /// + /// # Errors + /// When the stream ID is invalid. + pub fn stream_set_writable_event_low_watermark( + &mut self, + stream_id: StreamId, + watermark: NonZeroUsize, + ) -> Res<()> { + self.streams + .get_send_stream_mut(stream_id)? + .set_writable_event_low_watermark(watermark); + Ok(()) + } + /// Close the stream. Enqueued data will be sent. /// # Errors /// When the stream ID is invalid. diff --git a/neqo-transport/src/connection/state.rs b/neqo-transport/src/connection/state.rs index cc2f6e30d2..e76f937938 100644 --- a/neqo-transport/src/connection/state.rs +++ b/neqo-transport/src/connection/state.rs @@ -21,7 +21,7 @@ use crate::{ packet::PacketBuilder, path::PathRef, recovery::RecoveryToken, - ConnectionError, Error, + CloseReason, Error, }; #[derive(Clone, Debug, PartialEq, Eq)] @@ -42,14 +42,14 @@ pub enum State { Connected, Confirmed, Closing { - error: ConnectionError, + error: CloseReason, timeout: Instant, }, Draining { - error: ConnectionError, + error: CloseReason, timeout: Instant, }, - Closed(ConnectionError), + Closed(CloseReason), } impl State { @@ -67,7 +67,7 @@ impl State { } #[must_use] - pub fn error(&self) -> Option<&ConnectionError> { + pub fn error(&self) -> Option<&CloseReason> { if let Self::Closing { error, .. } | Self::Draining { error, .. } | Self::Closed(error) = self { @@ -116,7 +116,7 @@ impl Ord for State { #[derive(Debug, Clone)] pub struct ClosingFrame { path: PathRef, - error: ConnectionError, + error: CloseReason, frame_type: FrameType, reason_phrase: Vec, } @@ -124,7 +124,7 @@ pub struct ClosingFrame { impl ClosingFrame { fn new( path: PathRef, - error: ConnectionError, + error: CloseReason, frame_type: FrameType, message: impl AsRef, ) -> Self { @@ -142,12 +142,12 @@ impl ClosingFrame { } pub fn sanitize(&self) -> Option { - if let ConnectionError::Application(_) = self.error { + if let CloseReason::Application(_) = self.error { // The default CONNECTION_CLOSE frame that is sent when an application // error code needs to be sent in an Initial or Handshake packet. Some(Self { path: Rc::clone(&self.path), - error: ConnectionError::Transport(Error::ApplicationError), + error: CloseReason::Transport(Error::ApplicationError), frame_type: 0, reason_phrase: Vec::new(), }) @@ -156,19 +156,22 @@ impl ClosingFrame { } } + /// Length of a closing frame with a truncated `reason_length`. Allow 8 bytes for the reason + /// phrase to ensure that if it needs to be truncated there is still at least a few bytes of + /// the value. + pub const MIN_LENGTH: usize = 1 + 8 + 8 + 2 + 8; + pub fn write_frame(&self, builder: &mut PacketBuilder) { - // Allow 8 bytes for the reason phrase to ensure that if it needs to be - // truncated there is still at least a few bytes of the value. 
- if builder.remaining() < 1 + 8 + 8 + 2 + 8 { + if builder.remaining() < ClosingFrame::MIN_LENGTH { return; } match &self.error { - ConnectionError::Transport(e) => { + CloseReason::Transport(e) => { builder.encode_varint(FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT); builder.encode_varint(e.code()); builder.encode_varint(self.frame_type); } - ConnectionError::Application(code) => { + CloseReason::Application(code) => { builder.encode_varint(FRAME_TYPE_CONNECTION_CLOSE_APPLICATION); builder.encode_varint(*code); } @@ -209,10 +212,6 @@ pub enum StateSignaling { impl StateSignaling { pub fn handshake_done(&mut self) { if !matches!(self, Self::Idle) { - debug_assert!( - false, - "StateSignaling must be in Idle state but is in {self:?} state.", - ); return; } *self = Self::HandshakeDone; @@ -231,7 +230,7 @@ impl StateSignaling { pub fn close( &mut self, path: PathRef, - error: ConnectionError, + error: CloseReason, frame_type: FrameType, message: impl AsRef, ) { @@ -243,7 +242,7 @@ impl StateSignaling { pub fn drain( &mut self, path: PathRef, - error: ConnectionError, + error: CloseReason, frame_type: FrameType, message: impl AsRef, ) { diff --git a/neqo-transport/src/connection/tests/close.rs b/neqo-transport/src/connection/tests/close.rs index 5351dd0d5c..7c620de17e 100644 --- a/neqo-transport/src/connection/tests/close.rs +++ b/neqo-transport/src/connection/tests/close.rs @@ -14,13 +14,13 @@ use super::{ }; use crate::{ tparams::{self, TransportParameter}, - AppError, ConnectionError, Error, ERROR_APPLICATION_CLOSE, + AppError, CloseReason, Error, ERROR_APPLICATION_CLOSE, }; fn assert_draining(c: &Connection, expected: &Error) { assert!(c.state().closed()); if let State::Draining { - error: ConnectionError::Transport(error), + error: CloseReason::Transport(error), .. } = c.state() { @@ -40,7 +40,14 @@ fn connection_close() { client.close(now, 42, ""); + let stats_before = client.stats().frame_tx; let out = client.process(None, now); + let stats_after = client.stats().frame_tx; + assert_eq!( + stats_after.connection_close, + stats_before.connection_close + 1 + ); + assert_eq!(stats_after.ack, stats_before.ack + 1); server.process_input(&out.dgram().unwrap(), now); assert_draining(&server, &Error::PeerApplicationError(42)); @@ -57,7 +64,14 @@ fn connection_close_with_long_reason_string() { let long_reason = String::from_utf8([0x61; 2048].to_vec()).unwrap(); client.close(now, 42, long_reason); + let stats_before = client.stats().frame_tx; let out = client.process(None, now); + let stats_after = client.stats().frame_tx; + assert_eq!( + stats_after.connection_close, + stats_before.connection_close + 1 + ); + assert_eq!(stats_after.ack, stats_before.ack + 1); server.process_input(&out.dgram().unwrap(), now); assert_draining(&server, &Error::PeerApplicationError(42)); @@ -100,7 +114,7 @@ fn bad_tls_version() { let dgram = server.process(dgram.as_ref(), now()).dgram(); assert_eq!( *server.state(), - State::Closed(ConnectionError::Transport(Error::ProtocolViolation)) + State::Closed(CloseReason::Transport(Error::ProtocolViolation)) ); assert!(dgram.is_some()); client.process_input(&dgram.unwrap(), now()); @@ -154,7 +168,6 @@ fn closing_and_draining() { assert!(client_close.is_some()); let client_close_timer = client.process(None, now()).callback(); assert_ne!(client_close_timer, Duration::from_secs(0)); - // The client will spit out the same packet in response to anything it receives. 
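For reference, the byte budget behind the ACK-plus-CONNECTION_CLOSE bundling exercised by the tests above works out as follows. This is a standalone sketch that restates the constants introduced in this diff (`ClosingFrame::MIN_LENGTH` and `RecvdPackets::USEFUL_ACK_LEN`); the local names are illustrative.

// Sketch: the size gate used by write_closing_frames().
const CLOSE_MIN_LENGTH: usize = 1 + 8 + 8 + 2 + 8; // mirrors ClosingFrame::MIN_LENGTH
const USEFUL_ACK_LEN: usize = 1 + 8 + 8 + 1 + 8 + 3 * 8; // mirrors RecvdPackets::USEFUL_ACK_LEN

fn main() {
    // An ACK is only bundled when more than 77 bytes remain in the builder,
    // and the last 27 of them stay reserved for the CONNECTION_CLOSE frame.
    assert_eq!(CLOSE_MIN_LENGTH, 27);
    assert_eq!(USEFUL_ACK_LEN, 50);
    assert_eq!(CLOSE_MIN_LENGTH + USEFUL_ACK_LEN, 77);
}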
let p3 = send_something(&mut server, now()); let client_close2 = client.process(Some(&p3), now()).dgram(); @@ -168,7 +181,7 @@ fn closing_and_draining() { assert_eq!(end, Output::None); assert_eq!( *client.state(), - State::Closed(ConnectionError::Application(APP_ERROR)) + State::Closed(CloseReason::Application(APP_ERROR)) ); // When the server receives the close, it too should generate CONNECTION_CLOSE. @@ -186,7 +199,7 @@ fn closing_and_draining() { assert_eq!(end, Output::None); assert_eq!( *server.state(), - State::Closed(ConnectionError::Transport(Error::PeerApplicationError( + State::Closed(CloseReason::Transport(Error::PeerApplicationError( APP_ERROR ))) ); diff --git a/neqo-transport/src/connection/tests/datagram.rs b/neqo-transport/src/connection/tests/datagram.rs index f80c7d9104..f1b64b3c8f 100644 --- a/neqo-transport/src/connection/tests/datagram.rs +++ b/neqo-transport/src/connection/tests/datagram.rs @@ -19,7 +19,7 @@ use crate::{ packet::PacketBuilder, quic_datagrams::MAX_QUIC_DATAGRAM, send_stream::{RetransmissionPriority, TransmissionPriority}, - Connection, ConnectionError, ConnectionParameters, Error, StreamType, + CloseReason, Connection, ConnectionParameters, Error, StreamType, }; const DATAGRAM_LEN_MTU: u64 = 1310; @@ -362,10 +362,7 @@ fn dgram_no_allowed() { client.process_input(&out, now()); - assert_error( - &client, - &ConnectionError::Transport(Error::ProtocolViolation), - ); + assert_error(&client, &CloseReason::Transport(Error::ProtocolViolation)); } #[test] @@ -383,10 +380,7 @@ fn dgram_too_big() { client.process_input(&out, now()); - assert_error( - &client, - &ConnectionError::Transport(Error::ProtocolViolation), - ); + assert_error(&client, &CloseReason::Transport(Error::ProtocolViolation)); } #[test] diff --git a/neqo-transport/src/connection/tests/handshake.rs b/neqo-transport/src/connection/tests/handshake.rs index f2103523ec..c908340616 100644 --- a/neqo-transport/src/connection/tests/handshake.rs +++ b/neqo-transport/src/connection/tests/handshake.rs @@ -35,7 +35,7 @@ use crate::{ server::ValidateAddress, tparams::{TransportParameter, MIN_ACK_DELAY}, tracking::DEFAULT_ACK_DELAY, - ConnectionError, ConnectionParameters, EmptyConnectionIdGenerator, Error, StreamType, Version, + CloseReason, ConnectionParameters, EmptyConnectionIdGenerator, Error, StreamType, Version, }; const ECH_CONFIG_ID: u8 = 7; @@ -111,8 +111,8 @@ fn handshake_failed_authentication() { qdebug!("---- server: Alert(certificate_revoked)"); let out = server.process(out.as_dgram_ref(), now()); assert!(out.as_dgram_ref().is_some()); - assert_error(&client, &ConnectionError::Transport(Error::CryptoAlert(44))); - assert_error(&server, &ConnectionError::Transport(Error::PeerError(300))); + assert_error(&client, &CloseReason::Transport(Error::CryptoAlert(44))); + assert_error(&server, &CloseReason::Transport(Error::PeerError(300))); } #[test] @@ -133,11 +133,8 @@ fn no_alpn() { handshake(&mut client, &mut server, now(), Duration::new(0, 0)); // TODO (mt): errors are immediate, which means that we never send CONNECTION_CLOSE // and the client never sees the server's rejection of its handshake. 
- // assert_error(&client, ConnectionError::Transport(Error::CryptoAlert(120))); - assert_error( - &server, - &ConnectionError::Transport(Error::CryptoAlert(120)), - ); + // assert_error(&client, CloseReason::Transport(Error::CryptoAlert(120))); + assert_error(&server, &CloseReason::Transport(Error::CryptoAlert(120))); } #[test] @@ -934,10 +931,10 @@ fn ech_retry() { server.process_input(&dgram.unwrap(), now()); assert_eq!( server.state().error(), - Some(&ConnectionError::Transport(Error::PeerError(0x100 + 121))) + Some(&CloseReason::Transport(Error::PeerError(0x100 + 121))) ); - let Some(ConnectionError::Transport(Error::EchRetry(updated_config))) = client.state().error() + let Some(CloseReason::Transport(Error::EchRetry(updated_config))) = client.state().error() else { panic!( "Client state should be failed with EchRetry, is {:?}", @@ -984,7 +981,7 @@ fn ech_retry_fallback_rejected() { client.authenticated(AuthenticationStatus::PolicyRejection, now()); assert!(client.state().error().is_some()); - if let Some(ConnectionError::Transport(Error::EchRetry(_))) = client.state().error() { + if let Some(CloseReason::Transport(Error::EchRetry(_))) = client.state().error() { panic!("Client should not get EchRetry error"); } @@ -993,14 +990,13 @@ fn ech_retry_fallback_rejected() { server.process_input(&dgram.unwrap(), now()); assert_eq!( server.state().error(), - Some(&ConnectionError::Transport(Error::PeerError(298))) + Some(&CloseReason::Transport(Error::PeerError(298))) ); // A bad_certificate alert. } #[test] fn bad_min_ack_delay() { - const EXPECTED_ERROR: ConnectionError = - ConnectionError::Transport(Error::TransportParameterError); + const EXPECTED_ERROR: CloseReason = CloseReason::Transport(Error::TransportParameterError); let mut server = default_server(); let max_ad = u64::try_from(DEFAULT_ACK_DELAY.as_micros()).unwrap(); server @@ -1018,7 +1014,7 @@ fn bad_min_ack_delay() { server.process_input(&dgram.unwrap(), now()); assert_eq!( server.state().error(), - Some(&ConnectionError::Transport(Error::PeerError( + Some(&CloseReason::Transport(Error::PeerError( Error::TransportParameterError.code() ))) ); diff --git a/neqo-transport/src/connection/tests/keys.rs b/neqo-transport/src/connection/tests/keys.rs index 847b253284..c2ae9529bf 100644 --- a/neqo-transport/src/connection/tests/keys.rs +++ b/neqo-transport/src/connection/tests/keys.rs @@ -11,7 +11,7 @@ use test_fixture::now; use super::{ super::{ - super::{ConnectionError, ERROR_AEAD_LIMIT_REACHED}, + super::{CloseReason, ERROR_AEAD_LIMIT_REACHED}, Connection, ConnectionParameters, Error, Output, State, StreamType, }, connect, connect_force_idle, default_client, default_server, maybe_authenticate, @@ -269,7 +269,7 @@ fn exhaust_write_keys() { assert!(dgram.is_none()); assert!(matches!( client.state(), - State::Closed(ConnectionError::Transport(Error::KeysExhausted)) + State::Closed(CloseReason::Transport(Error::KeysExhausted)) )); } @@ -285,14 +285,14 @@ fn exhaust_read_keys() { let dgram = server.process(Some(&dgram), now()).dgram(); assert!(matches!( server.state(), - State::Closed(ConnectionError::Transport(Error::KeysExhausted)) + State::Closed(CloseReason::Transport(Error::KeysExhausted)) )); client.process_input(&dgram.unwrap(), now()); assert!(matches!( client.state(), State::Draining { - error: ConnectionError::Transport(Error::PeerError(ERROR_AEAD_LIMIT_REACHED)), + error: CloseReason::Transport(Error::PeerError(ERROR_AEAD_LIMIT_REACHED)), .. 
} )); @@ -341,6 +341,6 @@ fn automatic_update_write_keys_blocked() { assert!(dgram.is_none()); assert!(matches!( client.state(), - State::Closed(ConnectionError::Transport(Error::KeysExhausted)) + State::Closed(CloseReason::Transport(Error::KeysExhausted)) )); } diff --git a/neqo-transport/src/connection/tests/migration.rs b/neqo-transport/src/connection/tests/migration.rs index 5f7136ca9f..779cc78c53 100644 --- a/neqo-transport/src/connection/tests/migration.rs +++ b/neqo-transport/src/connection/tests/migration.rs @@ -30,7 +30,7 @@ use crate::{ packet::PacketBuilder, path::{PATH_MTU_V4, PATH_MTU_V6}, tparams::{self, PreferredAddress, TransportParameter}, - ConnectionError, ConnectionId, ConnectionIdDecoder, ConnectionIdGenerator, ConnectionIdRef, + CloseReason, ConnectionId, ConnectionIdDecoder, ConnectionIdGenerator, ConnectionIdRef, ConnectionParameters, EmptyConnectionIdGenerator, Error, }; @@ -357,7 +357,7 @@ fn migrate_same_fail() { assert!(matches!(res, Output::None)); assert!(matches!( client.state(), - State::Closed(ConnectionError::Transport(Error::NoAvailablePath)) + State::Closed(CloseReason::Transport(Error::NoAvailablePath)) )); } @@ -894,7 +894,7 @@ fn retire_prior_to_migration_failure() { assert!(matches!( client.state(), State::Closing { - error: ConnectionError::Transport(Error::InvalidMigration), + error: CloseReason::Transport(Error::InvalidMigration), .. } )); diff --git a/neqo-transport/src/connection/tests/mod.rs b/neqo-transport/src/connection/tests/mod.rs index 59c3898660..65283b8eb8 100644 --- a/neqo-transport/src/connection/tests/mod.rs +++ b/neqo-transport/src/connection/tests/mod.rs @@ -17,7 +17,7 @@ use neqo_common::{event::Provider, qdebug, qtrace, Datagram, Decoder, Role}; use neqo_crypto::{random, AllowZeroRtt, AuthenticationStatus, ResumptionToken}; use test_fixture::{fixture_init, new_neqo_qlog, now, DEFAULT_ADDR}; -use super::{Connection, ConnectionError, ConnectionId, Output, State}; +use super::{CloseReason, Connection, ConnectionId, Output, State}; use crate::{ addr_valid::{AddressValidation, ValidateAddress}, cc::{CWND_INITIAL_PKTS, CWND_MIN}, @@ -245,8 +245,8 @@ fn connect_fail( server_error: Error, ) { handshake(client, server, now(), Duration::new(0, 0)); - assert_error(client, &ConnectionError::Transport(client_error)); - assert_error(server, &ConnectionError::Transport(server_error)); + assert_error(client, &CloseReason::Transport(client_error)); + assert_error(server, &CloseReason::Transport(server_error)); } fn connect_with_rtt_and_modifier( @@ -284,7 +284,7 @@ fn connect(client: &mut Connection, server: &mut Connection) { connect_with_rtt(client, server, now(), Duration::new(0, 0)); } -fn assert_error(c: &Connection, expected: &ConnectionError) { +fn assert_error(c: &Connection, expected: &CloseReason) { match c.state() { State::Closing { error, .. } | State::Draining { error, .. 
} | State::Closed(error) => { assert_eq!(*error, *expected, "{c} error mismatch"); diff --git a/neqo-transport/src/connection/tests/stream.rs b/neqo-transport/src/connection/tests/stream.rs index 1df3c56fb9..0957d120ef 100644 --- a/neqo-transport/src/connection/tests/stream.rs +++ b/neqo-transport/src/connection/tests/stream.rs @@ -19,9 +19,9 @@ use crate::{ send_stream::{OrderGroup, SendStreamState, INITIAL_SEND_BUFFER_SIZE}, streams::{SendOrder, StreamOrder}, tparams::{self, TransportParameter}, + CloseReason, // tracking::DEFAULT_ACK_PACKET_TOLERANCE, Connection, - ConnectionError, ConnectionParameters, Error, StreamId, @@ -494,12 +494,9 @@ fn exceed_max_data() { assert_error( &client, - &ConnectionError::Transport(Error::PeerError(Error::FlowControlError.code())), - ); - assert_error( - &server, - &ConnectionError::Transport(Error::FlowControlError), + &CloseReason::Transport(Error::PeerError(Error::FlowControlError.code())), ); + assert_error(&server, &CloseReason::Transport(Error::FlowControlError)); } #[test] diff --git a/neqo-transport/src/connection/tests/vn.rs b/neqo-transport/src/connection/tests/vn.rs index 93872a94f4..815868d78d 100644 --- a/neqo-transport/src/connection/tests/vn.rs +++ b/neqo-transport/src/connection/tests/vn.rs @@ -10,7 +10,7 @@ use neqo_common::{event::Provider, Decoder, Encoder}; use test_fixture::{assertions, datagram, now}; use super::{ - super::{ConnectionError, ConnectionEvent, Output, State, ZeroRttState}, + super::{CloseReason, ConnectionEvent, Output, State, ZeroRttState}, connect, connect_fail, default_client, default_server, exchange_ticket, new_client, new_server, send_something, }; @@ -124,7 +124,7 @@ fn version_negotiation_only_reserved() { assert_eq!(client.process(Some(&dgram), now()), Output::None); match client.state() { State::Closed(err) => { - assert_eq!(*err, ConnectionError::Transport(Error::VersionNegotiation)); + assert_eq!(*err, CloseReason::Transport(Error::VersionNegotiation)); } _ => panic!("Invalid client state"), } @@ -183,7 +183,7 @@ fn version_negotiation_not_supported() { assert_eq!(client.process(Some(&dgram), now()), Output::None); match client.state() { State::Closed(err) => { - assert_eq!(*err, ConnectionError::Transport(Error::VersionNegotiation)); + assert_eq!(*err, CloseReason::Transport(Error::VersionNegotiation)); } _ => panic!("Invalid client state"), } @@ -338,7 +338,7 @@ fn invalid_server_version() { // The server effectively hasn't reacted here. 
match server.state() {
         State::Closed(err) => {
-            assert_eq!(*err, ConnectionError::Transport(Error::CryptoAlert(47)));
+            assert_eq!(*err, CloseReason::Transport(Error::CryptoAlert(47)));
         }
         _ => panic!("invalid server state"),
     }
diff --git a/neqo-transport/src/events.rs b/neqo-transport/src/events.rs
index a892e384b9..68ef0d6798 100644
--- a/neqo-transport/src/events.rs
+++ b/neqo-transport/src/events.rs
@@ -256,7 +256,7 @@ impl EventProvider for ConnectionEvents {
 mod tests {
     use neqo_common::event::Provider;
 
-    use crate::{ConnectionError, ConnectionEvent, ConnectionEvents, Error, State, StreamId};
+    use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, StreamId};
 
     #[test]
     fn event_culling() {
@@ -314,7 +314,7 @@ mod tests {
         evts.send_stream_writable(9.into());
         evts.send_stream_stop_sending(10.into(), 55);
 
-        evts.connection_state_change(State::Closed(ConnectionError::Transport(
+        evts.connection_state_change(State::Closed(CloseReason::Transport(
             Error::StreamStateError,
         )));
         assert_eq!(evts.events().count(), 1);
diff --git a/neqo-transport/src/fc.rs b/neqo-transport/src/fc.rs
index 730badcd3a..a80df7b1e5 100644
--- a/neqo-transport/src/fc.rs
+++ b/neqo-transport/src/fc.rs
@@ -71,17 +71,18 @@ where
         }
     }
 
-    /// Update the maximum. Returns `true` if the change was an increase.
+    /// Update the maximum. Returns `Some` with the updated available flow
+    /// control if the change was an increase and `None` otherwise.
     //
     // TODO: Impose a limit? Otherwise attacker can set large max thus local node allocates large send buffer.
-    pub fn update(&mut self, limit: u64) -> bool {
+    pub fn update(&mut self, limit: u64) -> Option<usize> {
         debug_assert!(limit < u64::MAX);
         if limit > self.limit {
             self.limit = limit;
             self.blocked_frame = false;
-            true
+            Some(self.available())
         } else {
-            false
+            None
         }
     }
diff --git a/neqo-transport/src/frame.rs b/neqo-transport/src/frame.rs
index eba7009d4b..7d009f3b46 100644
--- a/neqo-transport/src/frame.rs
+++ b/neqo-transport/src/frame.rs
@@ -15,7 +15,7 @@ use crate::{
     ecn::EcnCount,
     packet::PacketType,
     stream_id::{StreamId, StreamType},
-    AppError, ConnectionError, Error, Res, TransportError,
+    AppError, CloseReason, Error, Res, TransportError,
 };
 
 #[allow(clippy::module_name_repetitions)]
@@ -87,11 +87,11 @@ impl CloseError {
     }
 }
 
-impl From<ConnectionError> for CloseError {
-    fn from(err: ConnectionError) -> Self {
+impl From<CloseReason> for CloseError {
+    fn from(err: CloseReason) -> Self {
         match err {
-            ConnectionError::Transport(c) => Self::Transport(c.code()),
-            ConnectionError::Application(c) => Self::Application(c),
+            CloseReason::Transport(c) => Self::Transport(c.code()),
+            CloseReason::Application(c) => Self::Application(c),
         }
     }
 }
@@ -184,7 +184,7 @@ pub enum Frame<'a> {
         frame_type: u64,
         // Not a reference as we use this to hold the value.
         // This is not used in optimized builds anyway.
-        reason_phrase: Vec<u8>,
+        reason_phrase: String,
     },
     HandshakeDone,
     AckFrequency {
@@ -614,7 +614,7 @@ impl<'a> Frame<'a> {
                     0
                 };
                 // We can tolerate this copy for now.
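To make the new `update` contract from the fc.rs hunk above concrete, here is a minimal standalone model. The `Credit` type is a simplification for illustration, not the crate's own `SenderFlowControl`; it only demonstrates why the method now reports the post-increase available credit instead of a plain `bool`.

// Minimal model: `update` returns the new available credit on an increase,
// so callers can compare before/after availability against a watermark.
struct Credit {
    limit: u64,
    used: u64,
}

impl Credit {
    fn available(&self) -> usize {
        usize::try_from(self.limit - self.used).unwrap_or(usize::MAX)
    }

    fn update(&mut self, limit: u64) -> Option<usize> {
        if limit > self.limit {
            self.limit = limit;
            Some(self.available())
        } else {
            None
        }
    }
}

fn main() {
    let mut fc = Credit { limit: 0, used: 0 };
    let previous = fc.available();
    if let Some(current) = fc.update(10) {
        // A real caller (see the send_stream.rs and streams.rs hunks later in
        // this diff) feeds `previous` and `current` into the writable-event
        // low-watermark check.
        assert_eq!((previous, current), (0, 10));
    }
}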
-            let reason_phrase = d(dec.decode_vvec())?.to_vec();
+            let reason_phrase = String::from_utf8_lossy(d(dec.decode_vvec())?).to_string();
                 Ok(Self::ConnectionClose {
                     error_code,
                     frame_type,
@@ -925,7 +925,7 @@ mod tests {
         let f = Frame::ConnectionClose {
             error_code: CloseError::Transport(0x5678),
             frame_type: 0x1234,
-            reason_phrase: vec![0x01, 0x02, 0x03],
+            reason_phrase: String::from("\x01\x02\x03"),
         };
 
         just_dec(&f, "1c80005678523403010203");
@@ -936,7 +936,7 @@ mod tests {
         let f = Frame::ConnectionClose {
             error_code: CloseError::Application(0x5678),
             frame_type: 0,
-            reason_phrase: vec![0x01, 0x02, 0x03],
+            reason_phrase: String::from("\x01\x02\x03"),
         };
 
         just_dec(&f, "1d8000567803010203");
diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs
index d71641c9cd..2129ac9491 100644
--- a/neqo-transport/src/lib.rs
+++ b/neqo-transport/src/lib.rs
@@ -209,13 +209,17 @@ impl ::std::fmt::Display for Error {
 
 pub type AppError = u64;
 
+#[deprecated(note = "use `CloseReason` instead")]
+pub type ConnectionError = CloseReason;
+
+/// Reason why a connection closed.
 #[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq)]
-pub enum ConnectionError {
+pub enum CloseReason {
     Transport(Error),
     Application(AppError),
 }
 
-impl ConnectionError {
+impl CloseReason {
     #[must_use]
     pub fn app_code(&self) -> Option<AppError> {
         match self {
@@ -223,9 +227,19 @@ impl ConnectionError {
             Self::Transport(_) => None,
         }
     }
+
+    /// Returns `false` when the connection closed cleanly, i.e. with
+    /// [`Error::NoError`] or [`CloseReason::Application(0)`], and `true` otherwise.
+    #[must_use]
+    pub fn is_error(&self) -> bool {
+        !matches!(
+            self,
+            CloseReason::Transport(Error::NoError) | CloseReason::Application(0),
+        )
+    }
 }
 
-impl From<CloseError> for ConnectionError {
+impl From<CloseError> for CloseReason {
     fn from(err: CloseError) -> Self {
         match err {
             CloseError::Transport(c) => Self::Transport(Error::PeerError(c)),
diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs
index fa1d56815c..715ba85e81 100644
--- a/neqo-transport/src/qlog.rs
+++ b/neqo-transport/src/qlog.rs
@@ -205,7 +205,7 @@ pub fn packet_sent(
     let mut frames = SmallVec::new();
     while d.remaining() > 0 {
         if let Ok(f) = Frame::decode(&mut d) {
-            frames.push(QuicFrame::from(&f));
+            frames.push(QuicFrame::from(f));
         } else {
             qinfo!("qlog: invalid frame");
             break;
@@ -293,7 +293,7 @@ pub fn packet_received(
 
     while d.remaining() > 0 {
         if let Ok(f) = Frame::decode(&mut d) {
-            frames.push(QuicFrame::from(&f));
+            frames.push(QuicFrame::from(f));
         } else {
             qinfo!("qlog: invalid frame");
             break;
@@ -387,12 +387,12 @@ pub fn metrics_updated(qlog: &mut NeqoQlog, updated_metrics: &[QlogMetric]) {
 
 #[allow(clippy::too_many_lines)] // Yeah, but it's a nice match.
 #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)] // No choice here.
-impl From<&Frame<'_>> for QuicFrame { - fn from(frame: &Frame) -> Self { +impl From> for QuicFrame { + fn from(frame: Frame) -> Self { match frame { Frame::Padding(len) => QuicFrame::Padding { length: None, - payload_length: u32::from(*len), + payload_length: u32::from(len), }, Frame::Ping => QuicFrame::Ping { length: None, @@ -406,7 +406,7 @@ impl From<&Frame<'_>> for QuicFrame { ecn_count, } => { let ranges = - Frame::decode_ack_frame(*largest_acknowledged, *first_ack_range, ack_ranges) + Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges) .ok(); let acked_ranges = ranges.map(|all| { @@ -418,7 +418,7 @@ impl From<&Frame<'_>> for QuicFrame { }); QuicFrame::Ack { - ack_delay: Some(*ack_delay as f32 / 1000.0), + ack_delay: Some(ack_delay as f32 / 1000.0), acked_ranges, ect1: ecn_count.map(|c| c[IpTosEcn::Ect1]), ect0: ecn_count.map(|c| c[IpTosEcn::Ect0]), @@ -433,8 +433,8 @@ impl From<&Frame<'_>> for QuicFrame { final_size, } => QuicFrame::ResetStream { stream_id: stream_id.as_u64(), - error_code: *application_error_code, - final_size: *final_size, + error_code: application_error_code, + final_size, length: None, payload_length: None, }, @@ -443,12 +443,12 @@ impl From<&Frame<'_>> for QuicFrame { application_error_code, } => QuicFrame::StopSending { stream_id: stream_id.as_u64(), - error_code: *application_error_code, + error_code: application_error_code, length: None, payload_length: None, }, Frame::Crypto { offset, data } => QuicFrame::Crypto { - offset: *offset, + offset, length: data.len() as u64, }, Frame::NewToken { token } => QuicFrame::NewToken { @@ -470,20 +470,20 @@ impl From<&Frame<'_>> for QuicFrame { .. } => QuicFrame::Stream { stream_id: stream_id.as_u64(), - offset: *offset, + offset, length: data.len() as u64, - fin: Some(*fin), + fin: Some(fin), raw: None, }, Frame::MaxData { maximum_data } => QuicFrame::MaxData { - maximum: *maximum_data, + maximum: maximum_data, }, Frame::MaxStreamData { stream_id, maximum_stream_data, } => QuicFrame::MaxStreamData { stream_id: stream_id.as_u64(), - maximum: *maximum_stream_data, + maximum: maximum_stream_data, }, Frame::MaxStreams { stream_type, @@ -493,15 +493,15 @@ impl From<&Frame<'_>> for QuicFrame { NeqoStreamType::BiDi => StreamType::Bidirectional, NeqoStreamType::UniDi => StreamType::Unidirectional, }, - maximum: *maximum_streams, + maximum: maximum_streams, }, - Frame::DataBlocked { data_limit } => QuicFrame::DataBlocked { limit: *data_limit }, + Frame::DataBlocked { data_limit } => QuicFrame::DataBlocked { limit: data_limit }, Frame::StreamDataBlocked { stream_id, stream_data_limit, } => QuicFrame::StreamDataBlocked { stream_id: stream_id.as_u64(), - limit: *stream_data_limit, + limit: stream_data_limit, }, Frame::StreamsBlocked { stream_type, @@ -511,7 +511,7 @@ impl From<&Frame<'_>> for QuicFrame { NeqoStreamType::BiDi => StreamType::Bidirectional, NeqoStreamType::UniDi => StreamType::Unidirectional, }, - limit: *stream_limit, + limit: stream_limit, }, Frame::NewConnectionId { sequence_number, @@ -519,14 +519,14 @@ impl From<&Frame<'_>> for QuicFrame { connection_id, stateless_reset_token, } => QuicFrame::NewConnectionId { - sequence_number: *sequence_number as u32, - retire_prior_to: *retire_prior as u32, + sequence_number: sequence_number as u32, + retire_prior_to: retire_prior as u32, connection_id_length: Some(connection_id.len() as u8), connection_id: hex(connection_id), stateless_reset_token: Some(hex(stateless_reset_token)), }, Frame::RetireConnectionId { sequence_number } => 
QuicFrame::RetireConnectionId { - sequence_number: *sequence_number as u32, + sequence_number: sequence_number as u32, }, Frame::PathChallenge { data } => QuicFrame::PathChallenge { data: Some(hex(data)), @@ -545,8 +545,8 @@ impl From<&Frame<'_>> for QuicFrame { }, error_code: Some(error_code.code()), error_code_value: Some(0), - reason: Some(String::from_utf8_lossy(reason_phrase).to_string()), - trigger_frame_type: Some(*frame_type), + reason: Some(reason_phrase), + trigger_frame_type: Some(frame_type), }, Frame::HandshakeDone => QuicFrame::HandshakeDone, Frame::AckFrequency { .. } => QuicFrame::Unknown { diff --git a/neqo-transport/src/send_stream.rs b/neqo-transport/src/send_stream.rs index 6110a04ecc..ed3809e90c 100644 --- a/neqo-transport/src/send_stream.rs +++ b/neqo-transport/src/send_stream.rs @@ -12,6 +12,7 @@ use std::{ collections::{btree_map::Entry, BTreeMap, VecDeque}, hash::{Hash, Hasher}, mem, + num::NonZeroUsize, ops::Add, rc::Rc, }; @@ -696,6 +697,7 @@ pub struct SendStream { sendorder: Option, bytes_sent: u64, fair: bool, + writable_event_low_watermark: NonZeroUsize, } impl Hash for SendStream { @@ -712,6 +714,7 @@ impl PartialEq for SendStream { impl Eq for SendStream {} impl SendStream { + #[allow(clippy::missing_panics_doc)] // not possible pub fn new( stream_id: StreamId, max_stream_data: u64, @@ -731,6 +734,7 @@ impl SendStream { sendorder: None, bytes_sent: 0, fair: false, + writable_event_low_watermark: 1.try_into().unwrap(), }; if ss.avail() > 0 { ss.conn_events.send_stream_writable(stream_id); @@ -1111,13 +1115,12 @@ impl SendStream { #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) { match self.state { - SendStreamState::Send { - ref mut send_buf, .. - } => { - send_buf.mark_as_acked(offset, len); - if self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); - } + SendStreamState::Send { .. } => { + // TODO: Double check removing is fine here. + // let previous_limit = send_buf.avail(); + // send_buf.mark_as_acked(offset, len); + // let current_limit = send_buf.avail(); + // self.maybe_emit_writable_event(previous_limit, current_limit); } SendStreamState::DataSent { ref mut send_buf, @@ -1186,15 +1189,22 @@ impl SendStream { } } + /// Set low watermark for [`crate::ConnectionEvent::SendStreamWritable`] + /// event. + /// + /// See [`crate::Connection::stream_set_writable_event_low_watermark`]. + pub fn set_writable_event_low_watermark(&mut self, watermark: NonZeroUsize) { + self.writable_event_low_watermark = watermark; + } + pub fn set_max_stream_data(&mut self, limit: u64) { qdebug!("setting max_stream_data to {limit}"); if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. 
} = &mut self.state { - let stream_was_blocked = fc.available() == 0; - fc.update(limit); - if stream_was_blocked && self.avail() > 0 { - self.conn_events.send_stream_writable(self.stream_id); + let previous_limit = fc.available(); + if let Some(current_limit) = fc.update(limit) { + self.maybe_emit_writable_event(previous_limit, current_limit); } } } @@ -1353,6 +1363,27 @@ impl SendStream { pub(crate) fn state(&mut self) -> &mut SendStreamState { &mut self.state } + + pub(crate) fn maybe_emit_writable_event( + &mut self, + previous_limit: usize, + current_limit: usize, + ) { + let low_watermark = self.writable_event_low_watermark.get(); + + // Skip if: + // - stream was not constrained by limit before, + // - or stream is still constrained by limit, + // - or stream is constrained by different limit. + if low_watermark < previous_limit + || current_limit < low_watermark + || self.avail() < low_watermark + { + return; + } + + self.conn_events.send_stream_writable(self.stream_id); + } } impl ::std::fmt::Display for SendStream { @@ -1740,7 +1771,7 @@ pub struct SendStreamRecoveryToken { #[cfg(test)] mod tests { - use std::{cell::RefCell, collections::VecDeque, rc::Rc}; + use std::{cell::RefCell, collections::VecDeque, num::NonZeroUsize, rc::Rc}; use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder}; @@ -2434,7 +2465,7 @@ mod tests { // Increasing conn max (conn:4, stream:4) will unblock but not emit // event b/c that happens in Connection::emit_frame() (tested in // connection.rs) - assert!(conn_fc.borrow_mut().update(4)); + assert!(conn_fc.borrow_mut().update(4).is_some()); assert_eq!(conn_events.events().count(), 0); assert_eq!(s.avail(), 2); assert_eq!(s.send(b"hello").unwrap(), 2); @@ -2460,6 +2491,53 @@ mod tests { assert_eq!(s.send(b"hello").unwrap(), 0); } + #[test] + fn send_stream_writable_event_gen_with_watermark() { + let conn_fc = connection_fc(0); + let mut conn_events = ConnectionEvents::default(); + + let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone()); + // Set watermark at 3. + s.set_writable_event_low_watermark(NonZeroUsize::new(3).unwrap()); + + // Stream is initially blocked (conn:0, stream:0, watermark: 3) and will + // not accept data. + assert_eq!(s.avail(), 0); + assert_eq!(s.send(b"hi!").unwrap(), 0); + + // Increasing the connection limit (conn:10, stream:0, watermark: 3) will not generate + // event or allow sending anything. Stream is constrained by stream limit. + assert!(conn_fc.borrow_mut().update(10).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing the connection limit further (conn:11, stream:0, watermark: 3) will not + // generate event or allow sending anything. Stream wasn't constrained by connection + // limit before. + assert!(conn_fc.borrow_mut().update(11).is_some()); + assert_eq!(s.avail(), 0); + assert_eq!(conn_events.events().count(), 0); + + // Increasing to (conn:11, stream:2, watermark: 3) will allow 2 bytes + // but not generate a SendStreamWritable event as it is still below the + // configured watermark. + s.set_max_stream_data(2); + assert_eq!(conn_events.events().count(), 0); + assert_eq!(s.avail(), 2); + + // Increasing to (conn:11, stream:3, watermark: 3) will generate an + // event as available sendable bytes are >= watermark. + s.set_max_stream_data(3); + let evts = conn_events.events().collect::>(); + assert_eq!(evts.len(), 1); + assert!(matches!( + evts[0], + ConnectionEvent::SendStreamWritable { .. 
} + )); + + assert_eq!(s.send(b"hi!").unwrap(), 3); + } + #[test] fn send_stream_writable_event_new_stream() { let conn_fc = connection_fc(2); diff --git a/neqo-transport/src/streams.rs b/neqo-transport/src/streams.rs index aebe01c194..4827f1dbc9 100644 --- a/neqo-transport/src/streams.rs +++ b/neqo-transport/src/streams.rs @@ -485,17 +485,13 @@ impl Streams { } pub fn handle_max_data(&mut self, maximum_data: u64) { - let conn_was_blocked = self.sender_fc.borrow().available() == 0; - let conn_credit_increased = self.sender_fc.borrow_mut().update(maximum_data); - - if conn_was_blocked && conn_credit_increased { - for (id, ss) in &mut self.send { - if ss.avail() > 0 { - // These may not actually all be writable if one - // uses up all the conn credit. Not our fault. - self.events.send_stream_writable(*id); - } - } + let previous_limit = self.sender_fc.borrow().available(); + let Some(current_limit) = self.sender_fc.borrow_mut().update(maximum_data) else { + return; + }; + + for (_id, ss) in &mut self.send { + ss.maybe_emit_writable_event(previous_limit, current_limit); } } @@ -540,7 +536,10 @@ impl Streams { } pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) { - if self.local_stream_limits[stream_type].update(maximum_streams) { + let increased = self.local_stream_limits[stream_type] + .update(maximum_streams) + .is_some(); + if increased { self.events.send_stream_creatable(stream_type); } } diff --git a/neqo-transport/src/tracking.rs b/neqo-transport/src/tracking.rs index d0723bbcbe..6643d516e3 100644 --- a/neqo-transport/src/tracking.rs +++ b/neqo-transport/src/tracking.rs @@ -559,6 +559,10 @@ impl RecvdPackets { } } + /// Length of the worst possible ACK frame, assuming only one range and ECN counts. + /// Note that this assumes one byte for the type and count of extra ranges. + pub const USEFUL_ACK_LEN: usize = 1 + 8 + 8 + 1 + 8 + 3 * 8; + /// Generate an ACK frame for this packet number space. /// /// Unlike other frame generators this doesn't modify the underlying instance @@ -577,10 +581,6 @@ impl RecvdPackets { tokens: &mut Vec, stats: &mut FrameStats, ) { - // The worst possible ACK frame, assuming only one range. - // Note that this assumes one byte for the type and count of extra ranges. - const LONGEST_ACK_HEADER: usize = 1 + 8 + 8 + 1 + 8; - // Check that we aren't delaying ACKs. if !self.ack_now(now, rtt) { return; @@ -592,7 +592,10 @@ impl RecvdPackets { // When congestion limited, ACK-only packets are 255 bytes at most // (`recovery::ACK_ONLY_SIZE_LIMIT - 1`). This results in limiting the // ranges to 13 here. - let max_ranges = if let Some(avail) = builder.remaining().checked_sub(LONGEST_ACK_HEADER) { + let max_ranges = if let Some(avail) = builder + .remaining() + .checked_sub(RecvdPackets::USEFUL_ACK_LEN) + { // Apply a hard maximum to keep plenty of space for other stuff. min(1 + (avail / 16), MAX_ACKS_PER_FRAME) } else { @@ -1158,7 +1161,9 @@ mod tests { .is_some()); let mut builder = PacketBuilder::short(Encoder::new(), false, []); - builder.set_limit(32); + // The code pessimistically assumes that each range needs 16 bytes to express. + // So this won't be enough for a second range. 
+ builder.set_limit(RecvdPackets::USEFUL_ACK_LEN + 8); let mut stats = FrameStats::default(); tracker.write_frame( diff --git a/neqo-transport/tests/connection.rs b/neqo-transport/tests/connection.rs index d08d946cf8..3cc711f80b 100644 --- a/neqo-transport/tests/connection.rs +++ b/neqo-transport/tests/connection.rs @@ -7,7 +7,7 @@ mod common; use neqo_common::{Datagram, Decoder, Encoder, Role}; -use neqo_transport::{ConnectionError, ConnectionParameters, Error, State, Version}; +use neqo_transport::{CloseReason, ConnectionParameters, Error, State, Version}; use test_fixture::{ default_client, default_server, header_protection::{ @@ -180,7 +180,7 @@ fn packet_without_frames() { client.process_input(&modified, now()); assert_eq!( client.state(), - &State::Closed(ConnectionError::Transport(Error::ProtocolViolation)) + &State::Closed(CloseReason::Transport(Error::ProtocolViolation)) ); } @@ -266,10 +266,7 @@ fn overflow_crypto() { client.process_input(&dgram, now()); if let State::Closing { error, .. } = client.state() { assert!( - matches!( - error, - ConnectionError::Transport(Error::CryptoBufferExceeded), - ), + matches!(error, CloseReason::Transport(Error::CryptoBufferExceeded),), "the connection need to abort on crypto buffer" ); assert!(pn > 64, "at least 64000 bytes of data is buffered"); diff --git a/neqo-transport/tests/network.rs b/neqo-transport/tests/network.rs index e35a358d44..fffcd1f752 100644 --- a/neqo-transport/tests/network.rs +++ b/neqo-transport/tests/network.rs @@ -6,7 +6,7 @@ use std::{ops::Range, time::Duration}; -use neqo_transport::{ConnectionError, ConnectionParameters, Error, State}; +use neqo_transport::{CloseReason, ConnectionParameters, Error, State}; use test_fixture::{ boxed, sim::{ @@ -48,10 +48,10 @@ simulate!( idle_timeout, [ ConnectionNode::default_client(boxed![ReachState::new(State::Closed( - ConnectionError::Transport(Error::IdleTimeout) + CloseReason::Transport(Error::IdleTimeout) ))]), ConnectionNode::default_server(boxed![ReachState::new(State::Closed( - ConnectionError::Transport(Error::IdleTimeout) + CloseReason::Transport(Error::IdleTimeout) ))]), ] ); @@ -62,7 +62,7 @@ simulate!( ConnectionNode::new_client( ConnectionParameters::default().idle_timeout(weeks(1000)), boxed![ReachState::new(State::Confirmed),], - boxed![ReachState::new(State::Closed(ConnectionError::Transport( + boxed![ReachState::new(State::Closed(CloseReason::Transport( Error::IdleTimeout )))] ), @@ -71,7 +71,7 @@ simulate!( ConnectionNode::new_server( ConnectionParameters::default().idle_timeout(weeks(1000)), boxed![ReachState::new(State::Confirmed),], - boxed![ReachState::new(State::Closed(ConnectionError::Transport( + boxed![ReachState::new(State::Closed(CloseReason::Transport( Error::IdleTimeout )))] ), diff --git a/neqo-transport/tests/retry.rs b/neqo-transport/tests/retry.rs index 0cfc48f051..3f95511c3e 100644 --- a/neqo-transport/tests/retry.rs +++ b/neqo-transport/tests/retry.rs @@ -17,7 +17,7 @@ use std::{ use common::{connected_server, default_server, generate_ticket}; use neqo_common::{hex_with_len, qdebug, qtrace, Datagram, Encoder, Role}; use neqo_crypto::AuthenticationStatus; -use neqo_transport::{server::ValidateAddress, ConnectionError, Error, State, StreamType}; +use neqo_transport::{server::ValidateAddress, CloseReason, Error, State, StreamType}; use test_fixture::{ assertions, datagram, default_client, header_protection::{ @@ -469,7 +469,7 @@ fn mitm_retry() { assert!(matches!( *client.state(), State::Closing { - error: 
ConnectionError::Transport(Error::ProtocolViolation), + error: CloseReason::Transport(Error::ProtocolViolation), .. } )); diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 3c43a9105d..4740d26ded 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -15,7 +15,7 @@ use neqo_crypto::{ }; use neqo_transport::{ server::{ActiveConnectionRef, Server, ValidateAddress}, - Connection, ConnectionError, ConnectionParameters, Error, Output, State, StreamType, Version, + CloseReason, Connection, ConnectionParameters, Error, Output, State, StreamType, Version, }; use test_fixture::{ assertions, datagram, default_client, @@ -463,13 +463,13 @@ fn bad_client_initial() { assert_ne!(delay, Duration::from_secs(0)); assert!(matches!( *client.state(), - State::Draining { error: ConnectionError::Transport(Error::PeerError(code)), .. } if code == Error::ProtocolViolation.code() + State::Draining { error: CloseReason::Transport(Error::PeerError(code)), .. } if code == Error::ProtocolViolation.code() )); for server in server.active_connections() { assert_eq!( *server.borrow().state(), - State::Closed(ConnectionError::Transport(Error::ProtocolViolation)) + State::Closed(CloseReason::Transport(Error::ProtocolViolation)) ); } diff --git a/test-fixture/src/sim/net.rs b/test-fixture/src/sim/net.rs deleted file mode 100644 index 754426f895..0000000000 --- a/test-fixture/src/sim/net.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use super::rng::RandomDuration; -use super::{Node, Rng}; -use neqo_common::Datagram; -use neqo_transport::Output; -use std::collections::BTreeMap; -use std::fmt::{self, Debug}; -use std::iter; -use std::ops::Range; -use std::time::{Duration, Instant}; - -/// -pub struct RandomDrop { - threshold: u64, - max: u64, - rng: Rng, -} - -impl RandomDuration { - /// Make a new random `Duration` generator. This asserts if the range provided - /// is inverted (i.e., `bounds.start > bounds.end`), or spans 2^64 - /// or more nanoseconds. - /// A zero-length range means that random values won't be taken from the Rng - pub fn new(bounds: Range, rng: Rng) -> Self { - let max = u64::try_from((bounds.end - bounds.start).as_nanos()).unwrap(); - Self { - start: bounds.start, - max, - rng, - } - } - - fn next(&mut self) -> Duration { - let r = if self.max == 0 { - Duration::new(0, 0) - } else { - self.rng.borrow_mut().random_from(0..self.max) - } - self.start + Duration::from_nanos(r) - } -} - -enum DelayState { - New(Range), - Ready(RandomDuration), -} - -pub struct Delay { - state: DelayState, - queue: BTreeMap, -} - -impl Delay -{ - pub fn new(bounds: Range) -> Self - { - Self { - State: DelayState::New(bounds), - queue: BTreeMap::default(), - } - } - - fn insert(&mut self, d: Datagram, now: Instant) { - let mut t = if let State::Ready(r) = self.state { - now + self.source.next() - } else { - unreachable!(); - } - while self.queue.contains_key(&t) { - // This is a little inefficient, but it avoids drops on collisions, - // which are super-common for a fixed delay. 
- t += Duration::from_nanos(1); - } - self.queue.insert(t, d); - } -} - -impl Node for Delay -{ - fn init(&mut self, rng: Rng, now: Instant) { - if let DelayState::New(bounds) = self.state { - self.state = RandomDuration::new(bounds); - } else { - unreachable!(); - } - } - - fn process(&mut self, d: Option, now: Instant) -> Output { - if let Some(dgram) = d { - self.insert(dgram, now); - } - if let Some((&k, _)) = self.queue.range(..now).nth(0) { - Output::Datagram(self.queue.remove(&k).unwrap()) - } else if let Some(&t) = self.queue.keys().nth(0) { - Output::Callback(t - now) - } else { - Output::None - } - } -} - -impl Debug for Delay { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("delay") - } -} diff --git a/test/test.sh b/test/test.sh index dc02b2161c..2c22e9b6c8 100755 --- a/test/test.sh +++ b/test/test.sh @@ -17,7 +17,7 @@ cargo build --bin neqo-client --bin neqo-server addr=127.0.0.1 port=4433 path=/20000 -flags="--verbose --qlog-dir $tmp --use-old-http --alpn hq-interop --quic-version 1" +flags="--verbose --verbose --verbose --qlog-dir $tmp --use-old-http --alpn hq-interop --quic-version 1" if [ "$(uname -s)" != "Linux" ]; then iface=lo0 else