diff --git a/Cargo.lock b/Cargo.lock
index 2782755e..9016602a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -638,9 +638,9 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "js-sys"
-version = "0.3.63"
+version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790"
+checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a"
dependencies = [
"wasm-bindgen",
]
@@ -1641,9 +1641,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73"
+checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -1651,9 +1651,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb"
+checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd"
dependencies = [
"bumpalo",
"log",
@@ -1666,9 +1666,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-cli-support"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8315d6503415e5d44ff64f1ba34aefd8264c561df17e0f1c8eb8c96bde79c45e"
+checksum = "d21c60239a09bf9bab8dfa752be4e6c637db22296b9ded493800090448692da9"
dependencies = [
"anyhow",
"base64 0.9.3",
@@ -1688,9 +1688,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-externref-xform"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4522bf3be16c6274c87a5a2c5d2a62efa80253b025f8e813f9682d0d6a8a8fca"
+checksum = "bafbe1984f67cc12645f12ab65e6145e8ddce1ab265d0be58435f25bb0ce2608"
dependencies = [
"anyhow",
"walrus",
@@ -1698,9 +1698,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
-version = "0.4.36"
+version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e"
+checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
dependencies = [
"cfg-if",
"js-sys",
@@ -1710,9 +1710,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258"
+checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -1720,9 +1720,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
+checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
@@ -1733,9 +1733,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-multi-value-xform"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "113256596776ebb4b243512d3711e73d5475eaeff373e1ae65427c66e5aa2073"
+checksum = "581419e3995571a1d2d066e360ca1c0c09da097f5a53c98e6f00d96eddaf0ffe"
dependencies = [
"anyhow",
"walrus",
@@ -1743,9 +1743,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93"
+checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "wasm-bindgen-test"
@@ -1773,9 +1773,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-threads-xform"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89106aaf83a2b80464fc8f60a074a4575135b73a491e174f35bbeae6ff0d7ec6"
+checksum = "e05d272073981137e8426cf2a6830d43d1f84f988a050b2f8b210f0e266b8983"
dependencies = [
"anyhow",
"walrus",
@@ -1784,9 +1784,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-wasm-conventions"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "84e5ad27a7930400994cb40823d3d4a7ef235fac52d0c75ebd61fa40eba994a8"
+checksum = "0e9c65b1ff5041ea824ca24c519948aec16fb6611c617d601623c0657dfcd47b"
dependencies = [
"anyhow",
"walrus",
@@ -1794,9 +1794,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-wasm-interpreter"
-version = "0.2.86"
+version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e69500063b7b20f3e9422d78c2b381dd192c7c4ebaef34d205332877cd78e0d3"
+checksum = "7c5c796220738ab5d44666f37205728a74141c0039d1166bcf8110b26bafaa1e"
dependencies = [
"anyhow",
"log",
@@ -1819,9 +1819,9 @@ dependencies = [
[[package]]
name = "wasm-streams"
-version = "0.3.0"
+version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129"
dependencies = [
"futures-util",
"js-sys",
@@ -1838,9 +1838,9 @@ checksum = "5fe3d5405e9ea6c1317a656d6e0820912d8b7b3607823a7596117c8f666daf6f"
[[package]]
name = "web-sys"
-version = "0.3.63"
+version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2"
+checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -2040,7 +2040,7 @@ dependencies = [
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
- "wasm-streams 0.3.0",
+ "wasm-streams 0.4.0",
"web-sys",
"worker-kv",
"worker-macros",
@@ -2093,6 +2093,7 @@ name = "worker-sandbox"
version = "0.1.0"
dependencies = [
"blake2",
+ "bytes",
"cfg-if",
"chrono",
"console_error_panic_hook",
diff --git a/Cargo.toml b/Cargo.toml
index d9f9e192..85b9c79d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,3 +30,6 @@ lto = true
[profile.release.package."*"]
codegen-units = 1
opt-level = "z"
+
+[workspace.dependencies]
+wasm-bindgen = "=0.2.87"
\ No newline at end of file
diff --git a/package.json b/package.json
index 35d45efb..97bfc9f3 100644
--- a/package.json
+++ b/package.json
@@ -22,6 +22,6 @@
"vitest": "^0.32.0"
},
"scripts": {
- "test": ""
+ "test": "cd worker-sandbox && worker-build --dev && vitest"
}
}
diff --git a/worker-macros/Cargo.toml b/worker-macros/Cargo.toml
index f0aeca2d..beb9e99f 100644
--- a/worker-macros/Cargo.toml
+++ b/worker-macros/Cargo.toml
@@ -17,7 +17,7 @@ worker-sys = { path = "../worker-sys", version = "0.0.9" }
syn = "2.0.17"
proc-macro2 = "1.0.60"
quote = "1.0.28"
-wasm-bindgen = "=0.2.86"
+wasm-bindgen = {workspace=true}
wasm-bindgen-futures = "0.4.36"
wasm-bindgen-macro-support = "0.2.86"
diff --git a/worker-sandbox/Cargo.toml b/worker-sandbox/Cargo.toml
index e2815c61..6fa76ba8 100644
--- a/worker-sandbox/Cargo.toml
+++ b/worker-sandbox/Cargo.toml
@@ -14,6 +14,7 @@ default = ["console_error_panic_hook"]
[dependencies]
blake2 = "0.10.6"
+bytes = "1.4.0"
chrono = { version = "0.4.26", default-features = false, features = [
"wasmbind",
"clock",
@@ -26,11 +27,11 @@ http = "0.2.9"
regex = "1.8.4"
serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.96"
-worker = { path = "../worker", version = "0.0.17", features= ["queue", "d1"] }
+worker = { path = "../worker", version = "0.0.17", features = ["queue", "d1"] }
futures-channel = "0.3.28"
futures-util = { version = "0.3.28", default-features = false }
rand = "0.8.5"
-uuid = {version = "1.3.3", features = ["v4", "serde"]}
+uuid = { version = "1.3.3", features = ["v4", "serde"] }
serde-wasm-bindgen = "0.5.0"
md5 = "0.7.0"
router-service = { git = "https://github.com/zebp/router-service", version = "0.1.0" }
@@ -38,7 +39,9 @@ tower = "0.4.13"
[dev-dependencies]
futures-channel = { version = "0.3.28", features = ["sink"] }
-futures-util = { version = "0.3.28", default-features = false, features = ["sink"] }
+futures-util = { version = "0.3.28", default-features = false, features = [
+ "sink",
+] }
reqwest = { version = "0.11.18", features = [
"blocking",
"json",
diff --git a/worker-sandbox/src/d1.rs b/worker-sandbox/src/d1.rs
index 96f992cf..58268777 100644
--- a/worker-sandbox/src/d1.rs
+++ b/worker-sandbox/src/d1.rs
@@ -8,10 +8,7 @@ struct Person {
age: u32,
}
-pub async fn prepared_statement(
- _req: http::Request
,
- env: Env,
-) -> Result> {
+pub async fn prepared_statement(env: &Env) -> Result> {
let db = env.d1("DB")?;
let stmt = worker::query!(&db, "SELECT * FROM people WHERE name = ?", "Ryan Upton")?;
@@ -47,7 +44,7 @@ pub async fn prepared_statement(
Ok(http::Response::new("ok".into()))
}
-pub async fn batch(_req: http::Request, env: Env) -> Result> {
+pub async fn batch(env: &Env) -> Result> {
let db = env.d1("DB")?;
let mut results = db
.batch(vec![
@@ -71,7 +68,7 @@ pub async fn batch(_req: http::Request, env: Env) -> Result, env: Env) -> Result> {
+pub async fn exec(req: http::Request, env: &Env) -> Result> {
let db = env.d1("DB")?;
let result = db
@@ -84,13 +81,13 @@ pub async fn exec(req: http::Request, env: Env) -> Result, env: Env) -> Result> {
+pub async fn dump(env: &Env) -> Result> {
let db = env.d1("DB")?;
let bytes = db.dump().await?;
Ok(http::Response::new(bytes.into()))
}
-pub async fn error(_req: http::Request, env: Env) -> Result> {
+pub async fn error(env: &Env) -> Result> {
let db = env.d1("DB")?;
let error = db
.exec("THIS IS NOT VALID SQL")
@@ -98,7 +95,10 @@ pub async fn error(_req: http::Request, env: Env) -> Result {
+ if let Some(text) = msg.text() {
+ server.send_with_str(text).expect("could not relay text");
+ }
+ }
+ WebsocketEvent::Close(_) => {
+ // Sets a key in a test KV so the integration tests can query if we
+ // actually got the close event. We can't use the shared dat a for this
+ // because miniflare resets that every request.
+ some_namespace_kv
+ .put("got-close-event", "true")
+ .unwrap()
+ .execute()
+ .await
+ .unwrap();
+ }
+ }
+ }
+ });
+
+ let mut response = Response::builder()
+ .status(101)
+ .body(Body::empty())
+ .unwrap();
+
+ response.extensions_mut().insert(pair.client);
+
+ Ok(response)
+ })
+ .post("/xor/:num", |req, ctx| async move {
+ let num: u8 = match ctx.param("num").unwrap().parse() {
+ Ok(num) => num,
+ Err(_) => return Response::builder()
+ .status(400)
+ .body("invalid byte".into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ };
+
+ let xor_stream = req.into_body().into_stream().map_ok(move |buf| {
+ let mut vec = buf.to_vec();
+ vec.iter_mut().for_each(|x| *x ^= num);
+ Bytes::from(vec)
+ });
+
+ let body = worker::body::Body::from_stream(xor_stream)?;
+ let resp = Response::builder()
+ .body(body)
+ .unwrap();
+ Ok(resp)
+ })
+ .get("/request-init-fetch", |_, _| async move {
+ let init = RequestInit::new();
+ let req = http::Request::post("https://cloudflare.com").body(()).unwrap();
+ fetch_with_init(req, &init).await
+ })
+ .get("/request-init-fetch-post", |_, _| async move {
+ let mut init = RequestInit::new();
+ init.method = Method::POST;
+
+ let req = http::Request::post("https://httpbin.org/post").body(()).unwrap();
+ fetch_with_init(req, &init).await
+ })
+ .get("/cancelled-fetch", |_, _| async move {
+ let controller = AbortController::default();
+ let signal = controller.signal();
+
+ let (tx, rx) = futures_channel::oneshot::channel();
+
+ // Spawns a future that'll make our fetch request and not block this function.
+ wasm_bindgen_futures::spawn_local({
+ async move {
+ let req = http::Request::post("https://cloudflare.com").body(()).unwrap();
+ let resp = fetch_with_signal(req, &signal).await;
+ tx.send(resp).unwrap();
+ }
+ });
+
+ // And then we try to abort that fetch as soon as we start it, hopefully before
+ // cloudflare.com responds.
+ controller.abort();
+
+ let res = rx.await.unwrap();
+ let res = res.unwrap_or_else(|err| {
+ let text = err.to_string();
+ Response::new(text.into())
+ });
+
+ Ok(res)
+ })
+ .get("/fetch-timeout", |_, _| async move {
+ let controller = AbortController::default();
+ let signal = controller.signal();
+
+ let fetch_fut = async {
+ let req = http::Request::post("https://miniflare.mocks/delay").body(()).unwrap();
+ let resp = fetch_with_signal(req, &signal).await?;
+ let text = resp.into_body().text().await?;
+ Ok::(text)
+ };
+ let delay_fut = async {
+ Delay::from(Duration::from_millis(1)).await;
+ controller.abort();
+ Ok(Response::new("Cancelled".into()))
+ };
+
+ futures_util::pin_mut!(fetch_fut);
+ futures_util::pin_mut!(delay_fut);
+
+ match futures_util::future::select(delay_fut, fetch_fut).await {
+ Either::Left((res, cancelled_fut)) => {
+ // Ensure that the cancelled future returns an AbortError.
+ match cancelled_fut.await {
+ Err(e) if e.to_string().contains("AbortError") => { /* Yay! It worked, let's do nothing to celebrate */},
+ Err(e) => panic!("Fetch errored with a different error than expected: {:#?}", e),
+ Ok(text) => panic!("Fetch unexpectedly succeeded: {}", text)
+ }
+
+ res
+ },
+ Either::Right(_) => panic!("Delay future should have resolved first"),
+ }
+ })
+ .get("/redirect-default", |_, _| async move {
+ Ok(Response::builder()
+ .status(302)
+ .header("Location", "https://example.com/")
+ .body(Body::empty()).unwrap())
+ })
+ .get("/redirect-307", |_, _| async move {
+ Ok(Response::builder()
+ .status(307)
+ .header("Location", "https://example.com/")
+ .body(Body::empty()).unwrap())
+ })
+ .get("/now", |_, _| async move {
+ let now = chrono::Utc::now();
+ let js_date: Date = now.into();
+ Ok(Response::new(js_date.to_string().into()))
+ })
+ .get("/custom-response-body", |_, _| async move {
+ Ok(Response::new(vec![b'h', b'e', b'l', b'l', b'o'].into()))
+ })
+ .get("/init-called", |_, _| async move {
+ let init_called = GLOBAL_STATE.load(Ordering::SeqCst);
+ Ok(Response::new(init_called.to_string().into()))
+ })
+ .get("/cache-example", |req, _| async move {
+ //console_log!("url: {}", req.uri().to_string());
+ let cache = Cache::default();
+ let key = req.uri().to_string();
+ if let Some(resp) = cache.get(&key, true).await? {
+ //console_log!("Cache HIT!");
+ Ok(resp)
+ } else {
+ //console_log!("Cache MISS!");
+
+ let mut resp = Response::builder()
+ .header("content-type", "application/json")
+ // Cache API respects Cache-Control headers. Setting s-max-age to 10
+ // will limit the response to be in cache for 10 seconds max
+ .header("cache-control", "s-maxage=10")
+ .body(serde_json::json!({ "timestamp": Date::now().as_millis() }).to_string().into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ .unwrap();
+
+ cache.put(key, resp.clone()).await?;
+ Ok(resp)
+ }
+ })
+ .get("/cache-api/get/:key", |_req, ctx| async move {
+ if let Some(key) = ctx.param("key") {
+ let cache = Cache::default();
+ if let Some(resp) = cache.get(format!("https://{key}"), true).await? {
+ return Ok(resp);
+ } else {
+ return Ok(Response::new("cache miss".into()));
+ }
+ }
+
+ Response::builder()
+ .status(400)
+ .body("key missing".into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ })
+ .put("/cache-api/put/:key", |_req, ctx| async move {
+ if let Some(key) = ctx.param("key") {
+ let cache = Cache::default();
+
+ let mut resp = Response::builder()
+ .header("content-type", "application/json")
+ // Cache API respects Cache-Control headers. Setting s-max-age to 10
+ // will limit the response to be in cache for 10 seconds max
+ .header("cache-control", "s-maxage=10")
+ .body(serde_json::json!({ "timestamp": Date::now().as_millis() }).to_string().into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ .unwrap();
+
+ cache.put(format!("https://{key}"), resp.clone()).await?;
+ return Ok(resp);
+ }
+
+ Response::builder()
+ .status(400)
+ .body("key missing".into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ })
+ .post("/cache-api/delete/:key", |_req, ctx| async move {
+ if let Some(key) = ctx.param("key") {
+ let cache = Cache::default();
+
+ let res = cache.delete(format!("https://{key}"), true).await?;
+ return Ok(Response::new(serde_json::to_string(&res)?.into()));
+ }
+
+ Response::builder()
+ .status(400)
+ .body("key missing".into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ })
+ .get("/cache-stream", |req, _| async move {
+ //console_log!("url: {}", req.uri().to_string());
+ let cache = Cache::default();
+ let key = req.uri().to_string();
+ if let Some(resp) = cache.get(&key, true).await? {
+ //console_log!("Cache HIT!");
+ Ok(resp)
+ } else {
+ //console_log!("Cache MISS!");
+ let mut rng = rand::thread_rng();
+ let count = rng.gen_range(0..10);
+ let stream = futures_util::stream::repeat("Hello, world!\n")
+ .take(count)
+ .then(|text| async move {
+ Delay::from(Duration::from_millis(50)).await;
+ Result::Ok(text.as_bytes().to_vec())
+ });
+
+ let body = worker::body::Body::from_stream(stream)?;
+
+ //console_log!("resp = {:?}", resp);
+
+ let mut resp = Response::builder()
+ // Cache API respects Cache-Control headers. Setting s-max-age to 10
+ // will limit the response to be in cache for 10 seconds max
+ .header("cache-control", "s-maxage=10")
+ .body(body)
+ .unwrap();
+
+ cache.put(key, resp.clone()).await?;
+ Ok(resp)
+ }
+ })
+ .get("/remote-by-request", |req, ctx| async move {
+ let fetcher = ctx.data.service("remote")?;
+ fetcher.fetch_request(req).await
+ })
+ .get("/remote-by-path", |req, ctx| async move {
+ let fetcher = ctx.data.service("remote")?;
+ let mut init = RequestInit::new();
+ init.with_method(Method::POST);
+
+ fetcher.fetch(req.uri().to_string(), Some(init)).await
+ })
+ .post("/queue/send/:id", |_req, ctx| async move {
+ let id = match ctx.param("id").map(|id| Uuid::try_parse(id).ok()).and_then(|u|u) {
+ Some(id) => id,
+ None => {
+ return Response::builder()
+ .status(400)
+ .body("error".into())
+ .map_err(|_| Error::RustError("Failed to parse id, expected a UUID".into()));
+ }
+ };
+ let my_queue = match ctx.data.queue("my_queue") {
+ Ok(queue) => queue,
+ Err(err) => {
+ return Response::builder()
+ .status(500)
+ .body(format!("Failed to get queue: {err:?}").into())
+ .map_err(|e| Error::RustError(e.to_string()));
+ }
+ };
+ match my_queue.send(&QueueBody {
+ id: id.to_string(),
+ }).await {
+ Ok(_) => {
+ Ok(Response::new("Message sent".into()))
+ }
+ Err(err) => {
+ Response::builder()
+ .status(500)
+ .body(format!("Failed to send message to queue: {err:?}").into())
+ .map_err(|e| Error::RustError(e.to_string()))
+ }
+ }
+ })
+ .get("/queue", |_req, _ctx| async move {
+ let guard = GLOBAL_QUEUE_STATE.lock().unwrap();
+ let messages: Vec = guard.clone();
+ let json = serde_json::to_string(&messages).unwrap();
+ Ok(Response::new(Body::from(json)))
+ })
+ .get("/d1/prepared", |_, ctx| async move {
+ d1::prepared_statement(ctx.data.as_ref()).await
+ })
+ .get("/d1/batch", |_, ctx| async move {
+ d1::batch(ctx.data.as_ref()).await
+ })
+ .get("/d1/dump", |_, ctx| async move {
+ d1::dump(ctx.data.as_ref()).await
+ })
+ .post("/d1/exec", |req, ctx| async move {
+ d1::exec(req, ctx.data.as_ref()).await
+ })
+ .get("/d1/error", |_, ctx| async move {
+ d1::error(ctx.data.as_ref()).await
+ })
+ .get("/r2/list-empty", |_, ctx| async move {
+ r2::list_empty(ctx.data.as_ref()).await
+ })
+ .get("/r2/list", |_, ctx| async move {
+ r2::list(ctx.data.as_ref()).await
+ })
+ .get("/r2/get-empty", |_, ctx| async move {
+ r2::get_empty(ctx.data.as_ref()).await
+ })
+ .get("/r2/get", |_, ctx| async move {
+ r2::get(ctx.data.as_ref()).await
+ })
+ .put("/r2/put", |_, ctx| async move {
+ r2::put(ctx.data.as_ref()).await
+ })
+ .put("/r2/put-properties", |_, ctx| async move {
+ r2::put_properties(ctx.data.as_ref()).await
+ })
+ .put("/r2/put-multipart", |_, ctx| async move {
+ r2::put_multipart(ctx.data.as_ref()).await
+ })
+ .delete("/r2/delete", |_, ctx| async move {
+ r2::delete(ctx.data.as_ref()).await
+ })
.any("/*catchall", |_, ctx| async move {
Ok(Response::builder()
.status(404)
@@ -407,12 +770,12 @@ pub struct QueueBody {
pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Context) -> Result<()> {
let mut guard = GLOBAL_QUEUE_STATE.lock().unwrap();
for message in message_batch.messages()? {
- console_log!(
+ /*console_log!(
"Received queue message {:?}, with id {} and timestamp: {}",
message.body,
message.id,
message.timestamp.to_string()
- );
+ );*/
guard.push(message.body);
}
Ok(())
diff --git a/worker-sandbox/tests/d1.spec.ts b/worker-sandbox/tests/d1.spec.ts
index 5e426a52..1a7849d1 100644
--- a/worker-sandbox/tests/d1.spec.ts
+++ b/worker-sandbox/tests/d1.spec.ts
@@ -1,11 +1,8 @@
import { describe, test, expect, beforeAll } from "vitest";
-
-const hasLocalDevServer = await fetch("http://localhost:8787/request")
- .then((resp) => resp.ok)
- .catch(() => false);
+import { mf } from "./mf";
async function exec(query: string): Promise {
- const resp = await fetch("http://localhost:8787/d1/exec", {
+ const resp = await mf.dispatchFetch("https://fake.host/d1/exec", {
method: "POST",
body: query.split("\n").join(""),
});
@@ -15,7 +12,7 @@ async function exec(query: string): Promise {
return Number(body);
}
-describe.skipIf(!hasLocalDevServer)("d1", () => {
+describe("d1", () => {
test("create table", async () => {
const query = `CREATE TABLE IF NOT EXISTS uniqueTable (
id INTEGER PRIMARY KEY,
@@ -49,22 +46,22 @@ describe.skipIf(!hasLocalDevServer)("d1", () => {
});
test("prepared statement", async () => {
- const resp = await fetch("http://localhost:8787/d1/prepared");
+ const resp = await mf.dispatchFetch("https://fake.host/d1/prepared");
expect(resp.status).toBe(200);
});
test("batch", async () => {
- const resp = await fetch("http://localhost:8787/d1/batch");
+ const resp = await mf.dispatchFetch("https://fake.host/d1/batch");
expect(resp.status).toBe(200);
});
test("dump", async () => {
- const resp = await fetch("http://localhost:8787/d1/dump");
+ const resp = await mf.dispatchFetch("https://fake.host/d1/dump");
expect(resp.status).toBe(200);
});
- test("dump", async () => {
- const resp = await fetch("http://localhost:8787/d1/error");
+ test("error", async () => {
+ const resp = await mf.dispatchFetch("https://fake.host/d1/error");
expect(resp.status).toBe(200);
});
});
diff --git a/worker-sandbox/tests/mf.ts b/worker-sandbox/tests/mf.ts
index a2560212..0c6bc64d 100644
--- a/worker-sandbox/tests/mf.ts
+++ b/worker-sandbox/tests/mf.ts
@@ -6,6 +6,7 @@ export const mf = new Miniflare({
cache: true,
cachePersist: false,
d1Persist: false,
+ d1Databases: ["DB"],
kvPersist: false,
r2Persist: false,
modules: true,
diff --git a/worker-sandbox/tests/request.spec.ts b/worker-sandbox/tests/request.spec.ts
index 56a24a0c..96f85b62 100644
--- a/worker-sandbox/tests/request.spec.ts
+++ b/worker-sandbox/tests/request.spec.ts
@@ -118,7 +118,7 @@ test("catchall", async () => {
method: "OPTIONS",
});
- expect(await resp.text()).toBe("/hello-world");
+ expect(await resp.text()).toBe("hello-world");
});
test("redirect default", async () => {
diff --git a/worker-sandbox/tests/subrequest.spec.ts b/worker-sandbox/tests/subrequest.spec.ts
index 83e1ec2c..ffe5aee2 100644
--- a/worker-sandbox/tests/subrequest.spec.ts
+++ b/worker-sandbox/tests/subrequest.spec.ts
@@ -17,7 +17,7 @@ describe("subrequest", () => {
expect(await resp.text()).toBe("Cancelled");
});
- test.skip("request init fetch post", async () => {
+ test("request init fetch post", async () => {
const resp = await mf.dispatchFetch(
"https://fake.host/request-init-fetch-post"
);
diff --git a/worker-sys/Cargo.toml b/worker-sys/Cargo.toml
index 248ced76..fb897475 100644
--- a/worker-sys/Cargo.toml
+++ b/worker-sys/Cargo.toml
@@ -10,7 +10,7 @@ description = "Low-level extern definitions / FFI bindings to the Cloudflare Wor
[dependencies]
cfg-if = "1.0.0"
js-sys = "0.3.63"
-wasm-bindgen = "=0.2.86"
+wasm-bindgen = {workspace=true}
[dependencies.web-sys]
version = "0.3.63"
diff --git a/worker-sys/src/types/incoming_request_cf_properties.rs b/worker-sys/src/types/incoming_request_cf_properties.rs
index 7e745422..057492db 100644
--- a/worker-sys/src/types/incoming_request_cf_properties.rs
+++ b/worker-sys/src/types/incoming_request_cf_properties.rs
@@ -14,6 +14,9 @@ extern "C" {
#[wasm_bindgen(method, getter)]
pub fn asn(this: &IncomingRequestCfProperties) -> u32;
+ #[wasm_bindgen(method, getter, js_name=asOrganization)]
+ pub fn as_organization(this: &IncomingRequestCfProperties) -> String;
+
#[wasm_bindgen(method, getter)]
pub fn country(this: &IncomingRequestCfProperties) -> Option;
diff --git a/worker/Cargo.toml b/worker/Cargo.toml
index 445d6fb7..1f9b2c40 100644
--- a/worker/Cargo.toml
+++ b/worker/Cargo.toml
@@ -25,10 +25,10 @@ serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.28", default-features = false }
url = "2.4.0"
-wasm-bindgen = "=0.2.86"
+wasm-bindgen = {workspace=true}
wasm-bindgen-futures = "0.4.36"
serde-wasm-bindgen = "0.5.0"
-wasm-streams = "0.3.0"
+wasm-streams = "0.4.0"
worker-kv = "0.6.0"
worker-macros = { path = "../worker-macros", version = "0.0.9" }
worker-sys = { path = "../worker-sys", version = "0.0.9" }
diff --git a/worker/js/hacks.js b/worker/js/hacks.js
deleted file mode 100644
index d40cd7ef..00000000
--- a/worker/js/hacks.js
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * @param {Request | Response} r
- * @returns {Promise}
- */
-export async function collectBytes(r) {
- return await new Response(
- r.body.pipeThrough(
- new TransformStream({
- transform(chunk, controller) {
- console.log("TRANSFORM", chunk);
- const cloned = new Uint8Array(chunk);
- controller.enqueue(cloned);
- },
- })
- )
- ).arrayBuffer();
-}
diff --git a/worker/src/body.rs b/worker/src/body.rs
index 7b78db84..ace962eb 100644
--- a/worker/src/body.rs
+++ b/worker/src/body.rs
@@ -7,6 +7,7 @@ mod wasm;
pub use body::Body;
pub(crate) use body::BodyInner;
+pub(crate) use body::BoxBodyReader;
pub use http_body::Body as HttpBody;
pub use to_bytes::to_bytes;
diff --git a/worker/src/body/body.rs b/worker/src/body/body.rs
index c74ed63e..2b0c45ef 100644
--- a/worker/src/body/body.rs
+++ b/worker/src/body/body.rs
@@ -3,28 +3,18 @@ use std::{
task::{Context, Poll},
};
-use bytes::Bytes;
-use futures_util::Stream;
-use http::HeaderMap;
-use js_sys::{ArrayBuffer, Promise, Uint8Array};
-use serde::de::DeserializeOwned;
-use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue};
-
use crate::{
body::{wasm::WasmStreamBody, HttpBody},
futures::SendJsFuture,
Error,
};
-
-// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream
-// because the array buffer backing them gets detached.
-#[wasm_bindgen(module = "/js/hacks.js")]
-extern "C" {
- #[wasm_bindgen(js_name = "collectBytes", catch)]
- fn collect_bytes(
- stream: &JsValue,
- ) -> Result;
-}
+use bytes::Bytes;
+use futures_util::{AsyncRead, Stream, TryStream, TryStreamExt};
+use http::HeaderMap;
+use js_sys::Uint8Array;
+use serde::de::DeserializeOwned;
+use wasm_bindgen::{JsCast, JsValue};
+use web_sys::ReadableStream;
type BoxBody = http_body::combinators::UnsyncBoxBody;
@@ -116,25 +106,6 @@ impl Body {
// performance as there's no polling overhead.
match self.0 {
BodyInner::Regular(body) => super::to_bytes(body).await,
- /*
- Working but clones all body
- BodyInner::Request(req) if req.body().is_some() => {
- // let body = req.body().unwrap();
- let promise = collect_bytes(&req.unchecked_into())?;
- let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
- let bytes = Uint8Array::new(&bytes);
-
- Ok(bytes.to_vec().into())
- }
- BodyInner::Response(res) if res.body().is_some() => {
- let promise = collect_bytes(&res.unchecked_into())?;
- let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
- let bytes = Uint8Array::new(&bytes);
-
- Ok(bytes.to_vec().into())
- }
- */
- // Failing
BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await),
BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await),
_ => Ok(Bytes::new()),
@@ -184,19 +155,14 @@ impl Body {
.and_then(|buf| serde_json::from_slice(&buf).map_err(Error::SerdeJsonError))
}
- pub(crate) fn is_none(&self) -> bool {
- match &self.0 {
- BodyInner::None => true,
- BodyInner::Regular(_) => false,
- BodyInner::Request(req) => req.body().is_none(),
- BodyInner::Response(res) => res.body().is_none(),
- }
- }
-
pub(crate) fn inner(&self) -> &BodyInner {
&self.0
}
+ pub(crate) fn into_inner(self) -> BodyInner {
+ self.0
+ }
+
/// Turns the body into a regular streaming body, if it's not already, and returns the underlying body.
fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> {
match &self.0 {
@@ -211,6 +177,46 @@ impl Body {
_ => unreachable!(),
}
}
+
+ pub(crate) fn into_readable_stream(self) -> Option {
+ match self.into_inner() {
+ crate::body::BodyInner::Request(req) => req.body(),
+ crate::body::BodyInner::Response(res) => res.body(),
+ crate::body::BodyInner::Regular(s) => Some(
+ wasm_streams::ReadableStream::from_async_read(
+ crate::body::BoxBodyReader::new(s),
+ 1024,
+ )
+ .into_raw(),
+ ),
+ crate::body::BodyInner::None => None,
+ }
+ }
+
+ /// Create a `Body` using a [`Stream`](futures::stream::Stream)
+ pub fn from_stream(stream: S) -> Result
+ where
+ S: TryStream + 'static,
+ S::Ok: Into>,
+ S::Error: Into,
+ {
+ let js_stream = stream
+ .map_ok(|item| -> Vec { item.into() })
+ .map_ok(|chunk| {
+ let array = Uint8Array::new_with_length(chunk.len() as _);
+ array.copy_from(&chunk);
+
+ array.into()
+ })
+ .map_err(|err| -> crate::Error { err.into() })
+ .map_err(|e| JsValue::from(e.to_string()));
+
+ let stream = wasm_streams::ReadableStream::from_stream(js_stream);
+ let stream: ReadableStream = stream.into_raw().dyn_into().unwrap();
+
+ let edge_res = web_sys::Response::new_with_opt_readable_stream(Some(&stream))?;
+ Ok(Self::from(edge_res))
+ }
}
impl Default for Body {
@@ -313,6 +319,56 @@ impl HttpBody for Body {
}
}
+pub struct BoxBodyReader {
+ inner: BoxBody,
+ store: Vec,
+}
+
+impl BoxBodyReader {
+ pub fn new(inner: BoxBody) -> Self {
+ BoxBodyReader {
+ inner,
+ store: Vec::new(),
+ }
+ }
+}
+
+impl AsyncRead for BoxBodyReader {
+ // Required method
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll> {
+ if !self.store.is_empty() {
+ let size = self.store.len().min(buf.len());
+ buf[..size].clone_from_slice(&self.store[..size]);
+ self.store = self.store.split_off(size);
+ Poll::Ready(Ok(size))
+ } else {
+ match Pin::new(&mut self.inner).poll_data(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(opt) => match opt {
+ Some(result) => match result {
+ Ok(data) => {
+ use bytes::Buf;
+ self.store.extend_from_slice(data.chunk());
+ let size = self.store.len().min(buf.len());
+ buf[..size].clone_from_slice(&self.store[..size]);
+ self.store = self.store.split_off(size);
+ Poll::Ready(Ok(size))
+ }
+ Err(e) => {
+ Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
+ }
+ },
+ None => Poll::Ready(Ok(0)), // Not sure about this
+ },
+ }
+ }
+ }
+}
+
impl Stream for Body {
type Item = Result;
diff --git a/worker/src/cf.rs b/worker/src/cf.rs
index 41fe5a04..8de0054e 100644
--- a/worker/src/cf.rs
+++ b/worker/src/cf.rs
@@ -1,7 +1,3 @@
-mod properties;
-
-pub use properties::{CfProperties, MinifyConfig, PolishConfig};
-
/// In addition to the methods on the `Request` struct, the `Cf` struct on an inbound Request contains information about the request provided by Cloudflare’s edge.
///
/// [Details](https://developers.cloudflare.com/workers/runtime-apis/request#incomingrequestcfproperties)
@@ -18,6 +14,10 @@ impl Cf {
Self { inner }
}
+ pub fn inner(&self) -> &worker_sys::IncomingRequestCfProperties {
+ &self.inner
+ }
+
/// The three-letter airport code (e.g. `ATX`, `LUX`) representing
/// the colocation which processed the request
pub fn colo(&self) -> String {
@@ -29,6 +29,11 @@ impl Cf {
self.inner.asn()
}
+ /// The Autonomous System organization name of the request, e.g. `Cloudflare, Inc.`
+ pub fn as_organization(&self) -> String {
+ self.inner.as_organization()
+ }
+
/// The two-letter country code of origin for the request.
/// This is the same value as that provided in the CF-IPCountry header, e.g. `"US"`
pub fn country(&self) -> Option {
@@ -89,9 +94,7 @@ impl Cf {
/// Information about the client's authorization.
/// Only set when using Cloudflare Access or API Shield.
pub fn tls_client_auth(&self) -> Option {
- self.inner
- .tls_client_auth()
- .map(|inner| TlsClientAuth { inner })
+ self.inner.tls_client_auth().map(Into::into)
}
/// The TLS version of the connection to Cloudflare, e.g. TLSv1.3.
@@ -178,6 +181,12 @@ pub struct RequestPriority {
pub group_weight: usize,
}
+impl From for Cf {
+ fn from(inner: worker_sys::IncomingRequestCfProperties) -> Self {
+ Self { inner }
+ }
+}
+
/// Only set when using Cloudflare Access or API Shield
#[derive(Debug)]
pub struct TlsClientAuth {
@@ -236,3 +245,9 @@ impl TlsClientAuth {
self.inner.cert_subject_dn_rfc2253()
}
}
+
+impl From for TlsClientAuth {
+ fn from(inner: worker_sys::TlsClientAuth) -> Self {
+ Self { inner }
+ }
+}
diff --git a/worker/src/durable.rs b/worker/src/durable.rs
index 1c628617..cf119ca4 100644
--- a/worker/src/durable.rs
+++ b/worker/src/durable.rs
@@ -27,13 +27,13 @@ use chrono::{DateTime, Utc};
use futures_util::Future;
use js_sys::{Map, Number, Object};
use serde::{de::DeserializeOwned, Serialize};
-use wasm_bindgen::{prelude::*, JsCast};
+use wasm_bindgen::prelude::*;
+use wasm_bindgen_futures::future_to_promise;
use worker_sys::{
DurableObject as EdgeDurableObject, DurableObjectId,
DurableObjectNamespace as EdgeObjectNamespace, DurableObjectState, DurableObjectStorage,
DurableObjectTransaction,
};
-use wasm_bindgen_futures::future_to_promise;
/// A Durable Object stub is a client object used to send requests to a remote Durable Object.
pub struct Stub {
@@ -759,6 +759,7 @@ impl DurableObject for Chatroom {
pub trait DurableObject {
fn new(state: State, env: Env) -> Self;
async fn fetch(&mut self, req: http::Request) -> Result>;
+ #[allow(clippy::diverging_sub_expression)]
async fn alarm(&mut self) -> Result> {
unimplemented!("alarm() handler not implemented")
}
diff --git a/worker/src/env.rs b/worker/src/env.rs
index 57607f81..3a67ead8 100644
--- a/worker/src/env.rs
+++ b/worker/src/env.rs
@@ -6,7 +6,7 @@ use crate::Queue;
use crate::{durable::ObjectNamespace, Bucket, DynamicDispatcher, Fetcher, Result};
use js_sys::Object;
-use wasm_bindgen::{prelude::*, JsCast, JsValue};
+use wasm_bindgen::prelude::*;
use worker_kv::KvStore;
#[wasm_bindgen]
diff --git a/worker/src/fetch.rs b/worker/src/fetch.rs
index 12e9a7aa..22588d3d 100644
--- a/worker/src/fetch.rs
+++ b/worker/src/fetch.rs
@@ -5,7 +5,7 @@ use crate::{
body::Body,
futures::SendJsFuture,
http::{request, response},
- Error, Result,
+ AbortSignal, Error, RequestInit, Result,
};
/// Fetch a resource from the network.
@@ -46,6 +46,47 @@ pub async fn fetch(req: http::Request>) -> Result>,
+ init: &RequestInit,
+) -> Result> {
+ let fut = {
+ let req = req.map(Into::into);
+ let global = js_sys::global().unchecked_into::();
+
+ let req = request::into_wasm(req);
+ let promise = global.fetch_with_request_and_init(&req, &init.into());
+
+ SendJsFuture::from(promise)
+ };
+
+ fut.await
+ .map(|res| response::from_wasm(res.unchecked_into()))
+ .map_err(Error::from)
+}
+
+pub async fn fetch_with_signal(
+ req: http::Request>,
+ signal: &AbortSignal,
+) -> Result> {
+ let mut init = web_sys::RequestInit::new();
+ init.signal(Some(signal.inner()));
+
+ let fut = {
+ let req = req.map(Into::into);
+ let global = js_sys::global().unchecked_into::();
+
+ let req = request::into_wasm(req);
+ let promise = global.fetch_with_request_and_init(&req, &init);
+
+ SendJsFuture::from(promise)
+ };
+
+ fut.await
+ .map(|res| response::from_wasm(res.unchecked_into()))
+ .map_err(Error::from)
+}
+
fn _assert_send() {
use crate::futures::assert_send_value;
assert_send_value(fetch(http::Request::new(())));
diff --git a/worker/src/fetcher.rs b/worker/src/fetcher.rs
index 533d4f15..06fd0742 100644
--- a/worker/src/fetcher.rs
+++ b/worker/src/fetcher.rs
@@ -5,7 +5,7 @@ use crate::{
env::EnvBinding,
futures::SendJsFuture,
http::{request, response},
- Result,
+ RequestInit, Result,
};
/// A struct for invoking fetch events to other Workers.
@@ -16,7 +16,27 @@ unsafe impl Sync for Fetcher {}
impl Fetcher {
/// Invoke a fetch event in a worker with a url and optionally a [RequestInit].
- pub async fn fetch(&self, req: http::Request) -> Result> {
+ pub async fn fetch(
+ &self,
+ url: impl Into,
+ init: Option,
+ ) -> Result> {
+ let path = url.into();
+ let fut = {
+ let promise = match init {
+ Some(ref init) => self.0.fetch_with_str_and_init(&path, &init.into()),
+ None => self.0.fetch_with_str(&path),
+ };
+
+ SendJsFuture::from(promise)
+ };
+
+ let res = fut.await?.dyn_into()?;
+ Ok(response::from_wasm(res))
+ }
+
+ /// Invoke a fetch event with an existing [Request].
+ pub async fn fetch_request(&self, req: http::Request) -> Result> {
let fut = {
let req = request::into_wasm(req);
let promise = self.0.fetch(&req);
diff --git a/worker/src/headers.rs b/worker/src/headers.rs
new file mode 100644
index 00000000..1d97ab61
--- /dev/null
+++ b/worker/src/headers.rs
@@ -0,0 +1,183 @@
+use crate::{error::Error, Result};
+
+use std::{
+ iter::{FromIterator, Map},
+ result::Result as StdResult,
+ str::FromStr,
+};
+
+use http::{header::HeaderName, HeaderMap, HeaderValue};
+use js_sys::Array;
+use wasm_bindgen::JsValue;
+use worker_sys::ext::HeadersExt;
+
+/// A [Headers](https://developer.mozilla.org/en-US/docs/Web/API/Headers) representation used in
+/// Request and Response objects.
+pub struct Headers(pub web_sys::Headers);
+
+impl std::fmt::Debug for Headers {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("Headers {\n")?;
+ for (k, v) in self.entries() {
+ f.write_str(&format!("{k} = {v}\n"))?;
+ }
+ f.write_str("}\n")
+ }
+}
+
+impl Headers {
+ /// Construct a new `Headers` struct.
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Returns all the values of a header within a `Headers` object with a given name.
+ /// Returns an error if the name is invalid (e.g. contains spaces)
+ pub fn get(&self, name: &str) -> Result