From 0ca6f34a4a7796075b1b7e9657fe829347d3c1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 4 Mar 2024 17:50:20 +0100 Subject: [PATCH] feat: Implement `car-mirror-wasm` and `car-mirror-axum` crates (#44) * feat: Succinctly encode `PullRequest` and `PushResponse` * feat: Write `car-mirror-axum` crate * refactor: Move car-mirror-reqwest integration test into doctest * fix: PullRequest/PushResponse dagcbor roundtripping * chore: Add necessary features * feat: Write car-mirror-wasm crate & playwright tests * feat: Serve both HTTP + HTTPS with self signed certs * refactor: Disable crate for non-wasm in a different way * feat: Auto-detect whether to use streaming or not * chore: Run tests for both HTTP and HTTPS * feat: Run playwright tests in github actions * fix: Don't require confirmation on cargo binstall command * fix: correct playwright action commands * fix: Cache JS deps & setup wasm32 target in action * chore: Try some stuf * chore: Fix lints * fix: Support webkit that doesn't have BYOBReader support * chore: fmt * fix: Reintroduce `car_mirror_axum::serve` * chore: Run reqwest axum integration with coverage * chore: Test more lines in `messages.rs`: `indicates_finished` * chore: Write `car-mirror-axum` documentation * fix: Pin version of wasm-bindgen in CI * feat: Switch to `playwright-test` as test runner * chore: Remove unused `example` crate + update wasm version --- .github/workflows/tests_and_checks.yml | 42 ++-- Cargo.lock | 227 +++++++++++++++++++- Cargo.toml | 7 +- car-mirror-axum/.dockerignore | 7 + car-mirror-axum/Cargo.toml | 55 +++++ car-mirror-axum/LICENSE-APACHE | 201 +++++++++++++++++ car-mirror-axum/LICENSE-MIT | 23 ++ car-mirror-axum/README.md | 53 +++++ car-mirror-axum/examples/serve_test_data.rs | 41 ++++ car-mirror-axum/src/error.rs | 125 +++++++++++ car-mirror-axum/src/extract/dag_cbor.rs | 133 ++++++++++++ car-mirror-axum/src/extract/mod.rs | 3 + car-mirror-axum/src/lib.rs | 24 +++ car-mirror-axum/src/server.rs | 183 ++++++++++++++++ car-mirror-reqwest/Cargo.toml | 8 +- car-mirror-reqwest/integration/axum.rs | 147 ------------- car-mirror-reqwest/src/error.rs | 9 + car-mirror-reqwest/src/lib.rs | 21 +- car-mirror-reqwest/src/request.rs | 88 ++++---- car-mirror-reqwest/tests/integration.rs | 31 +++ car-mirror-wasm/.gitignore | 7 + car-mirror-wasm/Cargo.toml | 33 ++- car-mirror-wasm/README.md | 51 +---- car-mirror-wasm/index.html | 12 ++ car-mirror-wasm/package.json | 198 +++++++++++++++++ car-mirror-wasm/src/blockstore.rs | 93 ++++++++ car-mirror-wasm/src/exports.rs | 172 +++++++++++++++ car-mirror-wasm/src/lib.rs | 50 ++--- car-mirror-wasm/src/messages.rs | 77 +++++++ car-mirror-wasm/src/utils.rs | 15 ++ car-mirror-wasm/test/index.js | 144 +++++++++++++ car-mirror-wasm/test/protocol.spec.js | 66 ++++++ car-mirror-wasm/tests/web.rs | 12 -- car-mirror/Cargo.toml | 3 + car-mirror/src/common.rs | 58 ++--- car-mirror/src/dag_walk.rs | 4 +- car-mirror/src/lib.rs | 3 + car-mirror/src/messages.rs | 159 ++++++++++++-- car-mirror/src/serde_bloom_bytes.rs | 81 +++++++ car-mirror/src/serde_cid_vec.rs | 28 +++ examples/Cargo.toml | 13 -- examples/counterparts.rs | 6 - 42 files changed, 2310 insertions(+), 403 deletions(-) create mode 100644 car-mirror-axum/.dockerignore create mode 100644 car-mirror-axum/Cargo.toml create mode 100644 car-mirror-axum/LICENSE-APACHE create mode 100644 car-mirror-axum/LICENSE-MIT create mode 100644 car-mirror-axum/README.md create mode 100644 car-mirror-axum/examples/serve_test_data.rs create mode 100644 car-mirror-axum/src/error.rs create mode 100644 car-mirror-axum/src/extract/dag_cbor.rs create mode 100644 car-mirror-axum/src/extract/mod.rs create mode 100644 car-mirror-axum/src/lib.rs create mode 100644 car-mirror-axum/src/server.rs delete mode 100644 car-mirror-reqwest/integration/axum.rs create mode 100644 car-mirror-reqwest/tests/integration.rs create mode 100644 car-mirror-wasm/.gitignore create mode 100644 car-mirror-wasm/index.html create mode 100644 car-mirror-wasm/package.json create mode 100644 car-mirror-wasm/src/blockstore.rs create mode 100644 car-mirror-wasm/src/exports.rs create mode 100644 car-mirror-wasm/src/messages.rs create mode 100644 car-mirror-wasm/src/utils.rs create mode 100644 car-mirror-wasm/test/index.js create mode 100644 car-mirror-wasm/test/protocol.spec.js delete mode 100644 car-mirror-wasm/tests/web.rs create mode 100644 car-mirror/src/serde_bloom_bytes.rs create mode 100644 car-mirror/src/serde_cid_vec.rs delete mode 100644 examples/Cargo.toml delete mode 100644 examples/counterparts.rs diff --git a/.github/workflows/tests_and_checks.yml b/.github/workflows/tests_and_checks.yml index ad179e2..6644643 100644 --- a/.github/workflows/tests_and_checks.yml +++ b/.github/workflows/tests_and_checks.yml @@ -105,18 +105,8 @@ jobs: - name: Run Tests run: cargo test --manifest-path car-mirror/Cargo.toml --all-features - run-headless-tests: - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ ubuntu-latest ] - browser: [ firefox, chrome ] - - # include: - # bug w/ wasm-bindgen: https://github.com/rustwasm/wasm-bindgen/issues/3004 - # - os: macos-latest - # browser: safari + run-playwright-tests: + runs-on: ubuntu-latest defaults: run: @@ -126,17 +116,37 @@ jobs: - name: Checkout Repository uses: actions/checkout@v3 - - name: Install wasm-pack - run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh + - name: Install cargo-binstall + uses: cargo-bins/cargo-binstall@v1.6.3 + + - name: Install wasm-bindgen + run: cargo binstall wasm-bindgen-cli --version 0.2.91 -y + + - name: Install wasm-opt (via binaryen) + run: sudo apt-get install -y binaryen + + - name: Install NPM dependencies + run: npm i + + - name: Install Playwright Browsers + run: npx playwright install --with-deps - name: Cache Project uses: Swatinem/rust-cache@v2 + - name: Cache Project (JavaScript) + uses: actions/setup-node@v4 + with: + node-version: '18.x' + - name: Install Rust Toolchain uses: actions-rs/toolchain@v1 with: override: true toolchain: stable - - name: Run Rust Headless Browser Tests - run: wasm-pack test --headless --${{ matrix.browser }} + - name: Setup WebAssembly Toolchain + run: rustup target add wasm32-unknown-unknown + + - name: Run Playwright tests + run: npm test diff --git a/Cargo.lock b/Cargo.lock index 43a717c..be4157f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,12 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "arrayref" version = "0.3.7" @@ -368,6 +374,47 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "axum-server" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ad46c3ec4e12f4a4b6835e173ba21c25e484c9d02b49770bf006ce5367c036" +dependencies = [ + "arc-swap", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "pin-project-lite", + "rustls", + "rustls-pemfile 2.0.0", + "tokio", + "tokio-rustls", + "tower", + "tower-service", +] + +[[package]] +name = "axum-server-dual-protocol" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ea4cd08ae2a5f075d28fa31190163c8106a1d2d3189442494bae22b39040a0d" +dependencies = [ + "axum-server", + "bytes", + "http 1.0.0", + "http-body-util", + "pin-project", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -532,6 +579,7 @@ dependencies = [ "async-stream", "bytes", "car-mirror", + "data-encoding", "deterministic-bloom", "futures", "iroh-car", @@ -541,6 +589,41 @@ dependencies = [ "quick_cache", "roaring-graphs", "serde", + "serde_bytes", + "serde_ipld_dagcbor", + "serde_json", + "test-log", + "test-strategy", + "testresult", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "wnfs-common", + "wnfs-unixfs-file", +] + +[[package]] +name = "car-mirror-axum" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "axum", + "axum-macros", + "axum-server", + "axum-server-dual-protocol", + "bytes", + "car-mirror", + "futures", + "http 1.0.0", + "libipld", + "mime", + "rand", + "rand_chacha", + "rcgen", + "serde", "serde_ipld_dagcbor", "test-log", "test-strategy", @@ -548,6 +631,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tower-http", "tracing", "tracing-subscriber", "wnfs-common", @@ -577,10 +661,13 @@ dependencies = [ "axum-macros", "bytes", "car-mirror", + "car-mirror-axum", "futures", + "http 0.2.11", "libipld", "reqwest", "reqwest-middleware", + "serde_ipld_dagcbor", "test-log", "test-strategy", "testresult", @@ -590,19 +677,30 @@ dependencies = [ "tracing", "tracing-subscriber", "wnfs-common", + "wnfs-unixfs-file", ] [[package]] name = "car-mirror-wasm" version = "0.1.0" dependencies = [ + "anyhow", + "bytes", + "car-mirror", "console_error_panic_hook", + "futures", "js-sys", + "libipld", + "serde-wasm-bindgen", + "serde_json", + "tokio-util", "tracing", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", + "wasm-streams", "web-sys", + "wnfs-common", ] [[package]] @@ -869,6 +967,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "deterministic-bloom" version = "0.1.0" @@ -973,13 +1080,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "examples" -version = "0.1.0" -dependencies = [ - "car-mirror", -] - [[package]] name = "fastrand" version = "1.9.0" @@ -1803,6 +1903,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.18" @@ -1906,6 +2012,16 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "pem" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" +dependencies = [ + "base64", + "serde", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1985,6 +2101,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2158,6 +2280,18 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rcgen" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48406db8ac1f3cbc7dcdb56ec355343817958a356ff430259bb07baf7607e1e1" +dependencies = [ + "pem", + "ring", + "time", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2236,7 +2370,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2361,6 +2495,22 @@ dependencies = [ "base64", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2444,6 +2594,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde-wasm-bindgen" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8302e169f0eddcc139c70f139d19d6467353af16f9fce27e8c30158036a1e16b" +dependencies = [ + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "serde_bytes" version = "0.11.14" @@ -2478,9 +2639,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -2775,6 +2936,25 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +dependencies = [ + "deranged", + "num-conv", + "powerfmt", + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + [[package]] name = "tinytemplate" version = "1.2.1" @@ -2848,6 +3028,7 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -2879,6 +3060,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.4.2", + "bytes", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -3469,6 +3667,15 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index faa1847..acc9bbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ "car-mirror", + "car-mirror-axum", "car-mirror-benches", "car-mirror-reqwest", "car-mirror-wasm", - "examples", ] [workspace.dependencies] @@ -15,6 +15,11 @@ futures = "0.3" libipld = "0.16" libipld-core = "0.16" serde_ipld_dagcbor = "0.4" +serde_bytes = "0.11" +serde_json = "1.0" +tokio-util = "0.7.8" +wasm-bindgen = { version = "0.2", features = ["serde-serialize"] } +wasm-bindgen-futures = { version = "0.4" } wnfs-common = { version = "0.2.0" } wnfs-unixfs-file = { version = "0.2.0" } diff --git a/car-mirror-axum/.dockerignore b/car-mirror-axum/.dockerignore new file mode 100644 index 0000000..b94f2b7 --- /dev/null +++ b/car-mirror-axum/.dockerignore @@ -0,0 +1,7 @@ +* + +!Cargo.lock +!Cargo.toml +!src + +src/bin diff --git a/car-mirror-axum/Cargo.toml b/car-mirror-axum/Cargo.toml new file mode 100644 index 0000000..60486d2 --- /dev/null +++ b/car-mirror-axum/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "car-mirror-axum" +version = "0.1.0" +description = "A library for building axum webservers that use car-mirror and a test axum webserver binary" +keywords = [] +categories = [] +include = ["/src", "README.md", "LICENSE-APACHE", "LICENSE-MIT"] +license = "Apache-2.0" +readme = "README.md" +edition = "2021" +rust-version = "1.66" +documentation = "https://docs.rs/car-mirror-axum" +repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror-axum" +authors = ["Philipp Krüger "] + +[lib] +path = "src/lib.rs" +doctest = true + +[dependencies] +anyhow = "1.0" +async-trait = "0.1" +axum = { version = "0.7", features = ["http1", "http2"] } +axum-macros = "0.4" +axum-server = { version = "0.6.0", features = ["tls-rustls"] } +bytes = "1.4" +car-mirror = { version = "0.1", path = "../car-mirror", features = ["quick_cache"] } +futures = "0.3" +http = "1.0" +libipld = "0.16" +mime = "0.3" +rcgen = "0.12.1" +serde = "^1" +serde_ipld_dagcbor = { workspace = true } +thiserror = "1.0" +tokio = { version = "1.0", features = ["rt-multi-thread"] } +tokio-util = { version = "0.7", features = ["io"] } +tower-http = { version = "0.5", features = ["cors", "trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] } +wnfs-common = { workspace = true } + +[dev-dependencies] +axum-server-dual-protocol = "0.6.0" +rand = "0.8" +rand_chacha = "0.3" +test-log = { version = "0.2", default-features = false, features = ["trace"] } +test-strategy = "0.3" +testresult = "0.3" +wnfs-unixfs-file = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +# defines the configuration attribute `docsrs` +rustdoc-args = ["--cfg", "docsrs"] diff --git a/car-mirror-axum/LICENSE-APACHE b/car-mirror-axum/LICENSE-APACHE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/car-mirror-axum/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/car-mirror-axum/LICENSE-MIT b/car-mirror-axum/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/car-mirror-axum/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/car-mirror-axum/README.md b/car-mirror-axum/README.md new file mode 100644 index 0000000..df6beaa --- /dev/null +++ b/car-mirror-axum/README.md @@ -0,0 +1,53 @@ +
+

car-mirror-axum

+ +

+ + Crate + + + Code Coverage + + + Build Status + + + License-Apache + + + License-MIT + + + Docs + + + Discord + +

+
+ +
:warning: Work in progress :warning:
+ +## car-mirror-axum + +Description. + +## License + +This project is licensed under either of + +- Apache License, Version 2.0, ([LICENSE-APACHE](./LICENSE-APACHE) or [http://www.apache.org/licenses/LICENSE-2.0][apache]) +- MIT license ([LICENSE-MIT](./LICENSE-MIT) or [http://opensource.org/licenses/MIT][mit]) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally +submitted for inclusion in the work by you, as defined in the Apache-2.0 +license, shall be dual licensed as above, without any additional terms or +conditions. + + +[apache]: https://www.apache.org/licenses/LICENSE-2.0 +[mit]: http://opensource.org/licenses/MIT diff --git a/car-mirror-axum/examples/serve_test_data.rs b/car-mirror-axum/examples/serve_test_data.rs new file mode 100644 index 0000000..2866726 --- /dev/null +++ b/car-mirror-axum/examples/serve_test_data.rs @@ -0,0 +1,41 @@ +use anyhow::Result; +use axum_server::tls_rustls::RustlsConfig; +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use std::future::IntoFuture; +use wnfs_common::MemoryBlockStore; + +#[test_log::test(tokio::main)] +async fn main() -> Result<()> { + tracing::info!("Starting"); + + tracing::info!("Generating self-signed certificate"); + let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?; + let tls_config = RustlsConfig::from_der( + vec![cert.serialize_der()?], + cert.serialize_private_key_der(), + ) + .await?; + tracing::info!("Successfully generated self-signed configuration"); + + tracing::info!("Generating test data"); + let store = MemoryBlockStore::new(); + let mut test_file = vec![0u8; 100_000_000]; + ChaCha8Rng::seed_from_u64(0).fill_bytes(&mut test_file); + tracing::info!("Test file size: {} bytes", test_file.len()); + let test_root = wnfs_unixfs_file::builder::FileBuilder::new() + .content_bytes(test_file) + .build()? + .store(&store) + .await?; + tracing::info!("Serving test root {test_root}"); + + let addr = "0.0.0.0:3344".parse()?; + let handle = tokio::spawn( + axum_server_dual_protocol::bind_dual_protocol(addr, tls_config) + .serve(car_mirror_axum::app(store).into_make_service()), + ); + println!("Listening on {addr}"); + handle.into_future().await??; + Ok(()) +} diff --git a/car-mirror-axum/src/error.rs b/car-mirror-axum/src/error.rs new file mode 100644 index 0000000..e820d7d --- /dev/null +++ b/car-mirror-axum/src/error.rs @@ -0,0 +1,125 @@ +//! Basic anyhow-based error webserver errors + +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, +}; +use std::fmt::Display; + +/// A basic anyhow error type wrapper that returns +/// internal server errors if something goes wrong. +#[derive(Debug)] +pub struct AppError { + status_code: StatusCode, + error_msg: String, +} + +impl Display for AppError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.error_msg.fmt(f) + } +} + +impl AppError { + /// Construct a new error from a status code and an error message + pub fn new(status_code: StatusCode, msg: impl ToString) -> Self { + Self { + status_code, + error_msg: msg.to_string(), + } + } +} + +/// Helper type alias that defaults the error type to `AppError` +pub type AppResult = Result; + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + (self.status_code, self.error_msg).into_response() + } +} + +impl From for AppError { + fn from(err: anyhow::Error) -> Self { + Self::from(&err) + } +} + +impl From<&anyhow::Error> for AppError { + fn from(err: &anyhow::Error) -> Self { + Self::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", err), + ) + } +} + +impl From for AppError { + fn from(err: car_mirror::Error) -> Self { + Self::from(&err) + } +} + +impl From<&car_mirror::Error> for AppError { + fn from(err: &car_mirror::Error) -> Self { + use car_mirror::Error; + match err { + Error::TooManyBytes { .. } => Self::new(StatusCode::PAYLOAD_TOO_LARGE, err), + Error::BlockSizeExceeded { .. } => Self::new(StatusCode::PAYLOAD_TOO_LARGE, err), + Error::UnsupportedCodec { .. } => Self::new(StatusCode::BAD_REQUEST, err), + Error::UnsupportedHashCode { .. } => Self::new(StatusCode::BAD_REQUEST, err), + Error::BlockStoreError(err) => Self::from(err), + Error::ParsingError(_) => Self::new(StatusCode::UNPROCESSABLE_ENTITY, err), + Error::IncrementalVerificationError(_) => Self::new(StatusCode::BAD_REQUEST, err), + Error::CarFileError(_) => Self::new(StatusCode::BAD_REQUEST, err), + } + } +} + +impl From for AppError { + fn from(err: wnfs_common::BlockStoreError) -> Self { + Self::from(&err) + } +} + +impl From<&wnfs_common::BlockStoreError> for AppError { + fn from(err: &wnfs_common::BlockStoreError) -> Self { + use wnfs_common::BlockStoreError; + match err { + BlockStoreError::MaximumBlockSizeExceeded(_) => { + Self::new(StatusCode::PAYLOAD_TOO_LARGE, err) + } + BlockStoreError::CIDNotFound(_) => Self::new(StatusCode::NOT_FOUND, err), + BlockStoreError::CIDError(_) => Self::new(StatusCode::INTERNAL_SERVER_ERROR, err), + BlockStoreError::Custom(_) => Self::new(StatusCode::INTERNAL_SERVER_ERROR, err), + } + } +} + +impl From for AppError { + fn from(err: libipld::cid::Error) -> Self { + Self::from(&err) + } +} + +impl From<&libipld::cid::Error> for AppError { + fn from(err: &libipld::cid::Error) -> Self { + Self::new(StatusCode::BAD_REQUEST, err) + } +} + +impl From for AppError { + fn from(err: std::io::Error) -> Self { + if let Some(err) = err.get_ref() { + if let Some(err) = err.downcast_ref::() { + return Self::from(err); + } + + if let Some(err) = err.downcast_ref::() { + return Self::from(err); + } + } + + Self::new(StatusCode::INTERNAL_SERVER_ERROR, err) + } +} diff --git a/car-mirror-axum/src/extract/dag_cbor.rs b/car-mirror-axum/src/extract/dag_cbor.rs new file mode 100644 index 0000000..7f79424 --- /dev/null +++ b/car-mirror-axum/src/extract/dag_cbor.rs @@ -0,0 +1,133 @@ +//! Axum extractor that serializes and deserializes DagCbor data using serde + +use anyhow::Result; +use axum::{ + extract::{rejection::BytesRejection, FromRequest, Request}, + http::{ + header::{ToStrError, CONTENT_TYPE}, + HeaderValue, StatusCode, + }, + response::{IntoResponse, Response}, +}; +use bytes::Bytes; +use serde::{de::DeserializeOwned, Serialize}; +use serde_ipld_dagcbor::DecodeError; +use std::{convert::Infallible, fmt::Debug}; + +/// Newtype wrapper around dag-cbor (de-)serializable data +#[derive(Debug, Clone)] +pub struct DagCbor(pub M); + +/// Errors that can occur during dag-cbor deserialization +#[derive(Debug, thiserror::Error)] +pub enum DagCborRejection { + /// When the Content-Type header is missing + #[error("Missing Content-Type header on request, expected application/vnd.ipld.dag-cbor, but got nothing")] + MissingContentType, + + /// When a Content-Type header was set, but unexpected. + #[error("Incorrect mime type, expected application/vnd.ipld.dag-cbor, but got {0}")] + UnexpectedContentType(mime::Mime), + + /// When the Content-Type header was set, but couldn't be parsed as a mime type + #[error( + "Failed parsing Content-Type header as mime type, expected application/vnd.ipld.dag-cbor" + )] + FailedToParseMime, + + /// When the request body couldn't be loaded before deserialization + #[error("Unable to buffer the request body, perhaps it exceeded the 2MB limit")] + FailedParsingRequestBytes, + + /// When dag-cbor deserialization into the target type fails + #[error("Failed decoding dag-cbor: {0}")] + FailedDecoding(#[from] DecodeError), +} + +impl IntoResponse for DagCborRejection { + fn into_response(self) -> Response { + ( + match &self { + Self::MissingContentType => StatusCode::BAD_REQUEST, + Self::UnexpectedContentType(_) => StatusCode::BAD_REQUEST, + Self::FailedToParseMime => StatusCode::BAD_REQUEST, + Self::FailedParsingRequestBytes => StatusCode::PAYLOAD_TOO_LARGE, + Self::FailedDecoding(_) => StatusCode::BAD_REQUEST, + }, + self.to_string(), + ) + .into_response() + } +} + +impl From for DagCborRejection { + fn from(_err: ToStrError) -> Self { + Self::FailedToParseMime + } +} + +impl From for DagCborRejection { + fn from(_err: mime::FromStrError) -> Self { + Self::FailedToParseMime + } +} + +impl From for DagCborRejection { + fn from(_err: BytesRejection) -> Self { + Self::FailedParsingRequestBytes + } +} + +#[async_trait::async_trait] +impl FromRequest for DagCbor +where + M: DeserializeOwned + Debug, + S: Send + Sync, +{ + type Rejection = DagCborRejection; + + #[tracing::instrument(skip_all, ret, err)] + #[allow(clippy::style)] // clippy::blocks_in_conditions in tracing::instrument https://github.com/rust-lang/rust-clippy/issues/12281 + async fn from_request(req: Request, state: &S) -> Result { + let mime = req + .headers() + .get(CONTENT_TYPE) + .ok_or(DagCborRejection::MissingContentType)? + .to_str()? + .parse::()?; + + if mime.essence_str() != "application/vnd.ipld.dag-cbor" { + return Err(DagCborRejection::UnexpectedContentType(mime)); + } + + let bytes = Bytes::from_request(req, state).await?; + Ok(DagCbor(serde_ipld_dagcbor::from_slice(bytes.as_ref())?)) + } +} + +impl IntoResponse for DagCbor +where + M: Serialize, +{ + fn into_response(self) -> Response { + match serde_ipld_dagcbor::to_vec(&self.0) { + Ok(bytes) => ( + [( + CONTENT_TYPE, + HeaderValue::from_static("application/vnd.ipld.dag-cbor"), + )], + bytes, + ) + .into_response(), + Err(err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + [( + CONTENT_TYPE, + HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()), + )], + format!("Failed to encode dag-cbor: {err}"), + ) + .into_response(), + } + } +} diff --git a/car-mirror-axum/src/extract/mod.rs b/car-mirror-axum/src/extract/mod.rs new file mode 100644 index 0000000..e98e1d0 --- /dev/null +++ b/car-mirror-axum/src/extract/mod.rs @@ -0,0 +1,3 @@ +//! Axum extractor utilities + +pub mod dag_cbor; diff --git a/car-mirror-axum/src/lib.rs b/car-mirror-axum/src/lib.rs new file mode 100644 index 0000000..b117e57 --- /dev/null +++ b/car-mirror-axum/src/lib.rs @@ -0,0 +1,24 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] +#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] +#![deny(unreachable_pub)] + +//! # car-mirror-axum +//! +//! This crate exposes a very basic car mirror server. +//! It accepts `GET /dag/pull/:cid`, `POST /dag/pull/:cid` and `POST /dag/push/:cid` requests +//! with streaming car file request and response types, respectively. +//! +//! It is roughly based on the [car-mirror-http specification](https://github.com/wnfs-wg/car-mirror-http-spec). +//! +//! It also exposes some utilities with which it's easier to build a car-mirror axum server. +//! +//! At the moment, it's recommended to only make use of the `extract` module, and mostly +//! use the rest of the library for tests or treat the rest of the code as an example +//! to copy code from for actual production use. + +mod error; +pub mod extract; +mod server; + +pub use error::*; +pub use server::*; diff --git a/car-mirror-axum/src/server.rs b/car-mirror-axum/src/server.rs new file mode 100644 index 0000000..464f774 --- /dev/null +++ b/car-mirror-axum/src/server.rs @@ -0,0 +1,183 @@ +use crate::{extract::dag_cbor::DagCbor, AppResult}; +use anyhow::Result; +use axum::{ + body::{Body, HttpBody}, + extract::{Path, State}, + http::StatusCode, + routing::{get, post}, + Router, +}; +use car_mirror::{ + cache::InMemoryCache, + common::Config, + messages::{PullRequest, PushResponse}, +}; +use futures::TryStreamExt; +use libipld::Cid; +use std::str::FromStr; +use tokio_util::io::StreamReader; +use tower_http::{ + cors::{Any, CorsLayer}, + trace::{DefaultMakeSpan, TraceLayer}, +}; +use wnfs_common::BlockStore; + +/// Serve a basic car mirror server that serves the routes from `app` +/// with given blockstore at `127.0.0.1:3344`. +/// +/// When the server is ready to accept connections, it will print a +/// message to the console: "Listening on 127.0.0.1.3344". +/// +/// This is a simple function mostly useful for tests. If you want to +/// customize its function, copy its source and create a modified copy +/// as needed. +/// +/// This is not intended for production usage, for multiple reasons: +/// - There is no rate-limiting on the requests, so such a service would +/// be susceptible to DoS attacks. +/// - The `push` route should usually only be available behind +/// authorization or perhaps be heavily rate-limited, otherwise it +/// can cause unbounded memory or disk growth remotely. +pub async fn serve(store: impl BlockStore + Clone + 'static) -> Result<()> { + let listener = tokio::net::TcpListener::bind("127.0.0.1:3344").await?; + let addr = listener.local_addr()?; + println!("Listening on {addr}"); + axum::serve(listener, app(store)).await?; + Ok(()) +} + +/// This will serve the routes from `dag_router` nested under `/dag`, but with +/// tracing and cors headers. +pub fn app(store: impl BlockStore + Clone + 'static) -> Router { + let cors = CorsLayer::new() + .allow_methods(Any) + .allow_headers(Any) + .allow_origin(Any); + + Router::new() + .nest("/dag", dag_router(store)) + .layer(cors) + .layer( + TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::new().include_headers(true)), + ) + .fallback(not_found) +} + +/// Returns a router for car mirror requests with the +/// given blockstore as well as a new 10MB cache as state. +/// +/// This serves following routes: +/// - `GET /pull/:cid` for pull requests (GET is generally not recommended here) +/// - `POST /pull/:cid` for pull requests +/// - `POST /push/:cid` for push requests +pub fn dag_router(store: impl BlockStore + Clone + 'static) -> Router { + Router::new() + .route("/pull/:cid", get(car_mirror_pull)) + .route("/pull/:cid", post(car_mirror_pull)) + .route("/push/:cid", post(car_mirror_push)) + .with_state(ServerState::new(store)) +} + +/// The server state used for a basic car mirror server. +/// +/// Stores a block store and a car mirror operations cache. +#[derive(Debug, Clone)] +pub struct ServerState { + store: B, + cache: InMemoryCache, +} + +impl ServerState { + /// Initialize the server state with given blockstore and + /// a roughly 10MB car mirror operations cache. + pub fn new(store: B) -> ServerState { + Self { + store, + cache: InMemoryCache::new(100_000), + } + } +} + +/// Handle a POST request for car mirror pushes. +/// +/// This will consume the incoming body as a car file stream. +#[tracing::instrument(skip(state), err, ret)] +pub async fn car_mirror_push( + State(state): State>, + Path(cid_string): Path, + body: Body, +) -> AppResult<(StatusCode, DagCbor)> +where { + let cid = Cid::from_str(&cid_string)?; + + let content_length = body.size_hint().exact(); + let body_stream = body.into_data_stream(); + + tracing::info!(content_length, "Parsed content length hint"); + + let mut reader = StreamReader::new( + body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + let response = car_mirror::push::response_streaming( + cid, + &mut reader, + &Config::default(), + &state.store, + &state.cache, + ) + .await?; + + if content_length.is_some() { + tracing::info!("Draining request"); + // If the client provided a `Content-Length` value, then + // we know the client didn't stream the request. + // In that case, it's common that the client doesn't support + // getting a response before it finished finished sending, + // because the socket closes early, before the client manages + // to read the response. + tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; + } + + if response.indicates_finished() { + Ok((StatusCode::OK, DagCbor(response))) + } else { + Ok((StatusCode::ACCEPTED, DagCbor(response))) + } +} + +/// Handle an incoming GET or POST request for a car mirror pull. +/// +/// The response body will contain a stream of car file chunks. +#[tracing::instrument(skip(state), err, ret)] +pub async fn car_mirror_pull( + State(state): State>, + Path(cid_string): Path, + pull_request: Option>, +) -> AppResult<(StatusCode, Body)> { + let cid = Cid::from_str(&cid_string)?; + + let DagCbor(request) = pull_request.unwrap_or_else(|| { + DagCbor(PullRequest { + resources: vec![cid], + bloom_hash_count: 3, + bloom_bytes: vec![], + }) + }); + + let car_chunks = car_mirror::pull::response_streaming( + cid, + request, + state.store.clone(), + state.cache.clone(), + ) + .await?; + + Ok((StatusCode::OK, Body::from_stream(car_chunks))) +} + +#[axum_macros::debug_handler] +async fn not_found() -> (StatusCode, &'static str) { + tracing::info!("Hit 404"); + (StatusCode::NOT_FOUND, "404 Not Found") +} diff --git a/car-mirror-reqwest/Cargo.toml b/car-mirror-reqwest/Cargo.toml index 45084b4..6df20fa 100644 --- a/car-mirror-reqwest/Cargo.toml +++ b/car-mirror-reqwest/Cargo.toml @@ -25,6 +25,7 @@ futures = "0.3" libipld = "0.16" reqwest = { version = "0.11", default-features = false, features = ["json", "stream"] } reqwest-middleware = "0.2" +serde_ipld_dagcbor = { workspace = true } thiserror = "1.0" tokio-util = { version = "0.7", features = ["io"] } tracing = "0.1" @@ -34,14 +35,15 @@ wnfs-common = { workspace = true } axum = "0.7" axum-macros = "0.4" car-mirror = { version = "0.1", path = "../car-mirror", features = ["quick_cache"] } +car-mirror-axum = { path = "../car-mirror-axum" } +http = "0.2" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } test-log = { version = "0.2", default-features = false, features = ["trace"] } test-strategy = "0.3" testresult = "0.3" tokio = { version = "1.0", features = ["full"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] } - -[features] +wnfs-unixfs-file = { workspace = true } [package.metadata.docs.rs] all-features = true @@ -50,4 +52,4 @@ rustdoc-args = ["--cfg", "docsrs"] [[test]] name = "integration" -path = "integration/axum.rs" +path = "tests/integration.rs" diff --git a/car-mirror-reqwest/integration/axum.rs b/car-mirror-reqwest/integration/axum.rs deleted file mode 100644 index b43b18d..0000000 --- a/car-mirror-reqwest/integration/axum.rs +++ /dev/null @@ -1,147 +0,0 @@ -use anyhow::Result; -use axum::{ - body::Body, - extract::{Path, State}, - http::StatusCode, - response::{IntoResponse, Response}, - routing::{get, post}, - Json, Router, -}; -use car_mirror::{ - cache::{InMemoryCache, NoCache}, - common::Config, - messages::{PullRequest, PushResponse}, -}; -use car_mirror_reqwest::RequestBuilderExt; -use futures::TryStreamExt; -use libipld::Cid; -use reqwest::Client; -use std::{future::IntoFuture, str::FromStr}; -use tokio_util::io::StreamReader; -use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; - -#[test_log::test(tokio::test)] -async fn main() -> Result<()> { - // Say, you have a webserver running like so: - let app = Router::new() - .route("/dag/pull/:cid", get(car_mirror_pull)) - .route("/dag/push/:cid", post(car_mirror_push)) - .with_state(ServerState::new()); - - let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await?; - let port = listener.local_addr()?.port(); - tokio::spawn(axum::serve(listener, app).into_future()); - - // You can issue requests from your client like so: - let store = MemoryBlockStore::new(); - let data = b"Hello, world!".to_vec(); - let root = store.put_block(data, CODEC_RAW).await?; - - let config = &Config::default(); - - let client = Client::new(); - client - .post(format!("http://localhost:{port}/dag/push/{root}")) - .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol - .await?; - - let store = MemoryBlockStore::new(); // clear out data - client - .get(format!("http://localhost:{port}/dag/pull/{root}")) - .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol - .await?; - - assert!(store.has_block(&root).await?); - - Ok(()) -} - -// Server details: - -#[derive(Debug, Clone)] -struct ServerState { - store: MemoryBlockStore, - cache: InMemoryCache, -} - -impl ServerState { - fn new() -> Self { - Self { - store: MemoryBlockStore::new(), - cache: InMemoryCache::new(100_000), - } - } -} - -#[axum_macros::debug_handler] -async fn car_mirror_push( - State(state): State, - Path(cid_string): Path, - body: Body, -) -> Result<(StatusCode, Json), AppError> -where { - let cid = Cid::from_str(&cid_string)?; - - let body_stream = body.into_data_stream(); - - let reader = StreamReader::new( - body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), - ); - - let response = car_mirror::push::response_streaming( - cid, - reader, - &Config::default(), - &state.store, - &state.cache, - ) - .await?; - - if response.indicates_finished() { - Ok((StatusCode::OK, Json(response))) - } else { - Ok((StatusCode::ACCEPTED, Json(response))) - } -} - -#[axum_macros::debug_handler] -async fn car_mirror_pull( - State(state): State, - Path(cid_string): Path, - Json(request): Json, -) -> Result<(StatusCode, Body), AppError> { - let cid = Cid::from_str(&cid_string)?; - - let car_chunks = car_mirror::pull::response_streaming( - cid, - request, - state.store.clone(), - state.cache.clone(), - ) - .await?; - - Ok((StatusCode::OK, Body::from_stream(car_chunks))) -} - -// Basic anyhow error handling: - -struct AppError(anyhow::Error); - -impl IntoResponse for AppError { - fn into_response(self) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Something went wrong: {}", self.0), - ) - .into_response() - } -} - -impl From for AppError -where - E: Into, -{ - fn from(err: E) -> Self { - Self(err.into()) - } -} diff --git a/car-mirror-reqwest/src/error.rs b/car-mirror-reqwest/src/error.rs index a74ff80..c238a2b 100644 --- a/car-mirror-reqwest/src/error.rs +++ b/car-mirror-reqwest/src/error.rs @@ -1,4 +1,5 @@ use reqwest::Response; +use std::{collections::TryReserveError, convert::Infallible}; /// Possible errors raised in this library #[derive(thiserror::Error, Debug)] @@ -29,4 +30,12 @@ pub enum Error { /// car-mirror errors #[error(transparent)] CarMirrorError(#[from] car_mirror::Error), + + /// dag-cbor decoding errors + #[error(transparent)] + DagCborDecodeError(#[from] serde_ipld_dagcbor::DecodeError), + + /// dag-cbor encoding errors + #[error(transparent)] + DagCborEncodeError(#[from] serde_ipld_dagcbor::EncodeError), } diff --git a/car-mirror-reqwest/src/lib.rs b/car-mirror-reqwest/src/lib.rs index d18aa59..3682350 100644 --- a/car-mirror-reqwest/src/lib.rs +++ b/car-mirror-reqwest/src/lib.rs @@ -6,21 +6,26 @@ //! //! A helper library that helps making car-mirror client requests using reqwest. //! -//! ```no_run +//! ## Examples +//! +//! ``` //! # use anyhow::Result; //! use car_mirror::{cache::NoCache, common::Config}; //! use car_mirror_reqwest::RequestBuilderExt; +//! use reqwest::Client; //! use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; //! -//! # #[tokio::main] +//! # #[test_log::test(tokio::main)] //! # async fn main() -> Result<()> { +//! // Say, you have a webserver that supports car-mirror requests running: +//! tokio::spawn(car_mirror_axum::serve(MemoryBlockStore::new())); +//! +//! // You can issue requests from your client like so: //! let store = MemoryBlockStore::new(); //! let data = b"Hello, world!".to_vec(); //! let root = store.put_block(data, CODEC_RAW).await?; //! -//! let config = &Config::default(); -//! -//! let client = reqwest::Client::new(); +//! let client = Client::new(); //! client //! .post(format!("http://localhost:3344/dag/push/{root}")) //! .run_car_mirror_push(root, &store, &NoCache) // rounds of push protocol @@ -28,16 +33,14 @@ //! //! let store = MemoryBlockStore::new(); // clear out data //! client -//! .get(format!("http://localhost:3344/dag/pull/{root}")) -//! .run_car_mirror_pull(root, config, &store, &NoCache) // rounds of pull protocol +//! .post(format!("http://localhost:3344/dag/pull/{root}")) +//! .run_car_mirror_pull(root, &Config::default(), &store, &NoCache) // rounds of pull protocol //! .await?; //! //! assert!(store.has_block(&root).await?); //! # Ok(()) //! # } //! ``` -//! -//! For the full example, please see `integration/axum.rs` in the source repository. mod error; mod request; diff --git a/car-mirror-reqwest/src/request.rs b/car-mirror-reqwest/src/request.rs index 641183a..7032994 100644 --- a/car-mirror-reqwest/src/request.rs +++ b/car-mirror-reqwest/src/request.rs @@ -1,9 +1,10 @@ use crate::Error; use anyhow::Result; -use car_mirror::{cache::Cache, common::Config, messages::PullRequest}; +use car_mirror::{cache::Cache, common::Config, messages::PushResponse}; use futures::{Future, TryStreamExt}; use libipld::Cid; use reqwest::{Body, Response, StatusCode}; +use std::{collections::TryReserveError, convert::Infallible}; use tokio_util::io::StreamReader; use wnfs_common::BlockStore; @@ -71,14 +72,8 @@ impl RequestBuilderExt for reqwest_middleware::RequestBuilder { store: &(impl BlockStore + Clone + 'static), cache: &(impl Cache + Clone + 'static), ) -> Result<(), Error> { - push_with(root, store, cache, |body| async { - Ok::<_, Error>( - self.try_clone() - .ok_or(Error::RequestBuilderBodyAlreadySet)? - .body(body) - .send() - .await?, - ) + push_with(root, store, cache, |body| { + send_middleware_reqwest(self, body) }) .await } @@ -90,19 +85,26 @@ impl RequestBuilderExt for reqwest_middleware::RequestBuilder { store: &impl BlockStore, cache: &impl Cache, ) -> Result<(), Error> { - pull_with(root, config, store, cache, |pull_request| async move { - Ok::<_, Error>( - self.try_clone() - .ok_or(Error::RequestBuilderBodyAlreadySet)? - .json(&pull_request) - .send() - .await?, - ) + pull_with(root, config, store, cache, |body| { + send_middleware_reqwest(self, body) }) .await } } +async fn send_middleware_reqwest( + builder: &reqwest_middleware::RequestBuilder, + body: reqwest::Body, +) -> Result { + Ok(builder + .try_clone() + .ok_or(Error::RequestBuilderBodyAlreadySet)? + .header("Content-Type", "application/vnd.ipld.dag-cbor") + .body(body) + .send() + .await?) +} + impl RequestBuilderExt for reqwest::RequestBuilder { async fn run_car_mirror_push( &self, @@ -110,16 +112,7 @@ impl RequestBuilderExt for reqwest::RequestBuilder { store: &(impl BlockStore + Clone + 'static), cache: &(impl Cache + Clone + 'static), ) -> Result<(), Error> { - push_with(root, store, cache, |body| async { - Ok::<_, Error>( - self.try_clone() - .ok_or(Error::RequestBuilderBodyAlreadySet)? - .body(body) - .send() - .await?, - ) - }) - .await + push_with(root, store, cache, |body| send_reqwest(self, body)).await } async fn run_car_mirror_pull( @@ -129,19 +122,23 @@ impl RequestBuilderExt for reqwest::RequestBuilder { store: &impl BlockStore, cache: &impl Cache, ) -> Result<(), Error> { - pull_with(root, config, store, cache, |pull_request| async move { - Ok::<_, Error>( - self.try_clone() - .ok_or(Error::RequestBuilderBodyAlreadySet)? - .json(&pull_request) - .send() - .await?, - ) - }) - .await + pull_with(root, config, store, cache, |body| send_reqwest(self, body)).await } } +async fn send_reqwest( + builder: &reqwest::RequestBuilder, + body: reqwest::Body, +) -> Result { + Ok(builder + .try_clone() + .ok_or(Error::RequestBuilderBodyAlreadySet)? + .header("Content-Type", "application/vnd.ipld.dag-cbor") + .body(body) + .send() + .await?) +} + /// Run (possibly multiple rounds of) the car mirror push protocol. /// /// See `run_car_mirror_push` for a more ergonomic interface. @@ -160,6 +157,7 @@ where E: From, E: From, E: From, + E: From>, { let mut push_state = None; @@ -184,7 +182,11 @@ where } } - push_state = Some(response.json().await?); + let response_bytes = response.bytes().await?; + + let push_response = PushResponse::from_dag_cbor(&response_bytes)?; + + push_state = Some(push_response); } } @@ -194,6 +196,9 @@ where /// /// Unlike `run_car_mirror_pull`, this allows customizing the /// request every time it gets built, e.g. to refresh authentication tokens. +/// +/// **Important:** Don't forget to set the `Content-Type` header to +/// `application/vnd.ipld.dag-cbor` on your requests. pub async fn pull_with( root: Cid, config: &Config, @@ -202,15 +207,18 @@ pub async fn pull_with( mut make_request: F, ) -> Result<(), E> where - F: FnMut(PullRequest) -> Fut, + F: FnMut(reqwest::Body) -> Fut, Fut: Future>, E: From, E: From, + E: From>, { let mut pull_request = car_mirror::pull::request(root, None, config, store, cache).await?; while !pull_request.indicates_finished() { - let answer = make_request(pull_request).await?.error_for_status()?; + let answer = make_request(pull_request.to_dag_cbor()?.into()) + .await? + .error_for_status()?; let stream = StreamReader::new(answer.bytes_stream().map_err(std::io::Error::other)); diff --git a/car-mirror-reqwest/tests/integration.rs b/car-mirror-reqwest/tests/integration.rs new file mode 100644 index 0000000..557116e --- /dev/null +++ b/car-mirror-reqwest/tests/integration.rs @@ -0,0 +1,31 @@ +//! A copy of the doctest in lib.rs, because code coverage is buggy +//! with doctests. +use car_mirror::{cache::NoCache, common::Config}; +use car_mirror_reqwest::RequestBuilderExt; +use reqwest::Client; +use testresult::TestResult; +use wnfs_common::{BlockStore, MemoryBlockStore, CODEC_RAW}; + +#[test_log::test(tokio::test)] +async fn test_car_mirror_reqwest_axum_integration() -> TestResult { + tokio::spawn(car_mirror_axum::serve(MemoryBlockStore::new())); + + let store = MemoryBlockStore::new(); + let data = b"Hello, world!".to_vec(); + let root = store.put_block(data, CODEC_RAW).await?; + + let client = Client::new(); + client + .post(format!("http://localhost:3344/dag/push/{root}")) + .run_car_mirror_push(root, &store, &NoCache) + .await?; + + let store = MemoryBlockStore::new(); // clear out data + client + .post(format!("http://localhost:3344/dag/pull/{root}")) + .run_car_mirror_pull(root, &Config::default(), &store, &NoCache) + .await?; + + assert!(store.has_block(&root).await?); + Ok(()) +} diff --git a/car-mirror-wasm/.gitignore b/car-mirror-wasm/.gitignore new file mode 100644 index 0000000..f3ac103 --- /dev/null +++ b/car-mirror-wasm/.gitignore @@ -0,0 +1,7 @@ +.wireit/ +dist/ +node_modules/ +/test-results/ +/playwright-report/ +/blob-report/ +/playwright/.cache/ diff --git a/car-mirror-wasm/Cargo.toml b/car-mirror-wasm/Cargo.toml index 3ae7cf7..6ee76bb 100644 --- a/car-mirror-wasm/Cargo.toml +++ b/car-mirror-wasm/Cargo.toml @@ -11,38 +11,37 @@ edition = "2021" rust-version = "1.75" documentation = "https://docs.rs/car-mirror-wasm" repository = "https://github.com/fission-codes/rs-car-mirror/tree/main/car-mirror-wasm" -authors = ["Philipp Krüger "] +authors = ["Philipp Krüger "] [lib] crate-type = ["cdylib", "rlib"] path = "src/lib.rs" [dependencies] +anyhow = { workspace = true } +bytes = { workspace = true } +car-mirror = { path = "../car-mirror" } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires # all the `std::fmt` and `std::panicking` infrastructure, so isn't great for # code size when deploying. -console_error_panic_hook = { version = "0.1", optional = true } -js-sys = { version = "0.3", optional = true } +console_error_panic_hook = { version = "0.1" } +futures = { workspace = true } +js-sys = { version = "0.3" } +libipld = { workspace = true } +serde-wasm-bindgen = "0.6.5" +serde_json = { workspace = true } +tokio-util = { workspace = true, features = ["compat", "io"] } tracing = "0.1" -wasm-bindgen = { version = "0.2", optional = true, features = ["serde-serialize"] } -wasm-bindgen-futures = { version = "0.4", optional = true } -web-sys = { version = "0.3", optional = true } +wasm-bindgen = { workspace = true } +wasm-bindgen-futures = { workspace = true } +wasm-streams = "0.4" +web-sys = { version = "0.3" } +wnfs-common = { workspace = true } [dev-dependencies] wasm-bindgen-test = "0.3" -[features] -default = ["js"] -full = ["js", "web"] -js = [ - "console_error_panic_hook", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures" -] -web = ["web-sys"] - [package.metadata.docs.rs] all-features = true # defines the configuration attribute `docsrs` diff --git a/car-mirror-wasm/README.md b/car-mirror-wasm/README.md index 35a32f8..35d4fea 100644 --- a/car-mirror-wasm/README.md +++ b/car-mirror-wasm/README.md @@ -49,62 +49,19 @@ Description. ## Set-up -We'll use [`wasm-pack`][wasm-pack] for building, testing, and publishing -our Wasm project. +TODO ### Build for Javascript -The `wasm-pack build` command will compile the code in this directory into -Wasm and generate a `pkg` folder by default, containing the Wasm binary, a -Javascript-wrapper file, the car-mirror-wasm README (and version), and a -`package.json` file. - -- Targetting node: - - ```console - wasm-pack build --target nodejs - ``` - -- Targetting browswers: - - ```console - wasm-pack build --target web - ``` - -- Targetting bundlers like [webpack][webpack]: - - ```console - wasm-pack build --target bundler - ``` +TODO ## Testing the Project -For running tests in the current directory, use one of these commands: - -- Run tests expected to execute in [Node.js][node-js]: - -```console -wasm-pack test --node -``` - -- Run browser tests in a headless browwer: - -```console -wasm-pack test --headless --firefox --chrome --safari -``` - -*Note*: Make sure you have the appropriate browser installed when running -locally. +TODO ## Publishing a Package -Once you've [built the package](#build-for-javascript), which lives under -`pkg` by default (or a sub-directory of your choosing), you can pack and -publish it to [npm][npm] via (given credentials): - -```console -wasm-pack publish -``` +TODO ## License diff --git a/car-mirror-wasm/index.html b/car-mirror-wasm/index.html new file mode 100644 index 0000000..8c83653 --- /dev/null +++ b/car-mirror-wasm/index.html @@ -0,0 +1,12 @@ + + + + + + + + + + + diff --git a/car-mirror-wasm/package.json b/car-mirror-wasm/package.json new file mode 100644 index 0000000..a72e35c --- /dev/null +++ b/car-mirror-wasm/package.json @@ -0,0 +1,198 @@ +{ + "name": "car-mirror", + "version": "0.1.0", + "description": "Wasm bindings to the rust implementation of the car mirror protocol", + "main": "index.js", + "type": "module", + "module": "dist/bundler/car_mirror_wasm.js", + "types": "dist/nodejs/car_mirror_wasm.d.ts", + "exports": { + ".": { + "browser": "./dist/bundler/car_mirror_wasm.js", + "node": "./dist/nodejs/car_mirror_wasm.cjs", + "default": "./dist/bundler/car_mirror_wasm.js", + "types": "./dist/nodejs/car_mirror_wasm.d.ts" + }, + "./nodejs": { + "default": "./dist/nodejs/car_mirror_wasm.cjs", + "types": "./dist/nodejs/car_mirror_wasm.d.ts" + }, + "./web": { + "default": "./dist/web/car_mirror_wasm.js", + "types": "./dist/web/car_mirror_wasm.d.ts" + } + }, + "directories": { + "test": "tests" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/fission-codes/rs-car-mirror.git" + }, + "author": "Philipp Krüger ", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/fission-codes/rs-car-mirror/issues" + }, + "homepage": "https://github.com/fission-codes/rs-car-mirror#readme", + "dependencies": { + "@ipld/unixfs": "^3.0.0", + "fetch-h2": "^3.0.2", + "multiformats": "^13.1.0", + "playwright-test": "^14.1.1" + }, + "devDependencies": { + "@playwright/test": "^1.42.0", + "@types/node": "^20.11.24", + "http-server": "^14.1.1", + "wireit": "^0.14.4" + }, + "scripts": { + "build": "wireit", + "test": "wireit", + "http-server": "wireit", + "release": "export PROFILE=release && export TARGET_DIR=release && npm run build" + }, + "wireit": { + "compile": { + "command": "cargo build --target wasm32-unknown-unknown --profile $PROFILE", + "files": [ + "src/**/*.rs", + "src/*.rs" + ], + "env": { + "PROFILE": { + "external": true, + "default": "dev" + } + } + }, + "opt": { + "command": "wasm-opt -O1 ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm -o ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm-opt.wasm", + "files": [ + "../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm" + ], + "output": [ + "../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm-opt.wasm" + ], + "env": { + "TARGET_DIR": { + "external": true, + "default": "debug" + } + }, + "dependencies": [ + "compile" + ] + }, + "bindgen:bundler": { + "command": "wasm-bindgen --weak-refs --target bundler --out-dir dist/bundler ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm", + "files": [ + "../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm-opt.wasm" + ], + "env": { + "TARGET_DIR": { + "external": true, + "default": "debug" + } + }, + "dependencies": [ + "opt" + ], + "output": [ + "dist/bundler" + ] + }, + "bindgen:nodejs": { + "command": "wasm-bindgen --weak-refs --target nodejs --out-dir dist/nodejs ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm", + "env": { + "TARGET_DIR": { + "external": true, + "default": "debug" + } + }, + "dependencies": [ + "opt" + ], + "output": [ + "dist/nodejs" + ] + }, + "bindgen:web": { + "command": "wasm-bindgen --weak-refs --target web --out-dir dist/web ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm", + "env": { + "TARGET_DIR": { + "external": true, + "default": "debug" + } + }, + "dependencies": [ + "opt" + ], + "output": [ + "dist/web" + ] + }, + "bindgen:browser": { + "command": "wasm-bindgen --weak-refs --target web --browser --out-dir dist/web ../target/wasm32-unknown-unknown/$TARGET_DIR/car_mirror_wasm.wasm", + "env": { + "TARGET_DIR": { + "external": true, + "default": "debug" + } + }, + "dependencies": [ + "opt" + ], + "output": [ + "dist/web" + ] + }, + "build": { + "dependencies": [ + "bindgen:bundler", + "bindgen:nodejs", + "bindgen:web" + ] + }, + "http-server": { + "command": "http-server . --port 8081", + "service": { + "readyWhen": { + "lineMatches": "Available on:.*" + } + }, + "dependencies": [ + "bindgen:web" + ] + }, + "car-mirror-server": { + "command": "cargo run -p car-mirror-axum --example serve_test_data", + "env": { + "RUST_LOG": { + "external": true, + "default": "car_mirror_axum=debug,car_mirror=debug" + } + }, + "service": { + "readyWhen": { + "lineMatches": "Listening on 0.0.0.0:3344" + } + } + }, + "test": { + "command": "playwright-test 'test/**/*.spec.js'", + "dependencies": [ + "bindgen:bundler", + "http-server", + "car-mirror-server" + ] + } + }, + "playwright-test": { + "browserContextOptions": { + "baseURL": "http://127.0.0.1:8081", + "ignoreHTTPSErrors": true + } + } +} diff --git a/car-mirror-wasm/src/blockstore.rs b/car-mirror-wasm/src/blockstore.rs new file mode 100644 index 0000000..8e028b8 --- /dev/null +++ b/car-mirror-wasm/src/blockstore.rs @@ -0,0 +1,93 @@ +//! The bindgen API for WNFS block store. + +use anyhow::{anyhow, Result}; +use bytes::Bytes; +use js_sys::{Promise, Uint8Array}; +use libipld::cid::Cid; +use wasm_bindgen::{prelude::wasm_bindgen, JsCast}; +use wasm_bindgen_futures::JsFuture; +use wnfs_common::{BlockStore as WnfsBlockStore, BlockStoreError}; + +//-------------------------------------------------------------------------------------------------- +// Externs +//-------------------------------------------------------------------------------------------------- + +#[wasm_bindgen(typescript_custom_section)] +const TS_BLOCKSTORE: &'static str = r#" +export interface BlockStore { + putBlockKeyed(cid: Uint8Array, bytes: Uint8Array): Promise; + getBlock(cid: Uint8Array): Promise; + hasBlock(cid: Uint8Array): Promise; +} +"#; + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(typescript_type = "BlockStore")] + pub type BlockStore; + + #[wasm_bindgen(method, js_name = "putBlockKeyed")] + pub(crate) fn put_block_keyed(store: &BlockStore, cid: Vec, bytes: Vec) -> Promise; + + #[wasm_bindgen(method, js_name = "getBlock")] + pub(crate) fn get_block(store: &BlockStore, cid: Vec) -> Promise; + + #[wasm_bindgen(method, js_name = "hasBlock")] + pub(crate) fn has_block(store: &BlockStore, cid: Vec) -> Promise; +} + +//-------------------------------------------------------------------------------------------------- +// Type Definitions +//-------------------------------------------------------------------------------------------------- + +/// A block store provided by the host (JavaScript) for custom implementation like connection to the IPFS network. +#[wasm_bindgen] +pub struct ForeignBlockStore(pub(crate) BlockStore); + +impl Clone for ForeignBlockStore { + fn clone(&self) -> Self { + Self(BlockStore::unchecked_from_js(self.0.clone())) + } +} + +//-------------------------------------------------------------------------------------------------- +// Implementations +//-------------------------------------------------------------------------------------------------- + +impl WnfsBlockStore for ForeignBlockStore { + async fn put_block_keyed( + &self, + cid: Cid, + bytes: impl Into, + ) -> Result<(), BlockStoreError> { + let bytes: Bytes = bytes.into(); + + JsFuture::from(self.0.put_block_keyed(cid.to_bytes(), bytes.into())) + .await + .map_err(|e| anyhow!("Cannot put block: {e:?}"))?; + + Ok(()) + } + + async fn get_block(&self, cid: &Cid) -> Result { + let value = JsFuture::from(self.0.get_block(cid.to_bytes())) + .await + .map_err(|e| anyhow!("Cannot get block: {e:?}"))?; + + if value.is_undefined() { + return Err(BlockStoreError::CIDNotFound(*cid)); + } + + // Convert the value to a vector of bytes. + let bytes = Uint8Array::new(&value).to_vec(); + Ok(Bytes::from(bytes)) + } + + async fn has_block(&self, cid: &Cid) -> Result { + let value = JsFuture::from(self.0.has_block(cid.to_bytes())) + .await + .map_err(|e| anyhow!("Cannot run has_block: {e:?}"))?; + + Ok(js_sys::Boolean::from(value).value_of()) + } +} diff --git a/car-mirror-wasm/src/exports.rs b/car-mirror-wasm/src/exports.rs new file mode 100644 index 0000000..b570ab3 --- /dev/null +++ b/car-mirror-wasm/src/exports.rs @@ -0,0 +1,172 @@ +use crate::{ + blockstore::{BlockStore, ForeignBlockStore}, + messages::{PullRequest, PushResponse}, + utils::{handle_jserr, parse_cid}, +}; +use bytes::BytesMut; +use car_mirror::{cache::NoCache, common::Config}; +use futures::{StreamExt, TryStreamExt}; +use js_sys::{Error, Promise, Uint8Array}; +use std::rc::Rc; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; +use wasm_bindgen_futures::future_to_promise; +use wasm_streams::ReadableStream; + +/// Compute the bytes for a non-streaming push request, given +/// the byte-encoded root CID, the PushResponse from the last round, +/// except in the case of the first round, and a BlockStore. +/// +/// Returns a promise that resolves to a `Uint8Array` of car file +/// bytes. +#[wasm_bindgen] +pub fn push_request( + root_cid: Vec, + last_response: Option, + store: BlockStore, +) -> Result { + let store = ForeignBlockStore(store); + let root = parse_cid(root_cid)?; + let last_response = if let Some(push_response) = last_response { + Some(Rc::try_unwrap(push_response.0).unwrap_or_else(|rc| rc.as_ref().clone())) + } else { + None + }; + + Ok(future_to_promise(async move { + let car_file = + car_mirror::push::request(root, last_response, &Config::default(), &store, NoCache) + .await + .map_err(handle_jserr)?; + + let uint8array = Uint8Array::from(car_file.bytes.as_ref()); + + Ok(uint8array.into()) + })) +} + +/// Creates a stream of bytes for a streaming push request, given +/// the byte-encoded root CID, the PushResponse from the last round, +/// except in the case of the first round, and a BlockStore. +/// +/// Returns a promise that resolves to a `ReadableStream` +/// of car file frames. +/// +/// This function is unlikely to work in browsers, unless you're +/// using a Chrome-based browser that supports half-duplex fetch +/// requests and the car mirror server supports HTTP2. +#[wasm_bindgen] +pub fn push_request_streaming( + root_cid: Vec, + last_response: Option, + store: BlockStore, +) -> Result { + let store = ForeignBlockStore(store); + let root = parse_cid(root_cid)?; + let last_response = if let Some(push_response) = last_response { + Some(Rc::try_unwrap(push_response.0).unwrap_or_else(|rc| rc.as_ref().clone())) + } else { + None + }; + + Ok(future_to_promise(async move { + let car_stream = + car_mirror::push::request_streaming(root, last_response, store.clone(), NoCache) + .await + .map_err(handle_jserr)?; + + let js_car_stream = car_stream + .map_ok(|bytes| JsValue::from(Uint8Array::from(bytes.as_ref()))) + .map_err(handle_jserr); + + Ok(ReadableStream::from_stream(js_car_stream).into_raw().into()) + })) +} + +/// Compute the pull request for given byte-encoded root CID with +/// given BlockStore state. +/// +/// Returns a promise that resolves to an instance of the `PullRequest` +/// class. +#[wasm_bindgen] +pub fn pull_request(root_cid: Vec, store: BlockStore) -> Result { + let store = ForeignBlockStore(store); + let root = parse_cid(root_cid)?; + + Ok(future_to_promise(async move { + let pull_request = + car_mirror::pull::request(root, None, &Config::default(), store, NoCache) + .await + .map_err(handle_jserr)?; + + Ok(PullRequest(Rc::new(pull_request)).into()) + })) +} + +/// Handle a response from a car-mirror pull request in a streaming way, +/// givena byte-encoded root CID, a `ReadableStream` and a +/// `BlockStore`. +/// +/// This function may return before draining the whole `stream` with +/// updates about the latest receiver state. +/// +/// In that case, the request should be interrupted and a new one should +/// be started. +/// +/// Returns a promise that resolves to an instance of the `PullRequest` +/// class. +#[wasm_bindgen] +pub fn pull_handle_response_streaming( + root_cid: Vec, + readable_stream: web_sys::ReadableStream, + store: BlockStore, +) -> Result { + let store = ForeignBlockStore(store); + let root = parse_cid(root_cid)?; + let readable_stream = ReadableStream::from_raw(readable_stream); + + Ok(future_to_promise(async move { + let pull_request = match readable_stream.try_into_async_read() { + Ok(async_read) => car_mirror::pull::handle_response_streaming( + root, + async_read.compat(), + &Config::default(), + store, + NoCache, + ) + .await + .map_err(handle_jserr)?, + + // If BYOB readers are unsupported: + Err((_, readable_stream)) => { + let stream = readable_stream + .into_stream() + .map(|result| result.and_then(convert_jsvalue_to_bytes)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Error while trying to read item from stream or trying to convert the item into bytes on the rust side.")); + + let async_read = tokio_util::io::StreamReader::new(stream); + + car_mirror::pull::handle_response_streaming( + root, + async_read, + &Config::default(), + store, + NoCache, + ) + .await + .map_err(handle_jserr)? + } + }; + Ok(PullRequest(Rc::new(pull_request)).into()) + })) +} + +fn convert_jsvalue_to_bytes(js_value: JsValue) -> Result { + let uint8array = Uint8Array::new(&js_value); + + let mut result = BytesMut::with_capacity(uint8array.length() as usize); + result.resize(uint8array.length() as usize, 0); + uint8array.copy_to(&mut result); + + Ok(result) +} diff --git a/car-mirror-wasm/src/lib.rs b/car-mirror-wasm/src/lib.rs index 0432b03..d5781fa 100644 --- a/car-mirror-wasm/src/lib.rs +++ b/car-mirror-wasm/src/lib.rs @@ -1,16 +1,21 @@ +//! # car-mirror-wasm +//! +//! This crate exposes wasm bindings to car-mirror *client* functions. + #![cfg_attr(docsrs, feature(doc_cfg))] -#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)] +#![warn(missing_docs, rust_2018_idioms)] #![deny(unreachable_pub)] +#![cfg(target_arch = "wasm32")] -//! car-mirror +/// A `BlockStore` implementation based on a JS interface +pub mod blockstore; +/// Bindings to the request and response messages used in car mirror +pub mod messages; -use wasm_bindgen::prelude::wasm_bindgen; +mod exports; +mod utils; -/// Add two integers together. -#[wasm_bindgen] -pub fn add(a: i32, b: i32) -> i32 { - a + b -} +pub use exports::*; //------------------------------------------------------------------------------ // Utilities @@ -20,35 +25,8 @@ pub fn add(a: i32, b: i32) -> i32 { /// /// For more details see /// -#[wasm_bindgen(js_name = "setPanicHook")] +#[wasm_bindgen::prelude::wasm_bindgen(js_name = "setPanicHook")] pub fn set_panic_hook() { #[cfg(feature = "console_error_panic_hook")] console_error_panic_hook::set_once(); } - -#[wasm_bindgen] -extern "C" { - // For alerting - pub(crate) fn alert(s: &str); - // For logging in the console. - #[wasm_bindgen(js_namespace = console)] - pub fn log(s: &str); -} - -//------------------------------------------------------------------------------ -// Macros -//------------------------------------------------------------------------------ - -/// Return a representation of an object owned by JS. -#[macro_export] -macro_rules! value { - ($value:expr) => { - wasm_bindgen::JsValue::from($value) - }; -} - -/// Calls the wasm_bindgen console.log. -#[macro_export] -macro_rules! console_log { - ($($t:tt)*) => ($crate::log(&format_args!($($t)*).to_string())) -} diff --git a/car-mirror-wasm/src/messages.rs b/car-mirror-wasm/src/messages.rs new file mode 100644 index 0000000..97984ab --- /dev/null +++ b/car-mirror-wasm/src/messages.rs @@ -0,0 +1,77 @@ +use std::rc::Rc; + +use crate::utils::handle_err; +use js_sys::Error; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; + +/// Bindings to the `PullRequest` message type from car mirror +#[wasm_bindgen] +pub struct PullRequest(pub(crate) Rc); + +/// Bindings to the `PushResponse` message type from car mirror +#[wasm_bindgen] +pub struct PushResponse(pub(crate) Rc); + +#[wasm_bindgen] +impl PullRequest { + /// Decode a pull request from a javascript object + #[wasm_bindgen(js_name = "fromJSON")] + pub fn from_json(value: JsValue) -> Result { + Ok(Self(Rc::new( + serde_wasm_bindgen::from_value(value).map_err(handle_err)?, + ))) + } + + /// Encode this pull request as a javascript object + #[wasm_bindgen(js_name = "toJSON")] + pub fn to_json(&self) -> Result { + serde_wasm_bindgen::to_value(self.0.as_ref()).map_err(handle_err) + } + + /// Encode this pull request as bytes. + /// This is the efficient representation that should be sent over the wire. + #[wasm_bindgen(js_name = "encode")] + pub fn encode(&self) -> Result, Error> { + self.0.to_dag_cbor().map_err(handle_err) + } + + /// Given this pull request as the latest state in the protocol, returns + /// whether the protocol need to continue another round. + #[wasm_bindgen(js_name = "indicatesFinished")] + pub fn indicates_finished(&self) -> bool { + self.0.indicates_finished() + } +} + +#[wasm_bindgen] +impl PushResponse { + /// Decode a push response from a javascript object + #[wasm_bindgen(js_name = "fromJSON")] + pub fn from_json(value: JsValue) -> Result { + Ok(Self(Rc::new( + serde_wasm_bindgen::from_value(value).map_err(handle_err)?, + ))) + } + + /// Encode this push response as a javascript object + #[wasm_bindgen(js_name = "toJSON")] + pub fn to_json(&self) -> Result { + serde_wasm_bindgen::to_value(self.0.as_ref()).map_err(handle_err) + } + + /// Decode a push response from bytes. + /// This decodes the efficient representation that is sent over the wire. + #[wasm_bindgen(js_name = "decode")] + pub fn decode(bytes: Vec) -> Result { + let response = + car_mirror::messages::PushResponse::from_dag_cbor(&bytes).map_err(handle_err)?; + Ok(Self(Rc::new(response))) + } + + /// Given this push response as the latest state in the protocol, returns + /// whether the protocol need to continue another round. + #[wasm_bindgen(js_name = "indicatesFinished")] + pub fn indicates_finished(&self) -> bool { + self.0.indicates_finished() + } +} diff --git a/car-mirror-wasm/src/utils.rs b/car-mirror-wasm/src/utils.rs new file mode 100644 index 0000000..cfeaf7c --- /dev/null +++ b/car-mirror-wasm/src/utils.rs @@ -0,0 +1,15 @@ +use js_sys::Error; +use libipld::Cid; +use wasm_bindgen::JsValue; + +pub(crate) fn parse_cid(bytes: Vec) -> Result { + Cid::read_bytes(&bytes[..]).map_err(|e| Error::new(&format!("Couldn't parse CID: {e:?}"))) +} + +pub(crate) fn handle_jserr(e: E) -> JsValue { + JsValue::from(Error::new(&e.to_string())) +} + +pub(crate) fn handle_err(e: E) -> Error { + Error::new(&e.to_string()) +} diff --git a/car-mirror-wasm/test/index.js b/car-mirror-wasm/test/index.js new file mode 100644 index 0000000..47b6fdf --- /dev/null +++ b/car-mirror-wasm/test/index.js @@ -0,0 +1,144 @@ +import { setPanicHook, push_request, push_request_streaming, pull_request, pull_handle_response_streaming, PushResponse } from "../dist/bundler/car_mirror_wasm.js" +import { CID } from "multiformats" +import * as UnixFS from "@ipld/unixfs" + +setPanicHook(); + +const supportsRequestStreams = (() => { + let duplexAccessed = false; + + const hasContentType = new Request('', { + body: new ReadableStream(), + method: 'POST', + get duplex() { + duplexAccessed = true; + return 'half'; + }, + }).headers.has('Content-Type'); + + return duplexAccessed && !hasContentType; +})(); + +export class MemoryBlockStore { + store + + /** Creates a new in-memory block store. */ + constructor() { + this.store = new Map(); + } + + /** Stores an array of bytes in the block store. */ + async getBlock(cid) { + const decodedCid = CID.decode(cid); + return this.store.get(decodedCid.toString()); + } + + /** Retrieves an array of bytes from the block store with given CID. */ + async putBlockKeyed(cid, bytes) { + const decodedCid = CID.decode(cid); + this.store.set(decodedCid.toString(), bytes); + } + + /** Finds out whether a block is retrievable from this blockstore */ + async hasBlock(cid) { + const decodedCid = CID.decode(cid); + return this.store.has(decodedCid.toString()); + } +} + +export async function runCarMirrorPull(serverUrl, cidString, store) { + const cid = CID.parse(cidString); + const url = new URL(serverUrl); + url.pathname = `/dag/pull/${cid.toString()}`; + + let request = await pull_request(cid.bytes, store); + while (!request.indicatesFinished()) { + console.debug("Initiating request", url.toString(), JSON.stringify(request.toJSON())) + const response = await fetch(url, { + headers: { + "Accept": "application/vnd.ipld.car", + "Content-Type": "application/vnd.ipld.dag-cbor", + }, + method: "POST", + body: request.encode() + }); + console.debug("Got response status", response.status); + if (200 <= response.status && response.status < 300) { + request = await pull_handle_response_streaming(cid.bytes, response.body, store); + } else { + throw new Error(`Unexpected status code in car-mirror pull response: ${response.status}, body: ${await response.text()}`); + } + } + console.debug("Finished pulling", cidString); +} + +export async function runCarMirrorPush(serverUrl, cidString, store) { + const cid = CID.parse(cidString); + const url = new URL(serverUrl); + url.pathname = `/dag/push/${cid.toString()}`; + + const isHTTPS = url.protocol.toLowerCase() === "https"; + const useStreaming = supportsRequestStreams && isHTTPS; + + let lastResponse = null; + while (!lastResponse?.indicatesFinished()) { + console.debug(`Creating push request body (supports streams? ${supportsRequestStreams} isHTTPS? ${isHTTPS})`) + /** @type {ReadableStream | Uint8Array} */ + const body = useStreaming + ? await push_request_streaming(cid.bytes, lastResponse == null ? undefined : lastResponse, store) + : await push_request(cid.bytes, lastResponse == null ? undefined : lastResponse, store); + + console.debug("Initiating request", url.toString(), body.length ?? "(can't print length of stream)"); + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/vnd.ipld.car", + "Accept": "application/vnd.ipld.dag-cbor", + }, + duplex: useStreaming ? "half" : undefined, + body, + }); + + if (!(200 <= response.status && response.status < 300)) { + throw new Error(`Unexpected status code in car-mirror push response: ${response.status}, body: ${await response.text()}`); + } + + const responseBytes = new Uint8Array(await response.arrayBuffer()); + + lastResponse = PushResponse.decode(responseBytes); + console.debug(`Got response (status ${response.status}):`, JSON.stringify(lastResponse.toJSON())); + } + console.debug("Finished pushing", cidString); +} + +export async function exampleFile(store, writes) { + const { readable, writable } = new TransformStream({}, UnixFS.withCapacity(1048576 * 32)); + // Asynchronously write blocks to blockstore + const finishReading = (async () => { + const reader = readable.getReader(); + + while (true) { + const { done, value: block } = await reader.read(); + + if (done) { + return; + } + + console.debug("Adding block", block.cid.toString(), block.bytes.length); + await store.putBlockKeyed(block.cid.bytes, block.bytes); + } + })(); + + const writer = UnixFS.createWriter({ writable }); + const file = UnixFS.createFileWriter(writer); + + await writes(file); + + const { cid } = await file.close(); + console.debug("Computed CID", cid.toString()); + writer.close(); + + await finishReading; + + return cid; +} diff --git a/car-mirror-wasm/test/protocol.spec.js b/car-mirror-wasm/test/protocol.spec.js new file mode 100644 index 0000000..0148e31 --- /dev/null +++ b/car-mirror-wasm/test/protocol.spec.js @@ -0,0 +1,66 @@ +import { MemoryBlockStore, exampleFile, runCarMirrorPull, runCarMirrorPush } from "./index.js" +import { CID } from "multiformats" +import { assert, suite } from 'playwright-test/taps' + +const test = suite("protocol"); + +test("car mirror pull http", () => testPull("http")); +test("car mirror pull https", () => testPull("https")); + +test("car mirror incremental push http", () => testIncrementalPush("http")); +test("car mirror incremental push https", () => testIncrementalPush("https")); + +test("car mirror both push then pull http", () => testPushThenPull("http")); +test("car mirror both push then pull https", () => testPushThenPull("https")); + + +async function testPull(protocol) { + const store = new MemoryBlockStore(); + // This is the sample data that the server serves. 100MB of ChaCha8 data from the 0 seed encoded as UnixFS file + const cid = CID.parse("bafyb4ifjd76kkpos2uiv5mqifs4vi2xtywhf7pnu2pqtlg2vzmtmpyzdfa"); + await runCarMirrorPull(`${protocol}://localhost:3344/dag/pull`, cid.toString(), store); + + assert.equal(await store.hasBlock(cid.bytes), true); + assert.equal(store.store.size > 300, true); +} + + +async function testIncrementalPush(protocol) { + const storeSmall = new MemoryBlockStore(); + const wasmCidSmall = await exampleFile(storeSmall, async (file) => { + const wasm = await (await fetch("./dist/web/car_mirror_wasm_bg.wasm")).arrayBuffer(); + for (let i = 0; i < 5; i++) { + file.write(new Uint8Array(wasm)); + } + }); + await runCarMirrorPush(`${protocol}://localhost:3344/dag/push`, wasmCidSmall.toString(), storeSmall); + + // and then push another time, this time more data, but sharing data with the previous push + const storeBig = new MemoryBlockStore(); + const wasmCidBig = await exampleFile(storeBig, async (file) => { + const wasm = await (await fetch("./dist/web/car_mirror_wasm_bg.wasm")).arrayBuffer(); + for (let i = 0; i < 10; i++) { + file.write(new Uint8Array(wasm)); + } + }); + await runCarMirrorPush(`${protocol}://localhost:3344/dag/push`, wasmCidBig.toString(), storeBig); +} + + +async function testPushThenPull(protocol) { + let store = new MemoryBlockStore(); + const wasmCid = await exampleFile(store, async (file) => { + const wasm = await (await fetch("./dist/web/car_mirror_wasm_bg.wasm")).arrayBuffer(); + for (let i = 0; i < 5; i++) { + file.write(new Uint8Array(wasm)); + } + }); + await runCarMirrorPush(`${protocol}://localhost:3344/dag/push`, wasmCid.toString(), store); + + // Clear the store + store = new MemoryBlockStore(); + await runCarMirrorPull(`${protocol}://localhost:3344/dag/pull`, wasmCid.toString(), store); + + assert.equal(await store.hasBlock(wasmCid.bytes), true); + assert.equal(store.store.size > 10, true); +} diff --git a/car-mirror-wasm/tests/web.rs b/car-mirror-wasm/tests/web.rs deleted file mode 100644 index a58c626..0000000 --- a/car-mirror-wasm/tests/web.rs +++ /dev/null @@ -1,12 +0,0 @@ -#![cfg(target_arch = "wasm32")] - -//! Test suite for the Web and headless browsers. - -use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; -wasm_bindgen_test_configure!(run_in_browser); - -#[wasm_bindgen_test] -fn test_add() { - assert_eq!(car_mirror_wasm::add(3, 2), 5); - car_mirror_wasm::console_log!("{}", "Test passes!"); -} diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 06f99c3..4ecfebd 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -21,6 +21,7 @@ doctest = true anyhow = { workspace = true } async-stream = { workspace = true } bytes = { workspace = true } +data-encoding = "2.5.0" deterministic-bloom = "0.1" futures = { workspace = true } iroh-car = "0.4" @@ -30,6 +31,7 @@ proptest = { version = "1.1", optional = true } quick_cache = { version = "0.4", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "^1" +serde_bytes = { workspace = true } serde_ipld_dagcbor = { workspace = true } thiserror = "1.0" tokio = { version = "^1", default-features = false } @@ -42,6 +44,7 @@ async-std = { version = "1.11", features = ["attributes"] } car-mirror = { path = ".", features = ["quick_cache", "test_utils"] } proptest = "1.1" roaring-graphs = "0.12" +serde_json = { workspace = true } test-log = { version = "0.2", default-features = false, features = ["trace"] } test-strategy = "0.3" testresult = "0.3" diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index c757177..f5a6bb9 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,3 +1,10 @@ +use crate::{ + cache::Cache, + dag_walk::DagWalk, + error::Error, + incremental_verification::{BlockState, IncrementalDagVerification}, + messages::{PullRequest, PushResponse}, +}; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; use futures::{StreamExt, TryStreamExt}; @@ -10,14 +17,6 @@ use wnfs_common::{ BlockStore, }; -use crate::{ - cache::Cache, - dag_walk::DagWalk, - error::Error, - incremental_verification::{BlockState, IncrementalDagVerification}, - messages::{Bloom, PullRequest, PushResponse}, -}; - //-------------------------------------------------------------------------------------------------- // Types //-------------------------------------------------------------------------------------------------- @@ -517,23 +516,28 @@ impl From for ReceiverState { fn from(push: PushResponse) -> Self { let PushResponse { subgraph_roots, - bloom, + bloom_hash_count: hash_count, + bloom_bytes: bytes, } = push; Self { missing_subgraph_roots: subgraph_roots, - have_cids_bloom: Self::bloom_deserialize(bloom), + have_cids_bloom: Self::bloom_deserialize(hash_count, bytes), } } } impl From for ReceiverState { fn from(pull: PullRequest) -> Self { - let PullRequest { resources, bloom } = pull; + let PullRequest { + resources, + bloom_hash_count: hash_count, + bloom_bytes: bytes, + } = pull; Self { missing_subgraph_roots: resources, - have_cids_bloom: Self::bloom_deserialize(bloom), + have_cids_bloom: Self::bloom_deserialize(hash_count, bytes), } } } @@ -545,11 +549,12 @@ impl From for PushResponse { have_cids_bloom, } = receiver_state; - let bloom = ReceiverState::bloom_serialize(have_cids_bloom); + let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom); PushResponse { subgraph_roots: missing_subgraph_roots, - bloom, + bloom_hash_count: hash_count, + bloom_bytes: bytes, } } } @@ -561,36 +566,31 @@ impl From for PullRequest { have_cids_bloom, } = receiver_state; - let bloom = ReceiverState::bloom_serialize(have_cids_bloom); + let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom); PullRequest { resources: missing_subgraph_roots, - bloom, + bloom_hash_count: hash_count, + bloom_bytes: bytes, } } } impl ReceiverState { - fn bloom_serialize(bloom: Option) -> Bloom { + fn bloom_serialize(bloom: Option) -> (u32, Vec) { match bloom { - Some(bloom) => Bloom { - hash_count: bloom.hash_count() as u32, - bytes: bloom.as_bytes().to_vec(), - }, - None => Bloom { - hash_count: 3, - bytes: Vec::new(), - }, + Some(bloom) => (bloom.hash_count() as u32, bloom.as_bytes().to_vec()), + None => (3, Vec::new()), } } - fn bloom_deserialize(bloom: Bloom) -> Option { - if bloom.bytes.is_empty() { + fn bloom_deserialize(hash_count: u32, bytes: Vec) -> Option { + if bytes.is_empty() { None } else { Some(BloomFilter::new_with( - bloom.hash_count as usize, - bloom.bytes.into_boxed_slice(), + hash_count as usize, + bytes.into_boxed_slice(), )) } } diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 045d0ec..510afd9 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -270,12 +270,12 @@ mod proptests { use futures::TryStreamExt; use libipld::{ multihash::{Code, MultihashDigest}, - Cid, Ipld, IpldCodec, + Ipld, IpldCodec, }; use proptest::strategy::Strategy; use std::collections::BTreeSet; use test_strategy::proptest; - use wnfs_common::{encode, BlockStore, MemoryBlockStore}; + use wnfs_common::{encode, MemoryBlockStore}; fn ipld_dags() -> impl Strategy, Cid)> { arb_ipld_dag(1..256, 0.5, |cids, _| { diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 377d517..fbc8694 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -409,3 +409,6 @@ pub mod pull; pub mod push; pub use error::*; + +pub(crate) mod serde_bloom_bytes; +pub(crate) mod serde_cid_vec; diff --git a/car-mirror/src/messages.rs b/car-mirror/src/messages.rs index 85b1640..d1a2c04 100644 --- a/car-mirror/src/messages.rs +++ b/car-mirror/src/messages.rs @@ -1,5 +1,8 @@ +use std::{collections::TryReserveError, convert::Infallible}; + use libipld_core::cid::Cid; use serde::{Deserialize, Serialize}; +use serde_ipld_dagcbor::{DecodeError, EncodeError}; /// Initial message for pull requests. /// @@ -9,12 +12,17 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PullRequest { /// Requested CID roots - #[serde(rename = "rs")] + #[serde(rename = "rs", with = "crate::serde_cid_vec")] pub resources: Vec, - /// A bloom containing already stored blocks - #[serde(flatten)] - pub bloom: Bloom, + /// Bloom filter hash count + #[serde(rename = "bk")] + pub bloom_hash_count: u32, + + /// Bloom filter Binary + #[serde(rename = "bb")] + #[serde(with = "crate::serde_bloom_bytes")] + pub bloom_bytes: Vec, } /// The response sent after the initial and subsequent push requests. @@ -25,24 +33,17 @@ pub struct PullRequest { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PushResponse { /// Incomplete subgraph roots - #[serde(rename = "sr")] + #[serde(rename = "sr", with = "crate::serde_cid_vec")] pub subgraph_roots: Vec, - /// A bloom containing already stored blocks - #[serde(flatten)] - pub bloom: Bloom, -} - -/// The serialization format for bloom filters in CAR mirror -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Bloom { /// Bloom filter hash count #[serde(rename = "bk")] - pub hash_count: u32, + pub bloom_hash_count: u32, /// Bloom filter Binary #[serde(rename = "bb")] - pub bytes: Vec, + #[serde(with = "crate::serde_bloom_bytes")] + pub bloom_bytes: Vec, } impl PushResponse { @@ -50,6 +51,16 @@ impl PushResponse { pub fn indicates_finished(&self) -> bool { self.subgraph_roots.is_empty() } + + /// Deserialize a push response from dag-cbor bytes + pub fn from_dag_cbor(slice: impl AsRef<[u8]>) -> Result> { + serde_ipld_dagcbor::from_slice(slice.as_ref()) + } + + /// Serialize a push response into dag-cbor bytes + pub fn to_dag_cbor(&self) -> Result, EncodeError> { + serde_ipld_dagcbor::to_vec(self) + } } impl PullRequest { @@ -57,4 +68,122 @@ impl PullRequest { pub fn indicates_finished(&self) -> bool { self.resources.is_empty() } + + /// Deserialize a pull request from dag-cbor bytes + pub fn from_dag_cbor(slice: impl AsRef<[u8]>) -> Result> { + serde_ipld_dagcbor::from_slice(slice.as_ref()) + } + + /// Serialize a pull request into dag-cbor bytes + pub fn to_dag_cbor(&self) -> Result, EncodeError> { + serde_ipld_dagcbor::to_vec(self) + } +} + +#[cfg(test)] +mod test { + use crate::{ + cache::NoCache, + common::{Config, ReceiverState}, + incremental_verification::IncrementalDagVerification, + messages::{PullRequest, PushResponse}, + }; + use anyhow::Result; + use testresult::TestResult; + use wnfs_common::MemoryBlockStore; + use wnfs_unixfs_file::builder::FileBuilder; + + async fn loaded_receiver_state() -> Result { + let store = &MemoryBlockStore::new(); + + let root_cid = FileBuilder::new() + .content_bytes(vec![42; 500_000]) + .build()? + .store(store) + .await?; + + let dag = IncrementalDagVerification::new([root_cid], store, &NoCache).await?; + + Ok(dag.into_receiver_state(Config::default().bloom_fpr)) + } + + async fn partial_receiver_state() -> Result { + let store = &MemoryBlockStore::new(); + let store2 = &MemoryBlockStore::new(); + + let previous_cid = FileBuilder::new() + .content_bytes(vec![42; 500_000]) + .build()? + .store(store) + .await?; + + let root_cid = FileBuilder::new() + .content_bytes(vec![42; 1_000_000]) + .build()? + .store(store2) + .await?; + + let mut dag = IncrementalDagVerification::new([previous_cid], store, &NoCache).await?; + dag.want_cids.insert(root_cid); + dag.update_have_cids(store, &NoCache).await?; + + Ok(dag.into_receiver_state(Config::default().bloom_fpr)) + } + + #[test_log::test(async_std::test)] + async fn test_encoding_format_json_concise() -> TestResult { + let receiver_state = partial_receiver_state().await?; + let pull_request: PullRequest = receiver_state.clone().into(); + let push_response: PushResponse = receiver_state.into(); + + // In this example, if the bloom weren't encoded as base64, it'd blow past the 150 byte limit. + // At the time of writing, these both encode into 97 characters. + assert!(serde_json::to_string(&pull_request)?.len() < 150); + assert!(serde_json::to_string(&push_response)?.len() < 150); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_dag_cbor_roundtrip() -> TestResult { + let receiver_state = partial_receiver_state().await?; + let pull_request: PullRequest = receiver_state.clone().into(); + let push_response: PushResponse = receiver_state.into(); + + let pull_back = PullRequest::from_dag_cbor(pull_request.to_dag_cbor()?)?; + let push_back = PushResponse::from_dag_cbor(push_response.to_dag_cbor()?)?; + + assert_eq!(pull_request, pull_back); + assert_eq!(push_response, push_back); + + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_pull_request_have_everything_indicates_finished() -> TestResult { + let pull_request: PullRequest = loaded_receiver_state().await?.into(); + assert!(pull_request.indicates_finished()); + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_push_response_have_everything_indicates_finished() -> TestResult { + let push_response: PushResponse = loaded_receiver_state().await?.into(); + assert!(push_response.indicates_finished()); + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_pull_request_partial_indicates_not_finished() -> TestResult { + let pull_request: PullRequest = partial_receiver_state().await?.into(); + assert!(!pull_request.indicates_finished()); + Ok(()) + } + + #[test_log::test(async_std::test)] + async fn test_push_response_partial_indicates_not_finished() -> TestResult { + let push_response: PushResponse = partial_receiver_state().await?.into(); + assert!(!push_response.indicates_finished()); + Ok(()) + } } diff --git a/car-mirror/src/serde_bloom_bytes.rs b/car-mirror/src/serde_bloom_bytes.rs new file mode 100644 index 0000000..6d9eb08 --- /dev/null +++ b/car-mirror/src/serde_bloom_bytes.rs @@ -0,0 +1,81 @@ +use serde::{de::Visitor, Deserializer, Serializer}; + +pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + struct BytesOrStringVisitor; + + impl Visitor<'_> for BytesOrStringVisitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("bytes, byte buf or string") + } + + fn visit_borrowed_bytes(self, v: &'_ [u8]) -> Result + where + E: serde::de::Error, + { + Ok(v.to_vec()) + } + + fn visit_byte_buf(self, v: Vec) -> Result + where + E: serde::de::Error, + { + Ok(v) + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(v.to_vec()) + } + + fn visit_borrowed_str(self, v: &'_ str) -> Result + where + E: serde::de::Error, + { + data_encoding::BASE64URL_NOPAD + .decode(v.as_bytes()) + .map_err(serde::de::Error::custom) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + data_encoding::BASE64URL_NOPAD + .decode(v.as_bytes()) + .map_err(serde::de::Error::custom) + } + + fn visit_string(self, v: String) -> Result + where + E: serde::de::Error, + { + data_encoding::BASE64URL_NOPAD + .decode(v.as_bytes()) + .map_err(serde::de::Error::custom) + } + } + + deserializer.deserialize_any(BytesOrStringVisitor) +} + +pub(crate) fn serialize(bloom_bytes: &Vec, serializer: S) -> Result +where + S: Serializer, +{ + if serializer.is_human_readable() { + serializer.serialize_str( + data_encoding::BASE64URL_NOPAD + .encode(bloom_bytes.as_ref()) + .as_ref(), + ) + } else { + serializer.serialize_bytes(bloom_bytes.as_ref()) + } +} diff --git a/car-mirror/src/serde_cid_vec.rs b/car-mirror/src/serde_cid_vec.rs new file mode 100644 index 0000000..657a791 --- /dev/null +++ b/car-mirror/src/serde_cid_vec.rs @@ -0,0 +1,28 @@ +use std::str::FromStr; + +use libipld::Cid; +use serde::{ser::SerializeSeq, Deserialize, Deserializer, Serializer}; + +pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let strings: Vec = Vec::::deserialize(deserializer)?; + let cids = strings + .iter() + .map(|s| Cid::from_str(s)) + .collect::, _>>() + .map_err(serde::de::Error::custom)?; + Ok(cids) +} + +pub(crate) fn serialize(cids: &Vec, serializer: S) -> Result +where + S: Serializer, +{ + let mut seq = serializer.serialize_seq(Some(cids.len()))?; + for cid in cids { + seq.serialize_element(&cid.to_string())?; + } + seq.end() +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml deleted file mode 100644 index fa1fa7e..0000000 --- a/examples/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "examples" -version = "0.1.0" -publish = false -edition = "2021" -authors = ["Philipp Krüger "] - -[dev-dependencies] -car-mirror = { path = "../car-mirror", version = "0.1" } - -[[example]] -name = "counterparts" -path = "counterparts.rs" diff --git a/examples/counterparts.rs b/examples/counterparts.rs deleted file mode 100644 index 7622550..0000000 --- a/examples/counterparts.rs +++ /dev/null @@ -1,6 +0,0 @@ -use std::error::Error; - -pub fn main() -> Result<(), Box> { - println!("Alien Shore!"); - Ok(()) -}