From 65454ee91cb70dabc748e6b40bc087a36f54afe4 Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Mon, 11 Mar 2024 08:10:52 -0700 Subject: [PATCH] Fix issues with Request.clone in http types (#440) * Bump wasm-streams * Use Body::into_readable_stream * fmt * clippy * Remove unused js hack * clippy * Added remaining routes and fixed testes (#473) * Added test script to package.json. * Added worker-sandbox r2 routes. * Added worker-build to the npm test script. * Added worker-sandbox r2 routes - cleanup. * Added worker-sandbox queue routes. * Added worker-sandbox service-bindings routes - implied porting and adapting some improvements from the latest version. * Added worker-sandbox service-bindings routes - implied porting and adapting some improvements from the latest version - fmt. * Added worker-sandbox cache stream route - Implied porting some more recent stuff. * Added worker-sandbox remaining cache routes. * Added worker-sandbox init called route. * Added worker-sandbox custom-response-body route. * Added worker-sandbox now route. * Added worker-sandbox redirect-307 route. * Added worker-sandbox redirect-default route. * Added worker-sandbox subrequests routes and fetch helpers that were requeried. * Added worker-sandbox subrequests fetch-timeout to be shorter. * Added worker-sandbox request catchall fixed test case. * Added worker-sandbox request xor route. * Added worker-sandbox request xor route. * Added worker-sandbox websocket route. * worker-sandbox queue routes clean up. * worker unused dependencies clean up. * worker-sandbox: Added missing d1 routes and improved d1 tests to use miniflare. * worker-sandbox: Clippy. * worker-sandbox: fmt. --------- Co-authored-by: Alexandre Faria --- Cargo.lock | 63 +-- Cargo.toml | 3 + package.json | 2 +- worker-macros/Cargo.toml | 2 +- worker-sandbox/Cargo.toml | 9 +- worker-sandbox/src/d1.rs | 18 +- worker-sandbox/src/lib.rs | 373 +++++++++++++++++- worker-sandbox/tests/d1.spec.ts | 19 +- worker-sandbox/tests/mf.ts | 1 + worker-sandbox/tests/request.spec.ts | 2 +- worker-sandbox/tests/subrequest.spec.ts | 2 +- worker-sys/Cargo.toml | 2 +- .../types/incoming_request_cf_properties.rs | 3 + worker/Cargo.toml | 4 +- worker/js/hacks.js | 17 - worker/src/body.rs | 1 + worker/src/body/body.rs | 146 ++++--- worker/src/cf.rs | 29 +- worker/src/durable.rs | 5 +- worker/src/env.rs | 2 +- worker/src/fetch.rs | 43 +- worker/src/fetcher.rs | 24 +- worker/src/headers.rs | 183 +++++++++ worker/src/http/request.rs | 25 +- worker/src/http/response.rs | 20 +- worker/src/lib.rs | 5 + worker/src/queue.rs | 2 +- .../src/{cf/properties.rs => request_init.rs} | 136 ++++++- 28 files changed, 947 insertions(+), 194 deletions(-) delete mode 100644 worker/js/hacks.js create mode 100644 worker/src/headers.rs rename worker/src/{cf/properties.rs => request_init.rs} (68%) diff --git a/Cargo.lock b/Cargo.lock index 2782755e..9016602a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -638,9 +638,9 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "js-sys" -version = "0.3.63" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" dependencies = [ "wasm-bindgen", ] @@ -1641,9 +1641,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1651,9 +1651,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" dependencies = [ "bumpalo", "log", @@ -1666,9 +1666,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-cli-support" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8315d6503415e5d44ff64f1ba34aefd8264c561df17e0f1c8eb8c96bde79c45e" +checksum = "d21c60239a09bf9bab8dfa752be4e6c637db22296b9ded493800090448692da9" dependencies = [ "anyhow", "base64 0.9.3", @@ -1688,9 +1688,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-externref-xform" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4522bf3be16c6274c87a5a2c5d2a62efa80253b025f8e813f9682d0d6a8a8fca" +checksum = "bafbe1984f67cc12645f12ab65e6145e8ddce1ab265d0be58435f25bb0ce2608" dependencies = [ "anyhow", "walrus", @@ -1698,9 +1698,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.36" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" dependencies = [ "cfg-if", "js-sys", @@ -1710,9 +1710,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1720,9 +1720,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", @@ -1733,9 +1733,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-multi-value-xform" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "113256596776ebb4b243512d3711e73d5475eaeff373e1ae65427c66e5aa2073" +checksum = "581419e3995571a1d2d066e360ca1c0c09da097f5a53c98e6f00d96eddaf0ffe" dependencies = [ "anyhow", "walrus", @@ -1743,9 +1743,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasm-bindgen-test" @@ -1773,9 +1773,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-threads-xform" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89106aaf83a2b80464fc8f60a074a4575135b73a491e174f35bbeae6ff0d7ec6" +checksum = "e05d272073981137e8426cf2a6830d43d1f84f988a050b2f8b210f0e266b8983" dependencies = [ "anyhow", "walrus", @@ -1784,9 +1784,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-wasm-conventions" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84e5ad27a7930400994cb40823d3d4a7ef235fac52d0c75ebd61fa40eba994a8" +checksum = "0e9c65b1ff5041ea824ca24c519948aec16fb6611c617d601623c0657dfcd47b" dependencies = [ "anyhow", "walrus", @@ -1794,9 +1794,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-wasm-interpreter" -version = "0.2.86" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e69500063b7b20f3e9422d78c2b381dd192c7c4ebaef34d205332877cd78e0d3" +checksum = "7c5c796220738ab5d44666f37205728a74141c0039d1166bcf8110b26bafaa1e" dependencies = [ "anyhow", "log", @@ -1819,9 +1819,9 @@ dependencies = [ [[package]] name = "wasm-streams" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" dependencies = [ "futures-util", "js-sys", @@ -1838,9 +1838,9 @@ checksum = "5fe3d5405e9ea6c1317a656d6e0820912d8b7b3607823a7596117c8f666daf6f" [[package]] name = "web-sys" -version = "0.3.63" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ "js-sys", "wasm-bindgen", @@ -2040,7 +2040,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams 0.3.0", + "wasm-streams 0.4.0", "web-sys", "worker-kv", "worker-macros", @@ -2093,6 +2093,7 @@ name = "worker-sandbox" version = "0.1.0" dependencies = [ "blake2", + "bytes", "cfg-if", "chrono", "console_error_panic_hook", diff --git a/Cargo.toml b/Cargo.toml index d9f9e192..85b9c79d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,6 @@ lto = true [profile.release.package."*"] codegen-units = 1 opt-level = "z" + +[workspace.dependencies] +wasm-bindgen = "=0.2.87" \ No newline at end of file diff --git a/package.json b/package.json index 35d45efb..97bfc9f3 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,6 @@ "vitest": "^0.32.0" }, "scripts": { - "test": "" + "test": "cd worker-sandbox && worker-build --dev && vitest" } } diff --git a/worker-macros/Cargo.toml b/worker-macros/Cargo.toml index f0aeca2d..beb9e99f 100644 --- a/worker-macros/Cargo.toml +++ b/worker-macros/Cargo.toml @@ -17,7 +17,7 @@ worker-sys = { path = "../worker-sys", version = "0.0.9" } syn = "2.0.17" proc-macro2 = "1.0.60" quote = "1.0.28" -wasm-bindgen = "=0.2.86" +wasm-bindgen = {workspace=true} wasm-bindgen-futures = "0.4.36" wasm-bindgen-macro-support = "0.2.86" diff --git a/worker-sandbox/Cargo.toml b/worker-sandbox/Cargo.toml index e2815c61..6fa76ba8 100644 --- a/worker-sandbox/Cargo.toml +++ b/worker-sandbox/Cargo.toml @@ -14,6 +14,7 @@ default = ["console_error_panic_hook"] [dependencies] blake2 = "0.10.6" +bytes = "1.4.0" chrono = { version = "0.4.26", default-features = false, features = [ "wasmbind", "clock", @@ -26,11 +27,11 @@ http = "0.2.9" regex = "1.8.4" serde = { version = "1.0.164", features = ["derive"] } serde_json = "1.0.96" -worker = { path = "../worker", version = "0.0.17", features= ["queue", "d1"] } +worker = { path = "../worker", version = "0.0.17", features = ["queue", "d1"] } futures-channel = "0.3.28" futures-util = { version = "0.3.28", default-features = false } rand = "0.8.5" -uuid = {version = "1.3.3", features = ["v4", "serde"]} +uuid = { version = "1.3.3", features = ["v4", "serde"] } serde-wasm-bindgen = "0.5.0" md5 = "0.7.0" router-service = { git = "https://github.com/zebp/router-service", version = "0.1.0" } @@ -38,7 +39,9 @@ tower = "0.4.13" [dev-dependencies] futures-channel = { version = "0.3.28", features = ["sink"] } -futures-util = { version = "0.3.28", default-features = false, features = ["sink"] } +futures-util = { version = "0.3.28", default-features = false, features = [ + "sink", +] } reqwest = { version = "0.11.18", features = [ "blocking", "json", diff --git a/worker-sandbox/src/d1.rs b/worker-sandbox/src/d1.rs index 96f992cf..58268777 100644 --- a/worker-sandbox/src/d1.rs +++ b/worker-sandbox/src/d1.rs @@ -8,10 +8,7 @@ struct Person { age: u32, } -pub async fn prepared_statement( - _req: http::Request, - env: Env, -) -> Result> { +pub async fn prepared_statement(env: &Env) -> Result> { let db = env.d1("DB")?; let stmt = worker::query!(&db, "SELECT * FROM people WHERE name = ?", "Ryan Upton")?; @@ -47,7 +44,7 @@ pub async fn prepared_statement( Ok(http::Response::new("ok".into())) } -pub async fn batch(_req: http::Request, env: Env) -> Result> { +pub async fn batch(env: &Env) -> Result> { let db = env.d1("DB")?; let mut results = db .batch(vec![ @@ -71,7 +68,7 @@ pub async fn batch(_req: http::Request, env: Env) -> Result, env: Env) -> Result> { +pub async fn exec(req: http::Request, env: &Env) -> Result> { let db = env.d1("DB")?; let result = db @@ -84,13 +81,13 @@ pub async fn exec(req: http::Request, env: Env) -> Result, env: Env) -> Result> { +pub async fn dump(env: &Env) -> Result> { let db = env.d1("DB")?; let bytes = db.dump().await?; Ok(http::Response::new(bytes.into())) } -pub async fn error(_req: http::Request, env: Env) -> Result> { +pub async fn error(env: &Env) -> Result> { let db = env.d1("DB")?; let error = db .exec("THIS IS NOT VALID SQL") @@ -98,7 +95,10 @@ pub async fn error(_req: http::Request, env: Env) -> Result { + if let Some(text) = msg.text() { + server.send_with_str(text).expect("could not relay text"); + } + } + WebsocketEvent::Close(_) => { + // Sets a key in a test KV so the integration tests can query if we + // actually got the close event. We can't use the shared dat a for this + // because miniflare resets that every request. + some_namespace_kv + .put("got-close-event", "true") + .unwrap() + .execute() + .await + .unwrap(); + } + } + } + }); + + let mut response = Response::builder() + .status(101) + .body(Body::empty()) + .unwrap(); + + response.extensions_mut().insert(pair.client); + + Ok(response) + }) + .post("/xor/:num", |req, ctx| async move { + let num: u8 = match ctx.param("num").unwrap().parse() { + Ok(num) => num, + Err(_) => return Response::builder() + .status(400) + .body("invalid byte".into()) + .map_err(|e| Error::RustError(e.to_string())) + }; + + let xor_stream = req.into_body().into_stream().map_ok(move |buf| { + let mut vec = buf.to_vec(); + vec.iter_mut().for_each(|x| *x ^= num); + Bytes::from(vec) + }); + + let body = worker::body::Body::from_stream(xor_stream)?; + let resp = Response::builder() + .body(body) + .unwrap(); + Ok(resp) + }) + .get("/request-init-fetch", |_, _| async move { + let init = RequestInit::new(); + let req = http::Request::post("https://cloudflare.com").body(()).unwrap(); + fetch_with_init(req, &init).await + }) + .get("/request-init-fetch-post", |_, _| async move { + let mut init = RequestInit::new(); + init.method = Method::POST; + + let req = http::Request::post("https://httpbin.org/post").body(()).unwrap(); + fetch_with_init(req, &init).await + }) + .get("/cancelled-fetch", |_, _| async move { + let controller = AbortController::default(); + let signal = controller.signal(); + + let (tx, rx) = futures_channel::oneshot::channel(); + + // Spawns a future that'll make our fetch request and not block this function. + wasm_bindgen_futures::spawn_local({ + async move { + let req = http::Request::post("https://cloudflare.com").body(()).unwrap(); + let resp = fetch_with_signal(req, &signal).await; + tx.send(resp).unwrap(); + } + }); + + // And then we try to abort that fetch as soon as we start it, hopefully before + // cloudflare.com responds. + controller.abort(); + + let res = rx.await.unwrap(); + let res = res.unwrap_or_else(|err| { + let text = err.to_string(); + Response::new(text.into()) + }); + + Ok(res) + }) + .get("/fetch-timeout", |_, _| async move { + let controller = AbortController::default(); + let signal = controller.signal(); + + let fetch_fut = async { + let req = http::Request::post("https://miniflare.mocks/delay").body(()).unwrap(); + let resp = fetch_with_signal(req, &signal).await?; + let text = resp.into_body().text().await?; + Ok::(text) + }; + let delay_fut = async { + Delay::from(Duration::from_millis(1)).await; + controller.abort(); + Ok(Response::new("Cancelled".into())) + }; + + futures_util::pin_mut!(fetch_fut); + futures_util::pin_mut!(delay_fut); + + match futures_util::future::select(delay_fut, fetch_fut).await { + Either::Left((res, cancelled_fut)) => { + // Ensure that the cancelled future returns an AbortError. + match cancelled_fut.await { + Err(e) if e.to_string().contains("AbortError") => { /* Yay! It worked, let's do nothing to celebrate */}, + Err(e) => panic!("Fetch errored with a different error than expected: {:#?}", e), + Ok(text) => panic!("Fetch unexpectedly succeeded: {}", text) + } + + res + }, + Either::Right(_) => panic!("Delay future should have resolved first"), + } + }) + .get("/redirect-default", |_, _| async move { + Ok(Response::builder() + .status(302) + .header("Location", "https://example.com/") + .body(Body::empty()).unwrap()) + }) + .get("/redirect-307", |_, _| async move { + Ok(Response::builder() + .status(307) + .header("Location", "https://example.com/") + .body(Body::empty()).unwrap()) + }) + .get("/now", |_, _| async move { + let now = chrono::Utc::now(); + let js_date: Date = now.into(); + Ok(Response::new(js_date.to_string().into())) + }) + .get("/custom-response-body", |_, _| async move { + Ok(Response::new(vec![b'h', b'e', b'l', b'l', b'o'].into())) + }) + .get("/init-called", |_, _| async move { + let init_called = GLOBAL_STATE.load(Ordering::SeqCst); + Ok(Response::new(init_called.to_string().into())) + }) + .get("/cache-example", |req, _| async move { + //console_log!("url: {}", req.uri().to_string()); + let cache = Cache::default(); + let key = req.uri().to_string(); + if let Some(resp) = cache.get(&key, true).await? { + //console_log!("Cache HIT!"); + Ok(resp) + } else { + //console_log!("Cache MISS!"); + + let mut resp = Response::builder() + .header("content-type", "application/json") + // Cache API respects Cache-Control headers. Setting s-max-age to 10 + // will limit the response to be in cache for 10 seconds max + .header("cache-control", "s-maxage=10") + .body(serde_json::json!({ "timestamp": Date::now().as_millis() }).to_string().into()) + .map_err(|e| Error::RustError(e.to_string())) + .unwrap(); + + cache.put(key, resp.clone()).await?; + Ok(resp) + } + }) + .get("/cache-api/get/:key", |_req, ctx| async move { + if let Some(key) = ctx.param("key") { + let cache = Cache::default(); + if let Some(resp) = cache.get(format!("https://{key}"), true).await? { + return Ok(resp); + } else { + return Ok(Response::new("cache miss".into())); + } + } + + Response::builder() + .status(400) + .body("key missing".into()) + .map_err(|e| Error::RustError(e.to_string())) + }) + .put("/cache-api/put/:key", |_req, ctx| async move { + if let Some(key) = ctx.param("key") { + let cache = Cache::default(); + + let mut resp = Response::builder() + .header("content-type", "application/json") + // Cache API respects Cache-Control headers. Setting s-max-age to 10 + // will limit the response to be in cache for 10 seconds max + .header("cache-control", "s-maxage=10") + .body(serde_json::json!({ "timestamp": Date::now().as_millis() }).to_string().into()) + .map_err(|e| Error::RustError(e.to_string())) + .unwrap(); + + cache.put(format!("https://{key}"), resp.clone()).await?; + return Ok(resp); + } + + Response::builder() + .status(400) + .body("key missing".into()) + .map_err(|e| Error::RustError(e.to_string())) + }) + .post("/cache-api/delete/:key", |_req, ctx| async move { + if let Some(key) = ctx.param("key") { + let cache = Cache::default(); + + let res = cache.delete(format!("https://{key}"), true).await?; + return Ok(Response::new(serde_json::to_string(&res)?.into())); + } + + Response::builder() + .status(400) + .body("key missing".into()) + .map_err(|e| Error::RustError(e.to_string())) + }) + .get("/cache-stream", |req, _| async move { + //console_log!("url: {}", req.uri().to_string()); + let cache = Cache::default(); + let key = req.uri().to_string(); + if let Some(resp) = cache.get(&key, true).await? { + //console_log!("Cache HIT!"); + Ok(resp) + } else { + //console_log!("Cache MISS!"); + let mut rng = rand::thread_rng(); + let count = rng.gen_range(0..10); + let stream = futures_util::stream::repeat("Hello, world!\n") + .take(count) + .then(|text| async move { + Delay::from(Duration::from_millis(50)).await; + Result::Ok(text.as_bytes().to_vec()) + }); + + let body = worker::body::Body::from_stream(stream)?; + + //console_log!("resp = {:?}", resp); + + let mut resp = Response::builder() + // Cache API respects Cache-Control headers. Setting s-max-age to 10 + // will limit the response to be in cache for 10 seconds max + .header("cache-control", "s-maxage=10") + .body(body) + .unwrap(); + + cache.put(key, resp.clone()).await?; + Ok(resp) + } + }) + .get("/remote-by-request", |req, ctx| async move { + let fetcher = ctx.data.service("remote")?; + fetcher.fetch_request(req).await + }) + .get("/remote-by-path", |req, ctx| async move { + let fetcher = ctx.data.service("remote")?; + let mut init = RequestInit::new(); + init.with_method(Method::POST); + + fetcher.fetch(req.uri().to_string(), Some(init)).await + }) + .post("/queue/send/:id", |_req, ctx| async move { + let id = match ctx.param("id").map(|id| Uuid::try_parse(id).ok()).and_then(|u|u) { + Some(id) => id, + None => { + return Response::builder() + .status(400) + .body("error".into()) + .map_err(|_| Error::RustError("Failed to parse id, expected a UUID".into())); + } + }; + let my_queue = match ctx.data.queue("my_queue") { + Ok(queue) => queue, + Err(err) => { + return Response::builder() + .status(500) + .body(format!("Failed to get queue: {err:?}").into()) + .map_err(|e| Error::RustError(e.to_string())); + } + }; + match my_queue.send(&QueueBody { + id: id.to_string(), + }).await { + Ok(_) => { + Ok(Response::new("Message sent".into())) + } + Err(err) => { + Response::builder() + .status(500) + .body(format!("Failed to send message to queue: {err:?}").into()) + .map_err(|e| Error::RustError(e.to_string())) + } + } + }) + .get("/queue", |_req, _ctx| async move { + let guard = GLOBAL_QUEUE_STATE.lock().unwrap(); + let messages: Vec = guard.clone(); + let json = serde_json::to_string(&messages).unwrap(); + Ok(Response::new(Body::from(json))) + }) + .get("/d1/prepared", |_, ctx| async move { + d1::prepared_statement(ctx.data.as_ref()).await + }) + .get("/d1/batch", |_, ctx| async move { + d1::batch(ctx.data.as_ref()).await + }) + .get("/d1/dump", |_, ctx| async move { + d1::dump(ctx.data.as_ref()).await + }) + .post("/d1/exec", |req, ctx| async move { + d1::exec(req, ctx.data.as_ref()).await + }) + .get("/d1/error", |_, ctx| async move { + d1::error(ctx.data.as_ref()).await + }) + .get("/r2/list-empty", |_, ctx| async move { + r2::list_empty(ctx.data.as_ref()).await + }) + .get("/r2/list", |_, ctx| async move { + r2::list(ctx.data.as_ref()).await + }) + .get("/r2/get-empty", |_, ctx| async move { + r2::get_empty(ctx.data.as_ref()).await + }) + .get("/r2/get", |_, ctx| async move { + r2::get(ctx.data.as_ref()).await + }) + .put("/r2/put", |_, ctx| async move { + r2::put(ctx.data.as_ref()).await + }) + .put("/r2/put-properties", |_, ctx| async move { + r2::put_properties(ctx.data.as_ref()).await + }) + .put("/r2/put-multipart", |_, ctx| async move { + r2::put_multipart(ctx.data.as_ref()).await + }) + .delete("/r2/delete", |_, ctx| async move { + r2::delete(ctx.data.as_ref()).await + }) .any("/*catchall", |_, ctx| async move { Ok(Response::builder() .status(404) @@ -407,12 +770,12 @@ pub struct QueueBody { pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Context) -> Result<()> { let mut guard = GLOBAL_QUEUE_STATE.lock().unwrap(); for message in message_batch.messages()? { - console_log!( + /*console_log!( "Received queue message {:?}, with id {} and timestamp: {}", message.body, message.id, message.timestamp.to_string() - ); + );*/ guard.push(message.body); } Ok(()) diff --git a/worker-sandbox/tests/d1.spec.ts b/worker-sandbox/tests/d1.spec.ts index 5e426a52..1a7849d1 100644 --- a/worker-sandbox/tests/d1.spec.ts +++ b/worker-sandbox/tests/d1.spec.ts @@ -1,11 +1,8 @@ import { describe, test, expect, beforeAll } from "vitest"; - -const hasLocalDevServer = await fetch("http://localhost:8787/request") - .then((resp) => resp.ok) - .catch(() => false); +import { mf } from "./mf"; async function exec(query: string): Promise { - const resp = await fetch("http://localhost:8787/d1/exec", { + const resp = await mf.dispatchFetch("https://fake.host/d1/exec", { method: "POST", body: query.split("\n").join(""), }); @@ -15,7 +12,7 @@ async function exec(query: string): Promise { return Number(body); } -describe.skipIf(!hasLocalDevServer)("d1", () => { +describe("d1", () => { test("create table", async () => { const query = `CREATE TABLE IF NOT EXISTS uniqueTable ( id INTEGER PRIMARY KEY, @@ -49,22 +46,22 @@ describe.skipIf(!hasLocalDevServer)("d1", () => { }); test("prepared statement", async () => { - const resp = await fetch("http://localhost:8787/d1/prepared"); + const resp = await mf.dispatchFetch("https://fake.host/d1/prepared"); expect(resp.status).toBe(200); }); test("batch", async () => { - const resp = await fetch("http://localhost:8787/d1/batch"); + const resp = await mf.dispatchFetch("https://fake.host/d1/batch"); expect(resp.status).toBe(200); }); test("dump", async () => { - const resp = await fetch("http://localhost:8787/d1/dump"); + const resp = await mf.dispatchFetch("https://fake.host/d1/dump"); expect(resp.status).toBe(200); }); - test("dump", async () => { - const resp = await fetch("http://localhost:8787/d1/error"); + test("error", async () => { + const resp = await mf.dispatchFetch("https://fake.host/d1/error"); expect(resp.status).toBe(200); }); }); diff --git a/worker-sandbox/tests/mf.ts b/worker-sandbox/tests/mf.ts index a2560212..0c6bc64d 100644 --- a/worker-sandbox/tests/mf.ts +++ b/worker-sandbox/tests/mf.ts @@ -6,6 +6,7 @@ export const mf = new Miniflare({ cache: true, cachePersist: false, d1Persist: false, + d1Databases: ["DB"], kvPersist: false, r2Persist: false, modules: true, diff --git a/worker-sandbox/tests/request.spec.ts b/worker-sandbox/tests/request.spec.ts index 56a24a0c..96f85b62 100644 --- a/worker-sandbox/tests/request.spec.ts +++ b/worker-sandbox/tests/request.spec.ts @@ -118,7 +118,7 @@ test("catchall", async () => { method: "OPTIONS", }); - expect(await resp.text()).toBe("/hello-world"); + expect(await resp.text()).toBe("hello-world"); }); test("redirect default", async () => { diff --git a/worker-sandbox/tests/subrequest.spec.ts b/worker-sandbox/tests/subrequest.spec.ts index 83e1ec2c..ffe5aee2 100644 --- a/worker-sandbox/tests/subrequest.spec.ts +++ b/worker-sandbox/tests/subrequest.spec.ts @@ -17,7 +17,7 @@ describe("subrequest", () => { expect(await resp.text()).toBe("Cancelled"); }); - test.skip("request init fetch post", async () => { + test("request init fetch post", async () => { const resp = await mf.dispatchFetch( "https://fake.host/request-init-fetch-post" ); diff --git a/worker-sys/Cargo.toml b/worker-sys/Cargo.toml index 248ced76..fb897475 100644 --- a/worker-sys/Cargo.toml +++ b/worker-sys/Cargo.toml @@ -10,7 +10,7 @@ description = "Low-level extern definitions / FFI bindings to the Cloudflare Wor [dependencies] cfg-if = "1.0.0" js-sys = "0.3.63" -wasm-bindgen = "=0.2.86" +wasm-bindgen = {workspace=true} [dependencies.web-sys] version = "0.3.63" diff --git a/worker-sys/src/types/incoming_request_cf_properties.rs b/worker-sys/src/types/incoming_request_cf_properties.rs index 7e745422..057492db 100644 --- a/worker-sys/src/types/incoming_request_cf_properties.rs +++ b/worker-sys/src/types/incoming_request_cf_properties.rs @@ -14,6 +14,9 @@ extern "C" { #[wasm_bindgen(method, getter)] pub fn asn(this: &IncomingRequestCfProperties) -> u32; + #[wasm_bindgen(method, getter, js_name=asOrganization)] + pub fn as_organization(this: &IncomingRequestCfProperties) -> String; + #[wasm_bindgen(method, getter)] pub fn country(this: &IncomingRequestCfProperties) -> Option; diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 445d6fb7..1f9b2c40 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -25,10 +25,10 @@ serde = { version = "1.0.164", features = ["derive"] } serde_json = "1.0.96" tokio = { version = "1.28", default-features = false } url = "2.4.0" -wasm-bindgen = "=0.2.86" +wasm-bindgen = {workspace=true} wasm-bindgen-futures = "0.4.36" serde-wasm-bindgen = "0.5.0" -wasm-streams = "0.3.0" +wasm-streams = "0.4.0" worker-kv = "0.6.0" worker-macros = { path = "../worker-macros", version = "0.0.9" } worker-sys = { path = "../worker-sys", version = "0.0.9" } diff --git a/worker/js/hacks.js b/worker/js/hacks.js deleted file mode 100644 index d40cd7ef..00000000 --- a/worker/js/hacks.js +++ /dev/null @@ -1,17 +0,0 @@ -/** - * @param {Request | Response} r - * @returns {Promise} - */ -export async function collectBytes(r) { - return await new Response( - r.body.pipeThrough( - new TransformStream({ - transform(chunk, controller) { - console.log("TRANSFORM", chunk); - const cloned = new Uint8Array(chunk); - controller.enqueue(cloned); - }, - }) - ) - ).arrayBuffer(); -} diff --git a/worker/src/body.rs b/worker/src/body.rs index 7b78db84..ace962eb 100644 --- a/worker/src/body.rs +++ b/worker/src/body.rs @@ -7,6 +7,7 @@ mod wasm; pub use body::Body; pub(crate) use body::BodyInner; +pub(crate) use body::BoxBodyReader; pub use http_body::Body as HttpBody; pub use to_bytes::to_bytes; diff --git a/worker/src/body/body.rs b/worker/src/body/body.rs index c74ed63e..2b0c45ef 100644 --- a/worker/src/body/body.rs +++ b/worker/src/body/body.rs @@ -3,28 +3,18 @@ use std::{ task::{Context, Poll}, }; -use bytes::Bytes; -use futures_util::Stream; -use http::HeaderMap; -use js_sys::{ArrayBuffer, Promise, Uint8Array}; -use serde::de::DeserializeOwned; -use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue}; - use crate::{ body::{wasm::WasmStreamBody, HttpBody}, futures::SendJsFuture, Error, }; - -// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream -// because the array buffer backing them gets detached. -#[wasm_bindgen(module = "/js/hacks.js")] -extern "C" { - #[wasm_bindgen(js_name = "collectBytes", catch)] - fn collect_bytes( - stream: &JsValue, - ) -> Result; -} +use bytes::Bytes; +use futures_util::{AsyncRead, Stream, TryStream, TryStreamExt}; +use http::HeaderMap; +use js_sys::Uint8Array; +use serde::de::DeserializeOwned; +use wasm_bindgen::{JsCast, JsValue}; +use web_sys::ReadableStream; type BoxBody = http_body::combinators::UnsyncBoxBody; @@ -116,25 +106,6 @@ impl Body { // performance as there's no polling overhead. match self.0 { BodyInner::Regular(body) => super::to_bytes(body).await, - /* - Working but clones all body - BodyInner::Request(req) if req.body().is_some() => { - // let body = req.body().unwrap(); - let promise = collect_bytes(&req.unchecked_into())?; - let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); - let bytes = Uint8Array::new(&bytes); - - Ok(bytes.to_vec().into()) - } - BodyInner::Response(res) if res.body().is_some() => { - let promise = collect_bytes(&res.unchecked_into())?; - let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); - let bytes = Uint8Array::new(&bytes); - - Ok(bytes.to_vec().into()) - } - */ - // Failing BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await), BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await), _ => Ok(Bytes::new()), @@ -184,19 +155,14 @@ impl Body { .and_then(|buf| serde_json::from_slice(&buf).map_err(Error::SerdeJsonError)) } - pub(crate) fn is_none(&self) -> bool { - match &self.0 { - BodyInner::None => true, - BodyInner::Regular(_) => false, - BodyInner::Request(req) => req.body().is_none(), - BodyInner::Response(res) => res.body().is_none(), - } - } - pub(crate) fn inner(&self) -> &BodyInner { &self.0 } + pub(crate) fn into_inner(self) -> BodyInner { + self.0 + } + /// Turns the body into a regular streaming body, if it's not already, and returns the underlying body. fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> { match &self.0 { @@ -211,6 +177,46 @@ impl Body { _ => unreachable!(), } } + + pub(crate) fn into_readable_stream(self) -> Option { + match self.into_inner() { + crate::body::BodyInner::Request(req) => req.body(), + crate::body::BodyInner::Response(res) => res.body(), + crate::body::BodyInner::Regular(s) => Some( + wasm_streams::ReadableStream::from_async_read( + crate::body::BoxBodyReader::new(s), + 1024, + ) + .into_raw(), + ), + crate::body::BodyInner::None => None, + } + } + + /// Create a `Body` using a [`Stream`](futures::stream::Stream) + pub fn from_stream(stream: S) -> Result + where + S: TryStream + 'static, + S::Ok: Into>, + S::Error: Into, + { + let js_stream = stream + .map_ok(|item| -> Vec { item.into() }) + .map_ok(|chunk| { + let array = Uint8Array::new_with_length(chunk.len() as _); + array.copy_from(&chunk); + + array.into() + }) + .map_err(|err| -> crate::Error { err.into() }) + .map_err(|e| JsValue::from(e.to_string())); + + let stream = wasm_streams::ReadableStream::from_stream(js_stream); + let stream: ReadableStream = stream.into_raw().dyn_into().unwrap(); + + let edge_res = web_sys::Response::new_with_opt_readable_stream(Some(&stream))?; + Ok(Self::from(edge_res)) + } } impl Default for Body { @@ -313,6 +319,56 @@ impl HttpBody for Body { } } +pub struct BoxBodyReader { + inner: BoxBody, + store: Vec, +} + +impl BoxBodyReader { + pub fn new(inner: BoxBody) -> Self { + BoxBodyReader { + inner, + store: Vec::new(), + } + } +} + +impl AsyncRead for BoxBodyReader { + // Required method + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if !self.store.is_empty() { + let size = self.store.len().min(buf.len()); + buf[..size].clone_from_slice(&self.store[..size]); + self.store = self.store.split_off(size); + Poll::Ready(Ok(size)) + } else { + match Pin::new(&mut self.inner).poll_data(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(opt) => match opt { + Some(result) => match result { + Ok(data) => { + use bytes::Buf; + self.store.extend_from_slice(data.chunk()); + let size = self.store.len().min(buf.len()); + buf[..size].clone_from_slice(&self.store[..size]); + self.store = self.store.split_off(size); + Poll::Ready(Ok(size)) + } + Err(e) => { + Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))) + } + }, + None => Poll::Ready(Ok(0)), // Not sure about this + }, + } + } + } +} + impl Stream for Body { type Item = Result; diff --git a/worker/src/cf.rs b/worker/src/cf.rs index 41fe5a04..8de0054e 100644 --- a/worker/src/cf.rs +++ b/worker/src/cf.rs @@ -1,7 +1,3 @@ -mod properties; - -pub use properties::{CfProperties, MinifyConfig, PolishConfig}; - /// In addition to the methods on the `Request` struct, the `Cf` struct on an inbound Request contains information about the request provided by Cloudflare’s edge. /// /// [Details](https://developers.cloudflare.com/workers/runtime-apis/request#incomingrequestcfproperties) @@ -18,6 +14,10 @@ impl Cf { Self { inner } } + pub fn inner(&self) -> &worker_sys::IncomingRequestCfProperties { + &self.inner + } + /// The three-letter airport code (e.g. `ATX`, `LUX`) representing /// the colocation which processed the request pub fn colo(&self) -> String { @@ -29,6 +29,11 @@ impl Cf { self.inner.asn() } + /// The Autonomous System organization name of the request, e.g. `Cloudflare, Inc.` + pub fn as_organization(&self) -> String { + self.inner.as_organization() + } + /// The two-letter country code of origin for the request. /// This is the same value as that provided in the CF-IPCountry header, e.g. `"US"` pub fn country(&self) -> Option { @@ -89,9 +94,7 @@ impl Cf { /// Information about the client's authorization. /// Only set when using Cloudflare Access or API Shield. pub fn tls_client_auth(&self) -> Option { - self.inner - .tls_client_auth() - .map(|inner| TlsClientAuth { inner }) + self.inner.tls_client_auth().map(Into::into) } /// The TLS version of the connection to Cloudflare, e.g. TLSv1.3. @@ -178,6 +181,12 @@ pub struct RequestPriority { pub group_weight: usize, } +impl From for Cf { + fn from(inner: worker_sys::IncomingRequestCfProperties) -> Self { + Self { inner } + } +} + /// Only set when using Cloudflare Access or API Shield #[derive(Debug)] pub struct TlsClientAuth { @@ -236,3 +245,9 @@ impl TlsClientAuth { self.inner.cert_subject_dn_rfc2253() } } + +impl From for TlsClientAuth { + fn from(inner: worker_sys::TlsClientAuth) -> Self { + Self { inner } + } +} diff --git a/worker/src/durable.rs b/worker/src/durable.rs index 1c628617..cf119ca4 100644 --- a/worker/src/durable.rs +++ b/worker/src/durable.rs @@ -27,13 +27,13 @@ use chrono::{DateTime, Utc}; use futures_util::Future; use js_sys::{Map, Number, Object}; use serde::{de::DeserializeOwned, Serialize}; -use wasm_bindgen::{prelude::*, JsCast}; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::future_to_promise; use worker_sys::{ DurableObject as EdgeDurableObject, DurableObjectId, DurableObjectNamespace as EdgeObjectNamespace, DurableObjectState, DurableObjectStorage, DurableObjectTransaction, }; -use wasm_bindgen_futures::future_to_promise; /// A Durable Object stub is a client object used to send requests to a remote Durable Object. pub struct Stub { @@ -759,6 +759,7 @@ impl DurableObject for Chatroom { pub trait DurableObject { fn new(state: State, env: Env) -> Self; async fn fetch(&mut self, req: http::Request) -> Result>; + #[allow(clippy::diverging_sub_expression)] async fn alarm(&mut self) -> Result> { unimplemented!("alarm() handler not implemented") } diff --git a/worker/src/env.rs b/worker/src/env.rs index 57607f81..3a67ead8 100644 --- a/worker/src/env.rs +++ b/worker/src/env.rs @@ -6,7 +6,7 @@ use crate::Queue; use crate::{durable::ObjectNamespace, Bucket, DynamicDispatcher, Fetcher, Result}; use js_sys::Object; -use wasm_bindgen::{prelude::*, JsCast, JsValue}; +use wasm_bindgen::prelude::*; use worker_kv::KvStore; #[wasm_bindgen] diff --git a/worker/src/fetch.rs b/worker/src/fetch.rs index 12e9a7aa..22588d3d 100644 --- a/worker/src/fetch.rs +++ b/worker/src/fetch.rs @@ -5,7 +5,7 @@ use crate::{ body::Body, futures::SendJsFuture, http::{request, response}, - Error, Result, + AbortSignal, Error, RequestInit, Result, }; /// Fetch a resource from the network. @@ -46,6 +46,47 @@ pub async fn fetch(req: http::Request>) -> Result>, + init: &RequestInit, +) -> Result> { + let fut = { + let req = req.map(Into::into); + let global = js_sys::global().unchecked_into::(); + + let req = request::into_wasm(req); + let promise = global.fetch_with_request_and_init(&req, &init.into()); + + SendJsFuture::from(promise) + }; + + fut.await + .map(|res| response::from_wasm(res.unchecked_into())) + .map_err(Error::from) +} + +pub async fn fetch_with_signal( + req: http::Request>, + signal: &AbortSignal, +) -> Result> { + let mut init = web_sys::RequestInit::new(); + init.signal(Some(signal.inner())); + + let fut = { + let req = req.map(Into::into); + let global = js_sys::global().unchecked_into::(); + + let req = request::into_wasm(req); + let promise = global.fetch_with_request_and_init(&req, &init); + + SendJsFuture::from(promise) + }; + + fut.await + .map(|res| response::from_wasm(res.unchecked_into())) + .map_err(Error::from) +} + fn _assert_send() { use crate::futures::assert_send_value; assert_send_value(fetch(http::Request::new(()))); diff --git a/worker/src/fetcher.rs b/worker/src/fetcher.rs index 533d4f15..06fd0742 100644 --- a/worker/src/fetcher.rs +++ b/worker/src/fetcher.rs @@ -5,7 +5,7 @@ use crate::{ env::EnvBinding, futures::SendJsFuture, http::{request, response}, - Result, + RequestInit, Result, }; /// A struct for invoking fetch events to other Workers. @@ -16,7 +16,27 @@ unsafe impl Sync for Fetcher {} impl Fetcher { /// Invoke a fetch event in a worker with a url and optionally a [RequestInit]. - pub async fn fetch(&self, req: http::Request) -> Result> { + pub async fn fetch( + &self, + url: impl Into, + init: Option, + ) -> Result> { + let path = url.into(); + let fut = { + let promise = match init { + Some(ref init) => self.0.fetch_with_str_and_init(&path, &init.into()), + None => self.0.fetch_with_str(&path), + }; + + SendJsFuture::from(promise) + }; + + let res = fut.await?.dyn_into()?; + Ok(response::from_wasm(res)) + } + + /// Invoke a fetch event with an existing [Request]. + pub async fn fetch_request(&self, req: http::Request) -> Result> { let fut = { let req = request::into_wasm(req); let promise = self.0.fetch(&req); diff --git a/worker/src/headers.rs b/worker/src/headers.rs new file mode 100644 index 00000000..1d97ab61 --- /dev/null +++ b/worker/src/headers.rs @@ -0,0 +1,183 @@ +use crate::{error::Error, Result}; + +use std::{ + iter::{FromIterator, Map}, + result::Result as StdResult, + str::FromStr, +}; + +use http::{header::HeaderName, HeaderMap, HeaderValue}; +use js_sys::Array; +use wasm_bindgen::JsValue; +use worker_sys::ext::HeadersExt; + +/// A [Headers](https://developer.mozilla.org/en-US/docs/Web/API/Headers) representation used in +/// Request and Response objects. +pub struct Headers(pub web_sys::Headers); + +impl std::fmt::Debug for Headers { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Headers {\n")?; + for (k, v) in self.entries() { + f.write_str(&format!("{k} = {v}\n"))?; + } + f.write_str("}\n") + } +} + +impl Headers { + /// Construct a new `Headers` struct. + pub fn new() -> Self { + Default::default() + } + + /// Returns all the values of a header within a `Headers` object with a given name. + /// Returns an error if the name is invalid (e.g. contains spaces) + pub fn get(&self, name: &str) -> Result> { + self.0.get(name).map_err(Error::from) + } + + /// Returns a boolean stating whether a `Headers` object contains a certain header. + /// Returns an error if the name is invalid (e.g. contains spaces) + pub fn has(&self, name: &str) -> Result { + self.0.has(name).map_err(Error::from) + } + + /// Returns an error if the name is invalid (e.g. contains spaces) + pub fn append(&mut self, name: &str, value: &str) -> Result<()> { + self.0.append(name, value).map_err(Error::from) + } + + /// Sets a new value for an existing header inside a `Headers` object, or adds the header if it does not already exist. + /// Returns an error if the name is invalid (e.g. contains spaces) + pub fn set(&mut self, name: &str, value: &str) -> Result<()> { + self.0.set(name, value).map_err(Error::from) + } + + /// Deletes a header from a `Headers` object. + /// Returns an error if the name is invalid (e.g. contains spaces) + /// or if the JS Headers object's guard is immutable (e.g. for an incoming request) + pub fn delete(&mut self, name: &str) -> Result<()> { + self.0.delete(name).map_err(Error::from) + } + + /// Returns an iterator allowing to go through all key/value pairs contained in this object. + pub fn entries(&self) -> HeaderIterator { + self.0 + .entries() + .into_iter() + // The entries iterator.next() will always return a proper value: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols + .map((|a| a.unwrap().into()) as F1) + // The entries iterator always returns an array[2] of strings + .map(|a: Array| (a.get(0).as_string().unwrap(), a.get(1).as_string().unwrap())) + } + + /// Returns an iterator allowing you to go through all keys of the key/value pairs contained in + /// this object. + pub fn keys(&self) -> impl Iterator { + js_sys::Object::keys(&self.0) + .into_iter() + // The keys iterator.next() will always return a proper value containing a string + .map(|a| a.as_string().unwrap()) + } + + /// Returns an iterator allowing you to go through all values of the key/value pairs contained + /// in this object. + pub fn values(&self) -> impl Iterator { + js_sys::Object::values(&self.0) + .into_iter() + // The values iterator.next() will always return a proper value containing a string + .map(|a| a.as_string().unwrap()) + } +} + +impl Default for Headers { + fn default() -> Self { + // This cannot throw an error: https://developer.mozilla.org/en-US/docs/Web/API/Headers/Headers + Headers(web_sys::Headers::new().unwrap()) + } +} + +type F1 = fn(StdResult) -> Array; +type HeaderIterator = Map, fn(Array) -> (String, String)>; + +impl IntoIterator for &Headers { + type Item = (String, String); + + type IntoIter = HeaderIterator; + + fn into_iter(self) -> Self::IntoIter { + self.entries() + } +} + +impl> FromIterator<(T, T)> for Headers { + fn from_iter>(iter: U) -> Self { + let mut headers = Headers::new(); + iter.into_iter().for_each(|(name, value)| { + headers.append(name.as_ref(), value.as_ref()).ok(); + }); + headers + } +} + +impl<'a, T: AsRef> FromIterator<&'a (T, T)> for Headers { + fn from_iter>(iter: U) -> Self { + let mut headers = Headers::new(); + iter.into_iter().for_each(|(name, value)| { + headers.append(name.as_ref(), value.as_ref()).ok(); + }); + headers + } +} + +impl AsRef for Headers { + fn as_ref(&self) -> &JsValue { + &self.0 + } +} + +impl From<&HeaderMap> for Headers { + fn from(map: &HeaderMap) -> Self { + map.keys() + .flat_map(|name| { + map.get_all(name) + .into_iter() + .map(move |value| (name.to_string(), value.to_str().unwrap().to_owned())) + }) + .collect() + } +} + +impl From for Headers { + fn from(map: HeaderMap) -> Self { + (&map).into() + } +} + +impl From<&Headers> for HeaderMap { + fn from(headers: &Headers) -> Self { + headers + .into_iter() + .map(|(name, value)| { + ( + HeaderName::from_str(&name).unwrap(), + HeaderValue::from_str(&value).unwrap(), + ) + }) + .collect() + } +} + +impl From for HeaderMap { + fn from(headers: Headers) -> Self { + (&headers).into() + } +} + +impl Clone for Headers { + fn clone(&self) -> Self { + // Headers constructor doesn't throw an error + Headers(web_sys::Headers::new_with_headers(&self.0).unwrap()) + } +} diff --git a/worker/src/http/request.rs b/worker/src/http/request.rs index fefab465..8b89611d 100644 --- a/worker/src/http/request.rs +++ b/worker/src/http/request.rs @@ -1,13 +1,9 @@ //! Functions for translating requests to and from JS -use bytes::Buf; -use futures_util::StreamExt; -use js_sys::Uint8Array; use wasm_bindgen::JsCast; -use worker_sys::console_log; use worker_sys::ext::{HeadersExt, RequestExt}; -use crate::{AbortSignal, Cf, CfProperties}; +use crate::{AbortSignal, Cf}; use crate::body::Body; @@ -117,12 +113,12 @@ pub fn into_wasm(mut req: http::Request) -> web_sys::Request { init.redirect(redirect.into()); } - if let Some(cf) = req.extensions_mut().remove::() { + if let Some(cf) = req.extensions_mut().remove::() { // TODO: this should be handled in worker-sys let r = ::js_sys::Reflect::set( init.as_ref(), &wasm_bindgen::JsValue::from("cf"), - &wasm_bindgen::JsValue::from(&cf), + &wasm_bindgen::JsValue::from(cf.inner()), ); debug_assert!( r.is_ok(), @@ -131,19 +127,8 @@ pub fn into_wasm(mut req: http::Request) -> web_sys::Request { let _ = r; } - let body = req.into_body(); - let body = if body.is_none() { - None - } else { - let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| { - chunk - .map(|buf| js_sys::Uint8Array::from(buf.chunk()).into()) - .map_err(|_| wasm_bindgen::JsValue::NULL) - })); - - Some(stream.into_raw().unchecked_into()) - }; - init.body(body.as_ref()); + let s = req.into_body().into_readable_stream(); + init.body(s.map(|s| s.into()).as_ref()); web_sys::Request::new_with_str_and_init(&uri, &init).unwrap() } diff --git a/worker/src/http/response.rs b/worker/src/http/response.rs index 81497a8b..802ceca0 100644 --- a/worker/src/http/response.rs +++ b/worker/src/http/response.rs @@ -1,13 +1,10 @@ //! Functions for translating responses to and from JS -use bytes::Buf; -use futures_util::StreamExt; use wasm_bindgen::JsCast; use worker_sys::ext::{HeadersExt, ResponseExt, ResponseInitExt}; -use crate::WebSocket; - use crate::body::Body; +use crate::WebSocket; /// Create a [`http::Response`] from a [`web_sys::Response`]. /// @@ -87,18 +84,7 @@ pub fn into_wasm(mut res: http::Response) -> web_sys::Response { init.websocket(ws.as_ref()); } - let body = res.into_body(); - let body = if body.is_none() { - None - } else { - let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| { - chunk - .map(|buf| js_sys::Uint8Array::from(buf.chunk()).into()) - .map_err(|_| wasm_bindgen::JsValue::NULL) - })); - - Some(stream.into_raw().unchecked_into()) - }; + let s = res.into_body().into_readable_stream(); - web_sys::Response::new_with_opt_readable_stream_and_init(body.as_ref(), &init).unwrap() + web_sys::Response::new_with_opt_readable_stream_and_init(s.as_ref(), &init).unwrap() } diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 95f201ad..ae104db2 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -33,11 +33,14 @@ pub use crate::dynamic_dispatch::*; pub use crate::env::{Env, EnvBinding, Secret, Var}; pub use crate::error::Error; pub use crate::fetch::fetch; +pub use crate::fetch::fetch_with_init; +pub use crate::fetch::fetch_with_signal; pub use crate::fetcher::Fetcher; // pub use crate::futures::spawn_local; #[cfg(feature = "queue")] pub use crate::queue::*; pub use crate::r2::*; +pub use crate::request_init::RequestInit; pub use crate::schedule::*; pub use crate::socket::*; pub use crate::streams::*; @@ -59,10 +62,12 @@ mod error; mod fetch; mod fetcher; mod futures; +mod headers; pub mod http; #[cfg(feature = "queue")] mod queue; mod r2; +mod request_init; mod schedule; mod socket; mod streams; diff --git a/worker/src/queue.rs b/worker/src/queue.rs index 60dce091..8de04584 100644 --- a/worker/src/queue.rs +++ b/worker/src/queue.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use crate::{env::EnvBinding, futures::SendJsFuture, Date, Error, Result}; use js_sys::Array; use serde::{de::DeserializeOwned, Serialize}; -use wasm_bindgen::{prelude::*, JsCast}; +use wasm_bindgen::prelude::*; use worker_sys::{MessageBatch as MessageBatchSys, Queue as EdgeQueue}; static BODY_KEY_STR: &str = "body"; diff --git a/worker/src/cf/properties.rs b/worker/src/request_init.rs similarity index 68% rename from worker/src/cf/properties.rs rename to worker/src/request_init.rs index 99f5ab4e..26aa418a 100644 --- a/worker/src/cf/properties.rs +++ b/worker/src/request_init.rs @@ -1,12 +1,97 @@ -// TODO: the worker-sys crate should contain the JS bindings rather than doing it in here - use std::collections::HashMap; +use crate::headers::Headers; +use crate::http::Method; + use js_sys::Object; -use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; +use serde::Serialize; +use wasm_bindgen::prelude::*; + +/// Optional options struct that contains settings to apply to the `Request`. +pub struct RequestInit { + /// Currently requires a manual conversion from your data into a [`wasm_bindgen::JsValue`]. + pub body: Option, + /// Headers associated with the outbound `Request`. + pub headers: Headers, + /// Cloudflare-specific properties that can be set on the `Request` that control how Cloudflare’s + /// edge handles the request. + pub cf: CfProperties, + /// The HTTP Method used for this `Request`. + pub method: Method, + /// The redirect mode to use: follow, error, or manual. The default for a new Request object is + /// follow. Note, however, that the incoming Request property of a FetchEvent will have redirect + /// mode manual. + pub redirect: RequestRedirect, +} + +impl RequestInit { + pub fn new() -> Self { + Default::default() + } + + pub fn with_headers(&mut self, headers: Headers) -> &mut Self { + self.headers = headers; + self + } + + pub fn with_method(&mut self, method: Method) -> &mut Self { + self.method = method; + self + } + + pub fn with_redirect(&mut self, redirect: RequestRedirect) -> &mut Self { + self.redirect = redirect; + self + } + + pub fn with_body(&mut self, body: Option) -> &mut Self { + self.body = body; + self + } + + pub fn with_cf_properties(&mut self, props: CfProperties) -> &mut Self { + self.cf = props; + self + } +} + +impl From<&RequestInit> for web_sys::RequestInit { + fn from(req: &RequestInit) -> Self { + let mut inner = web_sys::RequestInit::new(); + inner.headers(req.headers.as_ref()); + inner.method(req.method.as_ref()); + inner.redirect(req.redirect.into()); + inner.body(req.body.as_ref()); + + // set the Cloudflare-specific `cf` property on FFI RequestInit + let r = ::js_sys::Reflect::set( + inner.as_ref(), + &JsValue::from("cf"), + &JsValue::from(&req.cf), + ); + debug_assert!( + r.is_ok(), + "setting properties should never fail on our dictionary objects" + ); + let _ = r; + + inner + } +} + +impl Default for RequestInit { + fn default() -> Self { + Self { + body: None, + headers: Headers::new(), + cf: CfProperties::default(), + method: Method::GET, + redirect: RequestRedirect::default(), + } + } +} /// -#[derive(Clone)] pub struct CfProperties { /// Whether Cloudflare Apps should be enabled for this request. Defaults to `true`. pub apps: Option, @@ -25,7 +110,7 @@ pub struct CfProperties { pub cache_ttl: Option, /// This option is a version of the cacheTtl feature which chooses a TTL based on the response’s /// status code. If the response to this request has a status code that matches, Cloudflare will - /// cache for the instructed time, and override cache instructives sent by the origin. For + /// cache for the instructed time, and override cache directives sent by the origin. For /// example: { "200-299": 86400, 404: 1, "500-599": 0 }. The value can be any integer, including /// zero and negative integers. A value of 0 indicates that the cache asset expires immediately. /// Any negative value instructs Cloudflare not to cache at all. @@ -58,13 +143,11 @@ pub struct CfProperties { pub scrape_shield: Option, } -unsafe impl Send for CfProperties {} -unsafe impl Sync for CfProperties {} - impl From<&CfProperties> for JsValue { fn from(props: &CfProperties) -> Self { let obj = js_sys::Object::new(); let defaults = CfProperties::default(); + let serializer = serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true); set_prop( &obj, @@ -110,7 +193,7 @@ impl From<&CfProperties> for JsValue { set_prop( &obj, &JsValue::from("cacheTtlByStatus"), - &serde_wasm_bindgen::to_value(&ttl_status_map).unwrap_or_default(), + &ttl_status_map.serialize(&serializer).unwrap_or_default(), ); set_prop( @@ -198,9 +281,6 @@ pub struct MinifyConfig { pub css: bool, } -unsafe impl Send for MinifyConfig {} -unsafe impl Sync for MinifyConfig {} - /// Configuration options for Cloudflare's image optimization feature: /// #[wasm_bindgen] @@ -211,9 +291,6 @@ pub enum PolishConfig { Lossless, } -unsafe impl Send for PolishConfig {} -unsafe impl Sync for PolishConfig {} - impl Default for PolishConfig { fn default() -> Self { Self::Off @@ -229,3 +306,32 @@ impl From for &str { } } } + +#[wasm_bindgen] +#[derive(Default, Clone, Copy)] +pub enum RequestRedirect { + Error, + #[default] + Follow, + Manual, +} + +impl From for &str { + fn from(redirect: RequestRedirect) -> Self { + match redirect { + RequestRedirect::Error => "error", + RequestRedirect::Follow => "follow", + RequestRedirect::Manual => "manual", + } + } +} + +impl From for web_sys::RequestRedirect { + fn from(redir: RequestRedirect) -> Self { + match redir { + RequestRedirect::Error => web_sys::RequestRedirect::Error, + RequestRedirect::Follow => web_sys::RequestRedirect::Follow, + RequestRedirect::Manual => web_sys::RequestRedirect::Manual, + } + } +}