Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
zebp committed Nov 27, 2023
1 parent ecf2321 commit 39b3701
Show file tree
Hide file tree
Showing 8 changed files with 555 additions and 672 deletions.
1,089 changes: 461 additions & 628 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"devDependencies": {
"@types/node": "^20.3.0",
"@types/uuid": "^9.0.2",
"miniflare": "^3.0.1",
"miniflare": "^3.20230807.0",
"typescript": "^5.1.3",
"uuid": "^9.0.0",
"vitest": "^0.32.0"
Expand Down
4 changes: 2 additions & 2 deletions worker-sandbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub async fn main(
.get("/status-code", |_, _| async move {
Response::builder()
.status(http::StatusCode::IM_A_TEAPOT)
.body(Body::none())
.body(Body::empty())
.map_err(|e| Error::RustError(e.to_string()))
})
.post("/headers", |req, _| async move {
Expand Down Expand Up @@ -381,7 +381,7 @@ pub async fn main(

// Make sure that cloning a non-JS request returns none
assert!(http::Request::get("https://example.com")
.body(body::Body::none())
.body(body::Body::empty())
.unwrap()
.clone_inner()
.is_none());
Expand Down
2 changes: 1 addition & 1 deletion worker-sandbox/tests/clone.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, test, expect } from "vitest";
import { mf } from "./mf";

describe("cache", () => {
describe("clone", () => {
test("clone", async () => {
const resp = await mf.dispatchFetch("https://fake.host/clone", {
method: "POST",
Expand Down
78 changes: 39 additions & 39 deletions worker-sandbox/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,57 @@ workers_dev = true
compatibility_date = "2022-09-12" # required
compatibility_flags = ["streams_enable_constructors"]

kv_namespaces = [
{ binding = "SOME_NAMESPACE", id = "SOME_NAMESPACE", preview_id = "SOME_NAMESPACE" },
{ binding = "FILE_SIZES", id = "FILE_SIZES", preview_id = "FILE_SIZES" },
]
# kv_namespaces = [
# { binding = "SOME_NAMESPACE", id = "SOME_NAMESPACE", preview_id = "SOME_NAMESPACE" },
# { binding = "FILE_SIZES", id = "FILE_SIZES", preview_id = "FILE_SIZES" },
# ]

vars = { SOME_VARIABLE = "some value" }
# vars = { SOME_VARIABLE = "some value" }

[[services]]
binding = "remote"
service = "remote-service"
# [[services]]
# binding = "remote"
# service = "remote-service"

[miniflare.mounts]
remote-service = "./remote-service"
# [miniflare.mounts]
# remote-service = "./remote-service"

[durable_objects]
bindings = [{ name = "COUNTER", class_name = "Counter" }, { name = "ALARM", class_name = "AlarmObject" }]
# [durable_objects]
# bindings = [{ name = "COUNTER", class_name = "Counter" }, { name = "ALARM", class_name = "AlarmObject" }]

[[d1_databases]]
binding = 'DB'
database_name = 'my_db'
database_id = '.'
preview_database_id = '.'
# [[d1_databases]]
# binding = 'DB'
# database_name = 'my_db'
# database_id = '.'
# preview_database_id = '.'

[[queues.consumers]]
queue = "my_queue"
# [[queues.consumers]]
# queue = "my_queue"

[[queues.producers]]
queue = "my_queue"
binding = "my_queue"
[[r2_buckets]]
binding = 'EMPTY_BUCKET'
bucket_name = 'empty_bucket'
preview_bucket_name = 'empty_bucket'
# [[queues.producers]]
# queue = "my_queue"
# binding = "my_queue"
# [[r2_buckets]]
# binding = 'EMPTY_BUCKET'
# bucket_name = 'empty_bucket'
# preview_bucket_name = 'empty_bucket'

[[r2_buckets]]
binding = 'PUT_BUCKET'
bucket_name = 'put_bucket'
preview_bucket_name = 'put_bucket'
# [[r2_buckets]]
# binding = 'PUT_BUCKET'
# bucket_name = 'put_bucket'
# preview_bucket_name = 'put_bucket'

[[r2_buckets]]
binding = 'SEEDED_BUCKET'
bucket_name = 'seeded_bucket'
preview_bucket_name = 'seeded_bucket'
# [[r2_buckets]]
# binding = 'SEEDED_BUCKET'
# bucket_name = 'seeded_bucket'
# preview_bucket_name = 'seeded_bucket'

[[r2_buckets]]
binding = 'DELETE_BUCKET'
bucket_name = 'delete_bucket'
preview_bucket_name = 'delete_bucket'
# [[r2_buckets]]
# binding = 'DELETE_BUCKET'
# bucket_name = 'delete_bucket'
# preview_bucket_name = 'delete_bucket'

[build]
command = "worker-build --release"
command = "worker-build --dev"

[build.upload]
dir = "build/worker"
Expand Down
17 changes: 17 additions & 0 deletions worker/js/hacks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* @param {Request | Response} r
* @returns {Promise<ArrayBuffer>}
*/
export async function collectBytes(r) {
return await new Response(
r.body.pipeThrough(
new TransformStream({
transform(chunk, controller) {
console.log("TRANSFORM", chunk);
const cloned = new Uint8Array(chunk);
controller.enqueue(cloned);
},
})
)
).arrayBuffer();
}
33 changes: 32 additions & 1 deletion worker/src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@ use std::{
use bytes::Bytes;
use futures_util::Stream;
use http::HeaderMap;
use js_sys::{ArrayBuffer, Promise, Uint8Array};
use serde::de::DeserializeOwned;
use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue};

use crate::{
body::{wasm::WasmStreamBody, HttpBody},
futures::SendJsFuture,
Error,
};

// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream
// because the array buffer backing them gets detached.
#[wasm_bindgen(module = "/js/hacks.js")]
extern "C" {
#[wasm_bindgen(js_name = "collectBytes", catch)]
fn collect_bytes(
stream: &JsValue,
) -> Result<Promise, JsValue>;
}

type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;

fn try_downcast<T, K>(k: K) -> Result<T, K>
Expand Down Expand Up @@ -103,10 +115,29 @@ impl Body {
// Check the type of the body we have. Using the `array_buffer` function on the JS types might improve
// performance as there's no polling overhead.
match self.0 {
BodyInner::None => Ok(Bytes::new()),
BodyInner::Regular(body) => super::to_bytes(body).await,
/*
Working but clones all body
BodyInner::Request(req) if req.body().is_some() => {
// let body = req.body().unwrap();
let promise = collect_bytes(&req.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);
Ok(bytes.to_vec().into())
}
BodyInner::Response(res) if res.body().is_some() => {
let promise = collect_bytes(&res.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);
Ok(bytes.to_vec().into())
}
*/
// Failing
BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await),
BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await),
_ => Ok(Bytes::new()),
}
}

Expand Down
2 changes: 2 additions & 0 deletions worker/src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use bytes::Buf;
use futures_util::StreamExt;
use js_sys::Uint8Array;
use wasm_bindgen::JsCast;
use worker_sys::console_log;
use worker_sys::ext::{HeadersExt, RequestExt};

use crate::{AbortSignal, Cf, CfProperties};
Expand Down

0 comments on commit 39b3701

Please sign in to comment.